定制并发类(十)实现一个基于优先级的传输队列

声明:本文是《 Java 7 Concurrency Cookbook》的第七章, 作者: Javier Fernández González 译者:郑玉婷

实现一个基于优先级的传输队列

Java 7 API 提供几种与并发应用相关的数据类型。从这里面,我们想来重点介绍以下2种数据类型:

  • LinkedTransferQueue:这个数据类型支持那些有生产者和消费者结构的程序。 在那些应用,你有一个或者多个数据生产者,一个或多个数据消费者和一个被生产者和消费者共享的数据类型。生产者把数据放入数据结构内,然后消费者从数据结构内提取数据。如果数据结构为空,消费者会被阻塞直到有数据可以消费。如果数据结构满了,生产者就会被阻塞直到有空位来放数据。
  • PriorityBlockingQueue:在这个数据结构,元素是按照顺序储存的。元素们必须实现 带有 compareTo() 方法的 Comparable 接口。当你在结构中插入数据时,它会与数据元素对比直到找到它的位置。

LinkedTransferQueue 的元素是按照抵达顺序储存的,所以越早到的越先被消耗。你有可能需要开发 producer/ consumer 程序,它的消耗顺序是由优先级决定的而不是抵达时间。在这个指南,你将学习如何实现在 producer/ consumer 问题中使用的数据结构,这些元素将被按照他们的优先级排序,级别高的会先被消耗。

准备

指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans,打开并创建一个新的java任务。

怎么做呢…

按照这些步骤来实现下面的例子::

001 //1.   创建一个类,名为 MyPriorityTransferQueue,扩展 PriorityBlockingQueue 类并实现 TransferQueue 接口。
002 public class MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> {
003  
004 //2.   声明一个私有 AtomicInteger 属性,名为 counter,用来储存正在等待元素的消费者的数量。
005 private AtomicInteger counter;
006  
007 //3.   声明一个私有 LinkedBlockingQueue 属性,名为 transferred。
008 private LinkedBlockingQueue<E> transfered;
009  
010 //4.   声明一个私有 ReentrantLock 属性,名为 lock。
011 private ReentrantLock lock;
012  
013 //5.   实现类的构造函数,初始化它的属性值。
014 public MyPriorityTransferQueue() {
015     counter=new AtomicInteger(0);
016     lock=new ReentrantLock();
017     transfered=new LinkedBlockingQueue<E>();
018 }
019  
020 //6.   实现 tryTransfer() 方法。此方法尝试立刻发送元素给正在等待的消费者(如果可能)。如果没有任何消费者在等待,此方法返回 false 值。
021 @Override
022 public boolean tryTransfer(E e) {
023     lock.lock();
024     boolean value;
025     if (counter.get()==0) {
026         value=false;
027     } else {
028         put(e);
029         value=true;
030     }
031     lock.unlock();
032     return value;
033 }
034  
035 //7.    实现 transfer() 方法。此方法尝试立刻发送元素给正在等待的消费者(如果可能)。如果没有任何消费者在等待,
036 此方法把元素存入一个特殊queue,为了发送给第一个尝试获取一个元素的消费者并阻塞线程直到元素被消耗。
037 @Override
038 public void transfer(E e) throws InterruptedException {
039     lock.lock();
040     if (counter.get()!=0) {
041         put(e);
042         lock.unlock();
043     } else {
044         transfered.add(e);
045         lock.unlock();
046         synchronized (e) {
047             e.wait();
048         }
049     }
050 }
051  
052 //8.   实现 tryTransfer() 方法,它接收3个参数: 元素,和需要等待消费者的时间(如果没有消费者的话),和用来注明时间的单位。如果有消费者在等待,立刻发送元素。否则,转化时间到毫秒并使用 wait() 方法让线程进入休眠。当消费者取走元素时,如果线程在 wait() 方法里休眠,你将使用 notify() 方法唤醒它。
053 @Override
054 public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException {
055     lock.lock();
056     if (counter.get()!=0) {
057         put(e);
058         lock.unlock();
059         return true;
060     } else {
061         transfered.add(e);
062         long newTimeout= TimeUnit.MILLISECONDS.convert(timeout, unit);
063         lock.unlock();
064         e.wait(newTimeout);
065         lock.lock();
066  
067         if (transfered.contains(e)) {
068             transfered.remove(e);
069             lock.unlock();
070             return false;
071         } else {
072             lock.unlock();
073             return true;
074         }
075     }
076 }
077  
078 //9.   实现 hasWaitingConsumer() 方法。使用 counter 属性值来计算此方法的返回值。如果counter 的值大于0,放回 true。不然,返回 false。
079 @Override
080 public boolean hasWaitingConsumer() {
081     return (counter.get()!=0);
082 }
083  
084 //10. 实现 getWaitingConsumerCount() 方法。返回counter 属性值。
085 @Override
086 public int getWaitingConsumerCount() {
087     return counter.get();
088 }
089  
090 //11.实现 take() 方法。此方法是当消费者需要元素时被消费者调用的。首先,获取之前定义的锁并增加在等待的消费者数量。
091 @Override
092 public E take() throws InterruptedException {
093     lock.lock();
094     counter.incrementAndGet();
095  
096 //12.如果在 transferred queue 中无任何元素。释放锁并使用 take() 方法尝试从queue中获取元素,此方法将让线程进入睡眠直到有元素可以消耗。
097 E value=transfered.poll();
098 if (value==null) {
099     lock.unlock();
100     value=super.take();
101     lock.lock();
102  
103 //13. 否则,从transferred queue 中取走元素并唤醒正在等待要消耗元素的线程(如果有的话)。
104 } else {
105     synchronized (value) {
106         value.notify();
107     }
108 }
109  
110 //14. 最后,增加正在等待的消费者的数量并释放锁。
111 counter.decrementAndGet();
112 lock.unlock();
113 return value;
114 }
115  
116 //15. 实现一个类,名为 Event,扩展 Comparable 接口,把 Event 类参数化。
117 public class Event implements Comparable<Event> {
118  
119 //16. 声明一个私有 String 属性,名为 thread,用来储存创建事件的线程的名字。
120 private String thread;
121  
122 //17.  声明一个私有 int 属性,名为 priority,用来储存事件的优先级。
123 private int priority;
124  
125 //18. 实现类的构造函数,初始化它的属性值。
126 public Event(String thread, int priority){
127     this.thread=thread;
128     this.priority=priority;
129 }
130  
131 //19. 实现一个方法,返回 thread 属性值。
132 public String getThread() {
133     return thread;
134 }
135  
136 //20. 实现一个方法,返回 priority  属性值。
137 public int getPriority() {
138 return priority;
139 }
140  
141 //21. 实现 compareTo() 方法。此方法把当前事件与接收到的参数事件进行对比。返回 -1,如果当前事件的优先级的级别高于参数;返回 1,如果当前事件的优先级低于参数;如果相等,则返回 0。你将获得一个按优先级递减顺序排列的list。有高等级的事件就会被排到queue的最前面。
142 public int compareTo(Event e) {
143     if (this.priority>e.getPriority()) {
144         return -1;
145     } else if (this.priority<e.getPriority()) {
146         return 1;
147     } else {
148         return 0;
149     }
150 }
151  
152 //22. 实现一个类,名为 Producer,它实现 Runnable 接口。
153 public class Producer implements Runnable {
154  
155 //23. 声明一个私有 MyPriorityTransferQueue 属性,接收参数化的 Event  类属性,名为 buffer,用来储存这个生产者生成的事件。
156 private MyPriorityTransferQueue<Event> buffer;
157  
158 //24. 实现类的构造函数,初始化它的属性值。
159 public Producer(MyPriorityTransferQueue<Event> buffer) {
160     this.buffer=buffer;
161 }
162  
163 //25. 这个类的实现 run() 方法。创建 100 个 Event 对象,用他们被创建的顺序决定优先级(越先创建的优先级越高)并使用 put() 方法把他们插入queue中。
164 public void run() {
165     for (int i=0; i<100; i++) {
166         Event event=new Event(Thread.currentThread().getName(),i);
167         buffer.put(event);
168     }
169 }
170  
171 //26. 实现一个类,名为 Consumer,它要实现 Runnable 接口。
172 public class Consumer implements Runnable {
173  
174 //27.  声明一个私有 MyPriorityTransferQueue 属性,参数化 Event 类属性,名为 buffer,用来获取这个类的事件消费者。
175 private MyPriorityTransferQueue<Event> buffer;
176  
177 //28. 实现类的构造函数,初始化它的属性值。
178 public Consumer(MyPriorityTransferQueue<Event> buffer) {
179     this.buffer=buffer;
180 }
181  
182 //29. 实现 run() 方法。它使用 take() 方法消耗1002 Events (这个例子实现的全部事件)并把生成事件的线程数量和它的优先级别写入操控台。
183 @Override
184 public void run() {
185     for (int i=0; i<1002; i++) {
186         try {
187             Event value=buffer.take();
188             System.out.printf("Consumer: %s: %d\n",value. getThread(),value.getPriority());
189         } catch (InterruptedException e) {
190             e.printStackTrace();
191         }
192     }
193 }
194  
195 //30. 创建例子的主类通过创建一个类,名为 Main 并添加 main()方法。
196 public class Main {
197  
198 public static void main(String[] args) throws Exception {
199  
200 //31. 创建一个 MyPriorityTransferQueue 对象,名为 buffer。
201 MyPriorityTransferQueue<Event> buffer=new MyPriorityTransferQu eue<Event>();
202  
203 //32. 创建一个 Producer 任务并运行 10 线程来执行任务。
204 Producer producer=new Producer(buffer);
205 Thread producerThreads[]=new Thread[10];
206 for (int i=0; i<producerThreads.length; i++) {
207     producerThreads[i]=new Thread(producer);
208     producerThreads[i].start();
209 }
210  
211 //33.创建并运行一个 Consumer 任务。
212 Consumer consumer=new Consumer(buffer);
213 Thread consumerThread=new Thread(consumer);
214 consumerThread.start();
215  
216 //34. 写入当前的消费者数量。
217 System.out.printf("Main: Buffer: Consumer count: %d\n",buffer. getWaitingConsumerCount());
218  
219 //35. 使用 transfer() 方法传输一个事件给消费者。
220 Event myEvent=new Event("Core Event",0);
221 buffer.transfer(myEvent);
222 System.out.printf("Main: My Event has ben transfered.\n");
223  
224 //36. 使用 join() 方法等待生产者的完结。
225 for (int i=0; i<producerThreads.length; i++) {
226     try {
227         producerThreads[i].join();
228     } catch (InterruptedException e) {
229         e.printStackTrace();
230     }
231 }
232  
233 //37.  让线程休眠1秒。
234 TimeUnit.SECONDS.sleep(1);
235  
236 //38.写入当前的消费者数量。
237 System.out.printf("Main: Buffer: Consumer count: %d\n",buffer. getWaitingConsumerCount());
238  
239 //39. 使用 transfer() 方法传输另一个事件。
240 myEvent=new Event("Core Event 2",0);
241 buffer.transfer(myEvent);
242  
243 //40. 使用 join() 方法等待消费者完结。
244 consumerThread.join();
245  
246 //41. 写信息表明程序结束。
247 System.out.printf("Main: End of the program\n");

它是怎么工作的…

在这个指南,你已经实现了 MyPriorityTransferQueue 数据结构。这个数据类型是在 producer/consumer 问题中使用的,它的元素是按照优先级排列的。由于 Java 不支持多个继承,所以你首先要决定的是 MyPriorityTransferQueue 类的基类。你扩展了 PriorityBlockingQueue 类,来实现在结构中插入数据按照优先级排序。你也实现了 TransferQueue 接口,添加了与 producer/consumer 相关的3个方法。

MyPriortyTransferQueue 类有以下2个属性:

  1. AtomicInteger 属性,名为 counter: 此属性储存了正在等待从数据类型提取元素的消费者的数量。当一个消费者调用 take()操作来从数据类型中提取元素时,counter 数增加。当消费者结束 take() 操作的执行时,counter 数再次增加。在 hasWaitingConsumer() 和 getWaitingConsumerCount() 方法的实现中使用到了 counter。
  2. ReentrantLock 属性,名为 lock: 此属性是用来控制访问已实现的操作。只有一个线程可以用数据类型。最后一个,LinkedBlockingQueue list 用来储存传输的元素。

在 MyPriorityTransferQueue 中,你实现了一些方法。全部方法都在 TransferQueue 接口中声明了和在PriorityBlockingQueue 接口实现的 take() 方法。在之前已经描述了2个方法了。来看看剩下的方法的描述:

  1. tryTransfer(E e): 此方法尝试直接发送元素给消费者。如果有消费者在等待,此方法储存元素到 priority queue 中为了立刻提供给消费者,并返回 true 值。如果没有消费者在等待,方法返回 false 值。
  2. transfer(E e): 此方法直接发送元素给消费者。如果有消费者在等待,此方法储存元素到 priority queue 中为了立刻提供给消费者。

否则,把元素储存到已传输的元素list 并阻塞线程直到元素被消耗。当线程进入休眠时,你要释放锁,如果不的话,你就阻塞了queue。

  • tryTransfer(E e, long timeout, TimeUnit unit): 此方法与 transfer() 方法相似,只是它的线程被阻塞的时间段是由参数决定的。当线程进入休眠时,你要释放锁,如果不的话,你就阻塞了queue。
  • take(): 此方法返回下一个要被消耗的元素。如果在 transferred 元素list中有元素,就从list中取走元素。否则,就从 priority queue 中取元素。

一旦你实现了数据类型,你就实现了 Event 类。它就是在数据类型里储存的元素构成的类。Event 类有2个属性用来储存生产者的ID和事件的优先级,并实现了 Comparable 接口,为了满足你的数据类型的需要。

接着,你实现了 Producer 和 Consumer 类。在这个例子中,你有 10 个生产者和一个消费者,他们共享同一个 buffer。每个生产者生成100个事件,他们的优先级是递增的, 所以有高优先级的事件在越后面才生成。

例子的主类创建了一个 MyPriorityTransferQueue 对象,10个生产者,和一个消费者,然后使用MyPriorityTransferQueue buffer 的 transfer() 方法来传输2个事件到 buffer。

以下截图是程序运行的部分输出:

你可以发现有着高级别的事件如何先被消费,和一个消费者如何消费传输的事件。

时间: 2024-10-01 19:22:14

定制并发类(十)实现一个基于优先级的传输队列的相关文章

定制并发类(一)引言

声明:本文是< Java 7 Concurrency Cookbook>的第七章,作者: Javier Fernández González 译者:许巧辉 在这个文章中,我们将包含: 定制ThreadPoolExecutor类 实现一个基于优先级的Executor类 实现ThreadFactory接口生成自定义的线程 在一个Executor对象中使用我们的ThreadFactory 定制任务运行在一个计划的线程池中 实现ThreadFactory接口生成自定义的线程给Fork/Join框架 定

定制并发类(三)实现一个基于优先级的Executor类

声明:本文是< Java 7 Concurrency Cookbook>的第七章,作者: Javier Fernández González     译者:许巧辉 实现一个基于优先级的Executor类 在Java并发API的第一个版本中,你必须创建和运行应用程序中的所有线程.在Java版本5中,随着执行者框架(Executor framework)的出现,对于并发任务的执行,一个新的机制被引进. 使用执行者框架(Executor framework),你只要实现你的任务并把它们提交给执行者.

定制并发类(五)在一个Executor对象中使用我们的ThreadFactory

声明:本文是< Java 7 Concurrency Cookbook>的第七章,作者: Javier Fernández González     译者:许巧辉 在一个Executor对象中使用我们的ThreadFactory 在前面的指南中,实现ThreadFactory接口生成自定义线程,我们引进了工厂模式和提供如何实现一个实现ThreadFactory接口的线程的工厂例子. 执行者框架(Executor framework)是一种机制,它允许你将线程的创建与执行分离.它是基于Execu

定制并发类(二)定制ThreadPoolExecutor类

声明:本文是< Java 7 Concurrency Cookbook >的第七章,作者: Javier Fernández González     译者:许巧辉 定制ThreadPoolExecutor类 执行者框架(Executor framework)是一种机制,允许你将线程的创建与执行分离.它是基于Executor和ExecutorService接口及其实现这两个接口的ThreadPoolExecutor类.它有一个内部的线程池和提供允许你提交两种任务给线程池执行的方法.这些任务是:

定制并发类(六)自定义在计划的线程池内运行的任务

声明:本文是< Java 7 Concurrency Cookbook>的第七章, 作者: Javier Fernández González 译者:郑玉婷 自定义在计划的线程池内运行的任务 计划的线程池是 Executor 框架的基本线程池的扩展,允许你定制一个计划来执行一段时间后需要被执行的任务. 它通过 ScheduledThreadPoolExecutor 类来实现,并允许运行以下这两种任务: Delayed 任务:这种任务在一段时间后仅执行一次. Periodic 任务:这种任务在延

定制并发类(八)自定义在 Fork/Join 框架中运行的任务

声明:本文是< Java 7 Concurrency Cookbook>的第七章, 作者: Javier Fernández González 译者:郑玉婷 自定义在 Fork/Join 框架中运行的任务 执行者框架分开了任务的创建和运行.这样,你只要实现 Runnable 对象来使用 Executor 对象.你可以发送 Runnable 任务给执行者,然后它会创建,管理,并终结必要的线程来执行这些任务. Java 7 在 Fork/Join 框架中提供了特殊的执行者.这个框架是设计用来解决那

定制并发类(九)实现一个自定义的Lock类

声明:本文是< Java 7 Concurrency Cookbook >的第七章,作者: Javier Fernández González     译者:许巧辉 实现一个自定义的Lock类 锁是Java并发API提供的基本同步机制之一.它允许程序员保护代码的临界区,所以,在某个时刻只有一个线程能执行这个代码块.它提供以下两种操作: lock():当你想要访问一个临界区时,调用这个方法.如果有其他线程正在运行这个临界区,其他线程将阻塞,直到它们被这个锁唤醒,从而获取这个临界区的访问. unl

定制并发类(四)实现ThreadFactory接口生成自定义的线程

声明:本文是< Java 7 Concurrency Cookbook>的第七章,作者: Javier Fernández González     译者:许巧辉 实现ThreadFactory接口生成自定义的线程 在面向对象编程的世界中,工厂模式(factory pattern)是一个被广泛使用的设计模式.它是一个创建模式,它的目的是开发一个类,这个类的使命是创建一个或多个类的对象.然后,当我们要创建一个类的一个对象时,我们使用这个工厂而不是使用new操作. 使用这个工厂,我们集中对象的创建

定制并发类(七)实现ThreadFactory接口生成自定义的线程给Fork/Join框架

声明:本文是< Java 7 Concurrency Cookbook>的第七章,作者: Javier Fernández González     译者:许巧辉 实现ThreadFactory接口生成自定义的线程给Fork/Join框架 Fork/Join框架是Java7中最有趣的特征之一.它是Executor和ExecutorService接口的一个实现,允许你执行Callable和Runnable任务而不用管理这些执行线程. 这个执行者面向执行能被拆分成更小部分的任务.主要组件如下: 一