线程同步工具(七)在并发任务间交换数据

声明:本文是《 Java 7 Concurrency Cookbook》的第三章, 作者: Javier Fernández González 译者:郑玉婷

在并发任务间交换数据

Java 并发 API 提供了一种允许2个并发任务间相互交换数据的同步应用。更具体的说,Exchanger 类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的。

这个类在遇到类似生产者和消费者问题时,是非常有用的。来一个非常经典的并发问题:你有相同的数据buffer,一个或多个数据生产者,和一个或多个数据消费者。只是Exchange类只能同步2个线程,所以你只能在你的生产者和消费者问题中只有一个生产者和一个消费者时使用这个类。

在这个指南,你将学习如何使用 Exchanger 类来解决只有一个生产者和一个消费者的生产者和消费者问题。

准备

这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。

怎么做呢

按照这些步骤来实现下面的例子:

01 package tool;
02 import java.util.List;
03 import java.util.concurrent.Exchanger;
04  
05 //1. 首先,从实现producer开始吧。创建一个类名为Producer并一定实现 Runnable 接口。
06 public class Producer implements Runnable {
07  
08 // 2. 声明 List<String>对象,名为 buffer。这是等等要被相互交换的数据类型。
09 private List<String> buffer;
10  
11 // 3. 声明 Exchanger<List<String>>; 对象,名为exchanger。这个 exchanger 对象是用来同步producer和consumer的。
12 private final Exchanger<List<String>> exchanger;
13  
14 // 4. 实现类的构造函数,初始化这2个属性。
15 public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
16 this.buffer = buffer;
17 this.exchanger = exchanger;
18 }
19  
20 // 5. 实现 run() 方法. 在方法内,实现10次交换。
21 @Override
22 public void run() {
23 int cycle = 1;
24 for (int i = 0; i < 10; i++) {           System.out.printf("Producer: Cycle %d\n", cycle);
25  
26 // 6. 在每次循环中,加10个字符串到buffer。
27 for (int j = 0; j <10; j++) {
28 String message = "Event " + ((i * 10) + j);
29 System.out.printf("Producer: %s\n", message);
30 buffer.add(message);
31 }
32  
33 // 7. 调用 exchange() 方法来与consumer交换数据。此方法可能会抛出InterruptedException 异常, 加上处理代码。
34 try {
35 buffer = exchanger.exchange(buffer);
36 } catch (InterruptedException e) {
37 e.printStackTrace();
38 }
39 System.out.println("Producer: " + buffer.size());
40 cycle++;
41 }
42 }
43 }
01 //8. 现在, 来实现consumer。创建一个类名为Consumer并一定实现 Runnable 接口。
02 package tool;
03 import java.util.List;
04 import java.util.concurrent.Exchanger;
05 public class Consumer implements Runnable {
06  
07 // 9. 声明名为buffer的 List<String>对象。这个对象类型是用来相互交换的。
08 private List<String> buffer;
09  
10 // 10. 声明一个名为exchanger的 Exchanger<List<String>> 对象。用来同步 producer和consumer。
11 private final Exchanger<List<String>> exchanger;
12  
13 // 11. 实现类的构造函数,并初始化2个属性。
14 public Consumer(List<String>buffer, Exchanger<List<String>> exchanger) {
15 this.buffer = buffer;
16 this.exchanger = exchanger;
17 }
18  
19 // 12. 实现 run() 方法。在方法内,实现10次交换。
20 @Override
21 public void run() {
22 int cycle = 1;
23 for (int i = 0; i < 10; i++) {
24 System.out.printf("Consumer: Cycle %d\n", cycle);
25  
26 // 13. 在每次循环,首先调用exchange()方法来与producer同步。Consumer需要消耗数据。此方法可能会抛出InterruptedException异常, 加上处理代码。
27 try {
28 buffer = exchanger.exchange(buffer);
29 } catch (InterruptedException e) {              e.printStackTrace();
30 }
31  
32 // 14. 把producer发来的在buffer里的10字符串写到操控台并从buffer内删除,留空。System.out.println("Consumer: " + buffer.size());
33 for (int j = 0; j <10; j++) {
34 String message = buffer.get(0);
35 System.out.println("Consumer: " + message);
36 buffer.remove(0);
37 }
38 cycle++;
39 }

 

01 //15.现在,实现例子的主类通过创建一个类,名为Core并加入 main() 方法。
02 package tool;
03 import java.util.ArrayList;
04 mport java.util.List;
05 import java.util.concurrent.Exchanger;
06  
07 public class Core {
08 public static void main(String[] args) {
09  
10 // 16. 创建2个buffers。分别给producer和consumer使用.
11 List<String> buffer1 = new ArrayList<String>();
12 List<String> buffer2 = new ArrayList<String>();
13  
14 // 17. 创建Exchanger对象,用来同步producer和consumer。
15 Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
16  
17 // 18. 创建Producer对象和Consumer对象。
18 Producer producer = new Producer(buffer1, exchanger);
19 Consumer consumer = new Consumer(buffer2, exchanger);
20  
21 // 19. 创建线程来执行producer和consumer并开始线程。
22 Thread threadProducer = new Thread(producer);
23 Thread threadConsumer = new Thread(consumer); threadProducer.start();
24 threadConsumer.start();
25 }

它是怎么工作的…

消费者开始时是空白的buffer,然后调用Exchanger来与生产者同步。因为它需要数据来消耗。生产者也是从空白的buffer开始,然后创建10个字符串,保存到buffer,并使用exchanger与消费者同步。

在这儿,2个线程(生产者和消费者线程)都是在Exchanger里并交换了数据类型,所以当消费者从exchange() 方法返回时,它有10个字符串在buffer内。当生产者从 exchange() 方法返回时,它有空白的buffer来重新写入。这样的操作会重复10遍。

如你执行例子,你会发现生产者和消费者是如何并发的执行任务和在每个步骤它们是如何交换buffers的。与其他同步工具一样会发生这种情况,第一个调用 exchange()方法会进入休眠直到其他线程的达到。

更多…

Exchanger 类有另外一个版本的exchange方法:

  • exchange(V data, long time, TimeUnit unit):V是声明Phaser的参数种类(例子里是 List)。 此线程会休眠直到另一个线程到达并中断它,或者特定的时间过去了。TimeUnit类有多种常量:DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, 和 SECONDS。
时间: 2024-08-23 00:07:50

线程同步工具(七)在并发任务间交换数据的相关文章

第三章-线程同步工具(引言)

章节提要: 并发地访问资源的控制 并发地访问多个副本资源的控制 等待多个并发事件 在一个相同点同步任务 并发的阶段性任务的运行 并发地阶段性任务的阶段改变的控制 在并发任务间改变数据 介绍 在第二章基本的线程同步中,我们学习了同步和critical section的内容.基本上,当多个并发任务共享一个资源时就称为同步,例如:一个对象或者一个对象的属性.访问这个资源的代码块称为:临界区. 如果机制没有使用恰当,那么可能会导致错误的结果,或者数据不一致,又或者出现异常情况.所以必须采取java语言提

python使用Queue在多个子进程间交换数据的方法_python

本文实例讲述了python使用Queue在多个子进程间交换数据的方法.分享给大家供大家参考.具体如下: 这里将Queue作为中间通道进行数据传递,Queue是线程和进程安全的 from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start()

线程同步工具(五)运行阶段性并发任务

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 运行阶段性并发任务 Java 并发 API 提供的一个非常复杂且强大的功能是,能够使用Phaser类运行阶段性的并发任务.当某些并发任务是分成多个步骤来执行时,那么此机制是非常有用的.Phaser类提供的机制是在每个步骤的结尾同步线程,所以除非全部线程完成第一个步骤,否则线程不能开始进行第二步. 相对于其他同步应用,我们必须初始化Phas

基本线程同步(七)修改Lock的公平性

修改Lock的公平性 在ReentrantLock类和 ReentrantReadWriteLock类的构造器中,允许一个名为fair的boolean类型参数,它允许你来控制这些类的行为.默认值为 false,这将启用非公平模式.在这个模式中,当有多个线程正在等待一把锁(ReentrantLock或者 ReentrantReadWriteLock),这个锁必须选择它们中间的一个来获得进入临界区,选择任意一个是没有任何标准的.true值将开启公平 模式.在这个模式中,当有多个线程正在等待一把锁(R

线程同步工具(六)控制并发阶段性任务的改变

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 控制并发阶段性任务的改变 Phaser 类提供每次phaser改变阶段都会执行的方法.它是 onAdvance() 方法.它接收2个参数:当前阶段数和注册的参与者数:它返回 Boolean 值,如果phaser继续它的执行,则为 false:否则为真,即phaser结束运行并进入 termination 状态. 如果注册参与者为0,此方法

线程同步工具(一)控制并发访问资源

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷     控制并发访问资源 这个指南,你将学习怎样使用Java语言提供的Semaphore机制.Semaphore是一个控制访问多个共享资源的计数器. Semaphore的内容是由Edsger Dijkstra引入并在 THEOS操作系统上第一次使用. 当一个线程想要访问某个共享资源,首先,它必须获得semaphore.如果semaphor

线程同步工具(三)等待多个并发事件完成

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 等待多个并发事件完成 Java并发API提供这样的类,它允许1个或者多个线程一直等待,直到一组操作执行完成. 这个类就是CountDownLatch类.它初始一个整数值,此值是线程将要等待的操作数.当某个线程为了想要执行这些操作而等待时, 它要使用 await()方法.此方法让线程进入休眠直到操作完成. 当某个操作结束,它使用countD

线程同步工具(二)控制并发访问多个资源

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 控制并发访问多个资源 在并发访问资源的控制中,你学习了信号量(semaphores)的基本知识. 在上个指南,你实现了使用binary semaphores的例子.那种semaphores是用来保护访问一个共享资源的,或者说一个代码片段每次只能被一个线程执行.但是semaphores也可以用来保护多个资源的副本,也就是说当你有一个代码片段

线程同步工具(四)在同一个点同步任务

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 在同一个点同步任务 Java 并发 API 提供了可以允许2个或多个线程在在一个确定点的同步应用.它是 CyclicBarrier 类.此类与在此章节的等待多个并发事件完成指南中的 CountDownLatch 类相似,但是它有一些特殊性让它成为更强大的类. CyclicBarrier 类有一个整数初始值,此值表示将在同一点同步的线程数量