线程同步工具(四)在同一个点同步任务

声明:本文是《 Java 7 Concurrency Cookbook》的第三章, 作者: Javier Fernández González 译者:郑玉婷

在同一个点同步任务

Java 并发 API 提供了可以允许2个或多个线程在在一个确定点的同步应用。它是 CyclicBarrier 类。此类与在此章节的等待多个并发事件完成指南中的 CountDownLatch 类相似,但是它有一些特殊性让它成为更强大的类。

CyclicBarrier 类有一个整数初始值,此值表示将在同一点同步的线程数量。当其中一个线程到达确定点,它会调用await() 方法来等待其他线程。当线程调用这个方法,CyclicBarrier阻塞线程进入休眠直到其他线程到达。当最后一个线程调用CyclicBarrier 类的await() 方法,它唤醒所有等待的线程并继续执行它们的任务。

CyclicBarrier 类有个有趣的优势是,你可以传递一个外加的 Runnable 对象作为初始参数,并且当全部线程都到达同一个点时,CyclicBarrier类 会把这个对象当做线程来执行。此特点让这个类在使用 divide 和 conquer 编程技术时,可以充分发挥任务的并行性,

在这个指南,你将学习如何使用 CyclicBarrier 类来让一组线程在一个确定点同步。你也将使用 Runnable 对象,它将会在全部线程都到达确定点后被执行。在这个例子里,你将在数字矩阵中查找一个数字。矩阵会被分成多个子集(使用divide 和 conquer 技术),所以每个线程会在一个子集中查找那个数字。一旦全部行程运行结束,会有一个最终任务来统一他们的结果。

准备

指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 打开并创建一个新的java项目。

怎么做呢

按照这些步骤来实现下面的例子::

001 //1.  我们从实现2个辅助类开始。首先,创建一个类名为 MatrixMock。此类随机生成一个在1-10之间的 数字矩阵,我们将从中查找数字。
002 public class MatrixMock {
003  
004 //2.   声明私有 int matrix,名为 data。
005 private int data[][];
006  
007 //3.   实现类的构造函数。此构造函数将接收矩阵的行数,行的长度,和我们将要查找的数字作为参数。3个参数全部int 类型。
008 public MatrixMock(int size, int length, int number){
009  
010 //4.   初始化构造函数将使用的变量和对象。
011 int counter=0;
012 data=new int[size][length];
013 Random random=new Random();
014  
015 //5.   用随机数字填充矩阵。每生成一个数字就与要查找的数字对比,如果相等,就增加counter值。
016 for (int i=0; i<size; i++) {
017     for (int j=0; j<length; j++){
018         data[i][j]=random.nextInt(10);
019         if (data[i][j]==number){
020             counter++;
021         }
022     }
023 }
024  
025 //6.   最后,在操控台打印一条信息,表示查找的数字在生成的矩阵里的出现次数。此信息是用来检查线程们获得的正确结果的。
026 System.out.printf("Mock: There are %d ocurrences of number in generated data.\n",counter,number); //译者注:把字符串里的number改为%d.
027  
028 //7.    实现 getRow() 方法。此方法接收一个 int为参数,是矩阵的行数。返回行数如果存在,否则返回null。
029 public int[] getRow(int row){
030     if ((row>=0)&&(row<data.length)){
031         return data[row];
032     }
033     return null;
034 }
035  
036 //8.   现在,实现一个类名为 Results。此类会在array内保存被查找的数字在矩阵的每行里出现的次数。
037 public class Results {
038  
039 //9.   声明私有 int array 名为 data。
040 private int data[];
041  
042 //10. 实现类的构造函数。此构造函数接收一个表明array元素量的整数作为参数。
043 public Results(int size){
044     data=new int[size];
045 }
046  
047 //11. 实现 setData() 方法。此方法接收array的某个位置和一个值作为参数,然后把array的那个位置设定为那个值。
048 public void setData(int position, int value){
049     data[position]=value;
050 }
051  
052 //12. 实现 getData() 方法。此方法返回结果 array。
053 public int[] getData(){
054 return data;
055 }
056  
057 //13. 现在你有了辅助类,是时候来实现线程了。首先,实现 Searcher 类。这个类会在随机数字的矩阵中的特定的行里查找数字。创建一个类名为Searcher 并一定实现  Runnable 接口.
058 public class Searcher implements Runnable {
059  
060 //14. 声明2个私有int属性名为 firstRow 和 lastRow。这2个属性是用来确定将要用的子集的行。
061 private int firstRow;
062 private int lastRow;
063  
064 //15. 声明一个私有 MatrixMock 属性,名为 mock。
065 private MatrixMock mock;
066  
067 //16. 声明一个私有 Results 属性,名为 results。
068 private Results results;
069  
070 //17.  声明一个私有 int 属性名为 number,用来储存我们要查找的数字。
071 private int number;
072  
073 //18. 声明一个 CyclicBarrier 对象,名为 barrier。
074 private final CyclicBarrier barrier;
075  
076 //19. 实现类的构造函数,并初始化之前声明的全部属性。
077 public Searcher(int firstRow, int lastRow, NumberMock mock, Results results, int number, CyclicBarrier barrier){
078     this.firstRow=firstRow;
079     this.lastRow=lastRow;
080     this.mock=mock;
081     this.results=results;
082     this.number=number;
083     this.barrier=barrier;
084 }
085  
086 //20. 实现 run() 方法,用来查找数字。它使用内部变量,名为counter,用来储存数字在每行出现的次数。
087 @Override
088 public void run() {
089     int counter;
090  
091 //21. 在操控台打印一条信息表明被分配到这个对象的行。
092 System.out.printf("%s: Processing lines from %d to %d.\n",Thread.currentThread().getName(),firstRow,lastRow);
093  
094 //22. 处理分配给这个线程的全部行。对于每行,记录正在查找的数字出现的次数,并在相对于的 Results 对象中保存此数据。
095 for (int i=firstRow; i<lastRow; i++){
096     int row[]=mock.getRow(i);
097     counter=0;
098     for (int j=0; j<row.length; j++){
099         if (row[j]==number){
100         counter++;
101     }
102 }
103  
104 results.setData(i, counter);
105 }
106  
107 //23. 打印信息到操控台表明此对象已经结束搜索。
108 System.out.printf("%s: Lines processed.\n",Thread. currentThread().getName());
109  
110 //24. 调用 CyclicBarrier 对象的 await() 方法 ,由于可能抛出的异常,要加入处理 InterruptedException and BrokenBarrierException 异常的必需代码。
111 try {
112     barrier.await();
113 } catch (InterruptedException e) {
114     e.printStackTrace();
115 } catch (BrokenBarrierException e) {
116     e.printStackTrace();
117 }
118  
119 //25. 现在,实现一个类来计算数字在这个矩阵里出现的总数。它使用储存了矩阵中每行里数字出现次数的 Results 对象来进行运算。创建一个类,名为 Grouper 并一定实现 Runnable 接口.
120 public class Grouper implements Runnable {
121  
122 //26. 声明一个私有 Results 属性,名为 results。
123 private Results results;
124  
125 //27.  实现类的构造函数,并初始化 Results 属性。
126 public Grouper(Results results){
127 this.results=results;
128 }
129  
130 //28.实现 run() 方法,用来计算结果array里数字出现次数的总和。
131 @Override
132 public void run() {
133  
134 //29. 声明一个 int 变量并写在操控台写一条信息表明开始处理了。
135 int finalResult=0;
136 System.out.printf("Grouper: Processing results...\n");
137  
138 //30. 使用 results 对象的 getData() 方法来获得每行数字出现的次数。然后,处理array的全部元素,把每个元素的值加给 finalResult 变量。
139 int data[]=results.getData();
140 for (int number:data){
141 finalResult+=number;
142 }
143  
144 //31. 在操控台打印结果。
145 System.out.printf("Grouper: Total result: %d.\n",finalResult);
146  
147 //32. 最后, 实现例子的 main 类,通过创建一个类,名为 Main 并为其添加 main() 方法。
148 public class Main {
149  
150 public static void main(String[] args) {
151  
152 //33. 声明并初始5个常熟来储存应用的参数。
153 final int ROWS=10000;
154 final int NUMBERS=1000;
155 final int SEARCH=5;
156 final int PARTICIPANTS=5;
157 final int LINES_PARTICIPANT=2000;
158  
159 //34. Create a MatrixMock 对象,名为 mock. 它将有 10,000 行,每行1000个元素。现在,你要查找的数字是5。
160 MatrixMock mock=new MatrixMock(ROWS, NUMBERS,SEARCH);
161  
162 //35. 创建 Results 对象,名为 results。它将有 10,000 元素。
163 Results results=new Results(ROWS);
164  
165 //36. 创建 Grouper 对象,名为 grouper。
166 Grouper grouper=new Grouper(results);
167  
168 //37.  创建 CyclicBarrier 对象,名为 barrier。此对象会等待5个线程。当此线程结束后,它会执行前面创建的 Grouper 对象。
169 CyclicBarrier barrier=new CyclicBarrier(PARTICIPANTS,grouper);
170  
171 //38. 创建5个 Searcher 对象,5个执行他们的线程,并开始这5个线程。
172 Searcher searchers[]=new Searcher[PARTICIPANTS];
173 for (int i=0; i<PARTICIPANTS; i++){
174     searchers[i]=new Searcher(i*LINES_PARTICIPANT, (i*LINES_ PARTICIPANT)+LINES_PARTICIPANT, mock, results, 5,barrier);
175     Thread thread=new Thread(searchers[i]);
176     thread.start();
177 }
178 System.out.printf("Main: The main thread has finished.\n");

它是怎么工作的…

以下裁图是例子的运行结果:

例子中解决的问题比较简单。我们有一个很大的随机的整数矩阵,然后你想知道这矩阵里面某个数字出现的次数。为了更好的执行,我们使用了 divide 和 conquer 技术。我们 divide 矩阵成5个子集,然后在每个子集里使用一个线程来查找数字。这些线程是 Searcher 类的对象。

我们使用 CyclicBarrier 对象来同步5个线程的完成,并执行 Grouper 任务处理个别结果,最后计算最终结果。

如我们之前提到的,CyclicBarrier 类有一个内部计数器控制到达同步点的线程数量。每次线程到达同步点,它调用 await() 方法告知 CyclicBarrier 对象到达同步点了。CyclicBarrier 把线程放入睡眠状态直到全部的线程都到达他们的同步点。

当全部的线程都到达他们的同步点,CyclicBarrier 对象叫醒全部正在 await() 方法中等待的线程们,然后,选择性的,为CyclicBarrier的构造函数 传递的 Runnable 对象(例子里,是 Grouper 对象)创建新的线程执行外加任务。

更多…

CyclicBarrier 类有另一个版本的 await() 方法:

  • await(long time, TimeUnit unit): 线程会一直休眠直到被中断;内部计数器到达0,或者特定的时间过去了。TimeUnit类有多种常量: DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, and SECONDS.

此类也提供了 getNumberWaiting() 方法,返回被 await() 方法阻塞的线程数,还有 getParties() 方法,返回将与CyclicBarrier同步的任务数。

重置 CyclicBarrier 对象
CyclicBarrier 类与CountDownLatch有一些共同点,但是也有一些不同。最主要的不同是,CyclicBarrier对象可以重置到它的初始状态,重新分配新的值给内部计数器,即使它已经被初始过了。

可以使用 CyclicBarrier的reset() 方法来进行重置操作。当这个方法被调用后,全部的正在await() 方法里等待的线程接收到一个 BrokenBarrierException 异常。此异常在例子中已经用打印stack trace处理了,但是在一个更复制的应用,它可以执行一些其他操作,例如重新开始执行或者在中断点恢复操作。

破坏 CyclicBarrier 对象
CyclicBarrier 对象可能处于一个特殊的状态,称为 broken。当多个线程正在 await() 方法中等待时,其中一个被中断了,此线程会收到 InterruptedException 异常,但是其他正在等待的线程将收到 BrokenBarrierException 异常,并且 CyclicBarrier 会被置于broken 状态中。

CyclicBarrier 类提供了isBroken() 方法,如果对象在 broken 状态,返回true,否则返回false。

时间: 2024-11-05 17:25:12

线程同步工具(四)在同一个点同步任务的相关文章

使用阿里巴巴开源数据库同步工具DATAX实现跨数据库同步

使用阿里巴巴开源数据库同步工具DATAX实现跨数据库同步 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.HDFS.Hive.OceanBase.HBase.OTS.ODPS 等各种异构数据源之间高效的数据同步功能. 点击进入 先请配置DataX 环境变量 Linux.Windows JDK(1.8) Python(推荐Python2.6.X) Apache Maven 3.x (Compile DataX) 下面演示dataX 配置示例:从M

Linux 文件同步工具——inotify+rsync实现实时同步

文章整理自:http://ixdba.blog.51cto.com/2895551/580280 前面我们已经讲解了如何使用rsync实现文件同步,但是rsync会存在一些缺点: 一.rsync的优点与不足 与传统的cp.tar备份方式相比,rsync具有安全性高.备份迅速.支持增量备份等优点,通过rsync可以解决对实时性要求不高的数据备份需求,例如定期的备份文件服务器数据到远端服务器,对本地磁盘定期做数据镜像等. 随着应用系统规模的不断扩大,对数据的安全性和可靠性也提出的更好的要求,rsyn

第三章-线程同步工具(引言)

章节提要: 并发地访问资源的控制 并发地访问多个副本资源的控制 等待多个并发事件 在一个相同点同步任务 并发的阶段性任务的运行 并发地阶段性任务的阶段改变的控制 在并发任务间改变数据 介绍 在第二章基本的线程同步中,我们学习了同步和critical section的内容.基本上,当多个并发任务共享一个资源时就称为同步,例如:一个对象或者一个对象的属性.访问这个资源的代码块称为:临界区. 如果机制没有使用恰当,那么可能会导致错误的结果,或者数据不一致,又或者出现异常情况.所以必须采取java语言提

线程同步工具(七)在并发任务间交换数据

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 在并发任务间交换数据 Java 并发 API 提供了一种允许2个并发任务间相互交换数据的同步应用.更具体的说,Exchanger 类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的. 这个类在遇到类似生产者和消费者问题时,是非常有用的.来一个

源码剖析AQS在几个同步工具类中的使用

感谢网友[张超盟]的投稿 1. 前言 AQS(AbstractQueuedSynchronizer)是 java.util.concurrent的基础.J.U.C中宣传的封装良好的同步工具类Semaphore.CountDownLatch.ReentrantLock.ReentrantReadWriteLock.FutureTask等虽然各自都有不同特征,但是简单看一下源码,每个类内部都包含一个如下的内部类定义: abstract static class Sync extends Abstra

c# 线程同步: 详解lock,monitor,同步事件和等待句柄以及mutex

转自 http://www.cnblogs.com/xd125/archive/2007/12/12/992406.html 最近由于在准备Collection对象培训的PPT,因为涉及到SyncRoot的属性的讲解,所以对怎样在多线程应用程序中同步资源访问做了个总结:对于引用类型和非线程安全的资源的同步处理,有四种相关处理:lock关键字,监视器(Monitor), 同步事件和等待句柄, mutex类. Lock关键字    本人愚钝,在以前编程中遇到lock的问题总是使用lock(this)

阿里巴巴开源离线同步工具 DataX3.0 介绍

一. DataX3.0概览 DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL.Oracle等).HDFS.Hive.ODPS.HBase.FTP等各种异构数据源之间稳定高效的数据同步功能. 设计理念 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源.当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步. 当前使用现状 DataX在阿里巴巴集团

Java并发编程中构建自定义同步工具_java

当Java类库没有提供适合的同步工具时,就需要构建自定义同步工具. 可阻塞状态依赖操作的结构 复制代码 代码如下: acquir lock on object state;//请求获取锁 while(precondition does not hold){//没有满足前提条件    release lock;//先释放锁    wait until precondition might hold;//等待满足前提条件    optionlly fail if interrupted or tim

Firefox书签同步工具Xmarks

前段时间Xmarks被解封,使得这个优秀的浏览器书签同步工具得以重新在国内使用,我们知道,Google Chrome浏览器自带书签同步功能,IE浏览器可以通过Dropbox实现书签同步,而对于Firefox浏览器来说,最好用的书签同步工具无疑是 Xmarks. 特点一:云存储 Xmarks是一款用于书签备份以及自动同步本地书签到服务器端的Firefox扩展,当我们使用多台电脑的Firefox浏览器的时候,只要在各个浏览器安装Xmarks扩展并登录,就可以自动同步Firefox书签,和Dropbo