线程同步工具(五)运行阶段性并发任务

声明:本文是《 Java 7 Concurrency Cookbook》的第三章, 作者: Javier Fernández González 译者:郑玉婷

运行阶段性并发任务

Java 并发 API 提供的一个非常复杂且强大的功能是,能够使用Phaser类运行阶段性的并发任务。当某些并发任务是分成多个步骤来执行时,那么此机制是非常有用的。Phaser类提供的机制是在每个步骤的结尾同步线程,所以除非全部线程完成第一个步骤,否则线程不能开始进行第二步。

相对于其他同步应用,我们必须初始化Phaser类与这次同步操作有关的任务数,我们可以通过增加或者减少来不断的改变这个数。

在这个指南,你将学习如果使用Phaser类来同步3个并发任务。这3个任务会在3个不同的文件夹和它们的子文件夹中搜索扩展名是.log并在24小时内修改过的文件。这个任务被分成3个步骤:

1. 在指定的文件夹和子文件夹中获得文件扩展名为.log的文件列表。
2. 过滤第一步的列表中修改超过24小时的文件。
3. 在操控台打印结果。

在步骤1和步骤2的结尾我们要检查列表是否为空。如果为空,那么线程直接结束运行并从phaser类中淘汰。

准备

指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 打开并创建一个新的java任务。

怎么做呢

按照这些步骤来实现下面的例子:

001 import java.io.File;
002 import java.util.ArrayList;
003 import java.util.Date;
004 import java.util.List;
005 import java.util.concurrent.Phaser;
006 import java.util.concurrent.TimeUnit;
007  
008 //1.   创建一个类名为FileSearch并一定实现Runnable 接口。这个类实现的操作是在文件夹和其子文件夹中搜索确定的扩展名并在24小时内修改的文件。
009 public class FileSearch implements Runnable {
010  
011     // 2. 声明一个私有 String 属性来储存搜索开始的时候的文件夹。
012     private String initPath;
013  
014     // 3. 声明另一个私有 String 属性来储存我们要寻找的文件的扩展名。
015     private String end;
016  
017     // 4. 声明一个私有 List 属性来储存我们找到的符合条件的文件的路径。
018     private List<String> results;
019  
020     // 5. 最后,声明一个私有 Phaser 属性来控制任务的不同phaser的同步。
021     private Phaser phaser;
022  
023     // 6. 实现类的构造函数,初始化类的属性们。它接收初始文件夹的路径,文件的扩展名,和phaser 作为参数。
024     public FileSearch(String initPath, String end, Phaser phaser) {
025         this.initPath = initPath;
026         this.end = end;
027         this.phaser = phaser;
028         results = new ArrayList<String>();
029     }
030  
031     // 7. 现在,你必须实现一些要给run() 方法用的辅助方法。第一个是 directoryProcess()
032     // 方法。它接收File对象作为参数并处理全部的文件和子文件夹。对于每个文件夹,此方法会递归调用并传递文件夹作为参数。对于每个文件,此方法会调用fileProcess()
033     // 方法。
034     private void directoryProcess(File file) {
035  
036         File list[] = file.listFiles();
037         if (list != null) {
038             for (int i = 0; i < list.length; i++) {
039  
040                 if (list[i].isDirectory()) {
041                     directoryProcess(list[i]);
042                 } else {
043                     fileProcess(list[i]);
044                 }
045             }
046         }
047     }
048  
049     // 8. 现在,实现 fileProcess() 方法。它接收 File
050     // 对象作为参数并检查它的扩展名是否是我们正在查找的。如果是,此方法会把文件的绝对路径写入结果列表内。
051     private void fileProcess(File file) {
052         if (file.getName().endsWith(end)) {
053             results.add(file.getAbsolutePath());
054         }
055     }
056  
057     // 9. 现在,实现 filterResults()
058     // 方法。不接收任何参数。它过滤在第一阶段获得的文件列表,并删除修改超过24小时的文件。首先,创建一个新的空list并获得当前时间。
059     private void filterResults() {
060         List<String> newResults = new ArrayList<String>();
061         long actualDate = new Date().getTime();
062  
063         // 10. 然后,浏览结果list里的所有元素。对于每个路径,为文件创建File对象 go through all the elements
064         // of the results list. For each path in the list of results, create a
065         // File object for that file and get the last modified date for it.
066         for (int i = 0; i < results.size(); i++) {
067             File file = new File(results.get(i));
068             long fileDate = file.lastModified();
069  
070             // 11. 然后, 对比与真实日期对比,如果相差小于一天,把文件的路径加入到新的结果列表。
071             if (actualDate - fileDate < TimeUnit.MILLISECONDS.convert(1,
072                     TimeUnit.DAYS)) {
073                 newResults.add(results.get(i));
074             }
075         }
076  
077         // 12. 最后,把旧的结果改为新的。
078         results = newResults;
079     }
080  
081     // 13. 现在,实现 checkResults() 方法。此方法在第一个和第二个phase的结尾被调用,并检查结果是否为空。此方法不接收任何参数。
082     private boolean checkResults() {
083  
084         // 14. 首先,检查结果List的大小。如果为0,对象写信息到操控台表明情况,然后调用Phaser对象的
085         // arriveAndDeregister() 方法通知此线程已经结束actual phase,并离开phased操作。
086         if (results.isEmpty()) {
087             System.out.printf("%s: Phase %d: 0 results.\n", Thread
088                     .currentThread().getName(), phaser.getPhase());
089             System.out.printf("%s: Phase %d: End.\n", Thread.currentThread()
090                     .getName(), phaser.getPhase());
091             phaser.arriveAndDeregister();
092             return false;
093  
094             // 15. 另一种情况,如果结果list有元素,那么对象写信息到操控台表明情况,调用 Phaser对象懂得
095             // arriveAndAwaitAdvance() 方法并通知 actual phase,它会被阻塞直到phased
096             // 操作的全部参与线程结束actual phase。
097  
098         } else {
099             System.out.printf("%s: Phase %d: %d results.\n", Thread
100                     .currentThread().getName(), phaser.getPhase(), results
101                     .size());
102             phaser.arriveAndAwaitAdvance();
103             return true;
104         }
105     }
106  
107     // 16. 最好一个辅助方法是 showInfo() 方法,打印results list 的元素到操控台。
108     private void showInfo() {
109         for (int i = 0; i < results.size(); i++) {
110             File file = new File(results.get(i));
111             System.out.printf("%s: %s\n", Thread.currentThread().getName(),
112                     file.getAbsolutePath());
113         }
114         phaser.arriveAndAwaitAdvance();
115     }
116  
117     // 17. 现在,来实现 run() 方法,使用之前描述的辅助方法来执行,并使用Phaser对象控制phases间的改变。首先,调用phaser对象的
118     // arriveAndAwaitAdvance() 方法。直到使用线程被创建完成,搜索行为才会开始。
119     @Override
120     public void run() {
121  
122         phaser.arriveAndAwaitAdvance();
123  
124         // 18. 然后,写信息到操控台表明搜索任务开始。
125  
126         System.out.printf("%s: Starting.\n", Thread.currentThread().getName());
127  
128         // 19. 查看 initPath 属性储存的文件夹名字并使用 directoryProcess()
129         // 方法在文件夹和其子文件夹内查找带特殊扩展名的文件。
130         File file = new File(initPath);
131         if (file.isDirectory()) {
132             directoryProcess(file);
133         }
134  
135         // 20. 使用 checkResults() 方法检查是否有结果。如果没有任何结果,结束线程的执行并返回keyword。
136         if (!checkResults()) {
137             return;
138         }
139  
140         // 21. 使用filterResults() 方法过滤结果list。
141         filterResults();
142  
143         // 22. 再次使用checkResults() 方法检查是否有结果。如果没有,结束线程的执行并返回keyword。
144         if (!checkResults()) {
145             return;
146         }
147  
148         // 23. 使用 showInfo() 方法打印最终的结果list到操控台,撤销线程的登记,并打印信息表明线程的终结。
149         showInfo();
150         phaser.arriveAndDeregister();
151         System.out.printf("%s: Work completed.\n", Thread.currentThread()
152                 .getName());
153  
154     }
155 }
156  
157 // 24. 现在,实现例子的main 类通过创建类名为 Main 并为其添加 main() 方法。
158  
159 class Main {
160  
161     public static void main(String[] args) {
162  
163         // 25. 创建 含3个参与者的 Phaser 对象。
164         Phaser phaser = new Phaser(3);
165  
166         // 26. 创建3个 FileSearch 对象,每个在不同的初始文件夹里搜索.log扩展名的文件。
167         FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
168         FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
169         FileSearch documents = new FileSearch("C:\\Documents And Settings",
170                 "log", phaser);
171  
172         // 27. 创建并开始一个线程来执行第一个 FileSearch 对象。
173         Thread systemThread = new Thread(system, "System");
174         systemThread.start();
175  
176         // 28. 创建并开始一个线程来执行第二个 FileSearch 对象。
177         Thread appsThread = new Thread(apps, "Apps");
178         appsThread.start();
179  
180         // 29. 创建并开始一个线程来执行第三个 FileSearch 对象。
181         Thread documentsThread = new Thread(documents, "Documents");
182         documentsThread.start();
183  
184         // 30. 等待3个线程们的终结。
185  
186         try {
187             systemThread.join();
188             appsThread.join();
189             documentsThread.join();
190         } catch (InterruptedException e) {
191             e.printStackTrace();
192         }
193  
194         // 31. 使用isFinalized()方法把Phaser对象的结束标志值写入操控台。
195         System.out.println("Terminated: " + phaser.isTerminated());
196     }
197 }

它是怎么工作的…

这程序开始创建的 Phaser 对象是用来在每个phase的末端控制线程的同步。Phaser的构造函数接收参与者的数量作为参数。在这里,Phaser有3个参与者。这个数向Phaser表示 Phaser改变phase之前执行 arriveAndAwaitAdvance() 方法的线程数,并叫醒正在休眠的线程。

一旦Phaser被创建,我们运行3个线程分别执行3个不同的FileSearch对象。

在例子里,我们使用 Windows operating system 的路径。如果你使用的是其他操作系统,那么修改成适应你的环境的路径。

FileSearch对象的 run() 方法中的第一个指令是调用Phaser对象的 arriveAndAwaitAdvance() 方法。像之前提到的,Phaser知道我们要同步的线程的数量。当某个线程调用此方法,Phaser减少终结actual phase的线程数,并让这个线程进入休眠 直到全部其余线程结束phase。在run() 方法前面调用此方法,没有任何 FileSearch 线程可以开始他们的工作,直到全部线程被创建。

在phase 1 和 phase 2 的末端,我们检查phase 是否生成有元素的结果list,或者它没有生成结果且list为空。在第一个情况,checkResults() 方法之前提的调用 arriveAndAwaitAdvance()。在第二个情况,如果list为空,那就没有必要让线程继续了,就直接返回吧。但是你必须通知phaser,将会少一个参与者。为了这个,我们使用arriveAndDeregister()。它通知phaser线程结束了actual phase, 但是它将不会继续参见后面的phases,所以请phaser不要再等待它了。

在phase3的结尾实现了 showInfo() 方法, 调用了 phaser 的 arriveAndAwaitAdvance() 方法。这个调用,保证了全部线程在同一时间结束。当此方法结束执行,有一个调用phaser的arriveAndDeregister() 方法。这个调用,我们撤销了对phaser线程的注册,所以当全部线程结束时,phaser 有0个参与者。

最后,main() 方法等待3个线程的完成并调用phaser的 isTerminated() 方法。当phaser 有0个参与者时,它进入termination状态,此状态与此调用将会打印true到操控台。

Phaser 对象可能是在这2中状态:

  1. Active: 当 Phaser 接受新的参与者注册,它进入这个状态,并且在每个phase的末端同步。 在此状态,Phaser像在这个指南里解释的那样工作。此状态不在Java 并发 API中。
  2. Termination: 默认状态,当Phaser里全部的参与者都取消注册,它进入这个状态,所以这时 Phaser 有0个参与者。更具体的说,当onAdvance() 方法返回真值时,Phaser 是在这个状态里。如果你覆盖那个方法,你可以改变它的默认行为。当 Phaser 在这个状态,同步方法 arriveAndAwaitAdvance()会 立刻返回,不会做任何同步。

Phaser 类的一个显著特点是你不需要控制任何与phaser相关的方法的异常。不像其他同步应用,线程们在phaser休眠不会响应任何中断也不会抛出 InterruptedException 异常。只有一个异常会在下面的‘更多’里解释。

下面的裁图是例子的运行结果:

它展示了前2个phases的执行。你可以发现Apps线程在phase 2 结束它的运行由于list 为空。当你执行例子,你会发现一些线程比其他的线程更快结束phase,但是他们必须等待其他全部结束然后才能继续。

更多…

The Phaser类还提供了其他相关方法来改变phase。他们是:

  • arrive(): 此方法示意phaser某个参与者已经结束actual phase了,但是他应该等待其他的参与者才能继续执行。小心使用此法,因为它并不能与其他线程同步。
  • awaitAdvance(int phase): 如果我们传递的参数值等于phaser的actual phase,此方法让当前线程进入睡眠直到phaser的全部参与者结束当前的phase。如果参数值与phaser 的 actual phase不等,那么立刻返回。
  • awaitAdvanceInterruptibly(int phaser): 此方法等同与之前的方法,只是在线程正在此方法中休眠而被中断时候,它会抛出InterruptedException 异常。

Phaser的参与者的注册
当你创建一个 Phaser 对象,你表明了参与者的数量。但是Phaser类还有2种方法来增加参与者的数量。他们是:

  • register(): 此方法为Phaser添加一个新的参与者。这个新加入者会被认为是还未到达 actual phase.
  • bulkRegister(int Parties): 此方法为Phaser添加一个特定数量的参与者。这些新加入的参与都会被认为是还未到达 actual phase.

Phaser类提供的唯一一个减少参与者数量的方法是arriveAndDeregister() 方法,它通知phaser线程已经结束了actual phase,而且他不想继续phased的操作了。

强制终止 Phaser
当phaser有0个参与者,它进入一个称为Termination的状态。Phaser 类提供 forceTermination() 来改变phaser的状态,让它直接进入Termination 状态,不在乎已经在phaser中注册的参与者的数量。此机制可能会很有用在一个参与者出现异常的情况下来强制结束phaser.

当phaser在 Termination 状态, awaitAdvance() 和 arriveAndAwaitAdvance() 方法立刻返回一个负值,而不是一般情况下的正值如果你知道你的phaser可能终止了,那么你可以用这些方法来确认他是否真的终止了。

参见

第八章,测试并发应用:检测Phaser 

时间: 2024-09-18 08:08:04

线程同步工具(五)运行阶段性并发任务的相关文章

线程同步工具(七)在并发任务间交换数据

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 在并发任务间交换数据 Java 并发 API 提供了一种允许2个并发任务间相互交换数据的同步应用.更具体的说,Exchanger 类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的. 这个类在遇到类似生产者和消费者问题时,是非常有用的.来一个

第三章-线程同步工具(引言)

章节提要: 并发地访问资源的控制 并发地访问多个副本资源的控制 等待多个并发事件 在一个相同点同步任务 并发的阶段性任务的运行 并发地阶段性任务的阶段改变的控制 在并发任务间改变数据 介绍 在第二章基本的线程同步中,我们学习了同步和critical section的内容.基本上,当多个并发任务共享一个资源时就称为同步,例如:一个对象或者一个对象的属性.访问这个资源的代码块称为:临界区. 如果机制没有使用恰当,那么可能会导致错误的结果,或者数据不一致,又或者出现异常情况.所以必须采取java语言提

基本线程同步(五)使用Lock同步代码块

声明:本文是< Java 7 Concurrency Cookbook >的第二章,作者: Javier Fernández González     译者:许巧辉 校对:方腾飞 使用Lock同步代码块 Java提供另外的机制用来同步代码块.它比synchronized关键字更加强大.灵活.它是基于Lock接口和实现它的类(如ReentrantLock).这种机制有如下优势: 它允许以一种更灵活的方式来构建synchronized块.使用synchronized关键字,你必须以结构化方式得到释

银行取款[多线程]{未进行线程同步}(junit不适合多线程并发单元测试)

        由于计算机多任务.多进程.多线程的支持,使得计算机资源的服务效率提高,服务器对请求的也使用线程来相应,所有,代码中涉及到同时对共享数据的操作,将在 多线程环境中操作数据,导致数据安全问题.      经典例子:老婆(朱丽叶)老公(罗密欧),使用银行卡和存折,或者网银等,同时对同一账户操作的安全问题.      如果要保证多线程下数据安全,就要实现线程同步(例如:一间小厕所,就得有一个锁,保证同一时间为一个人服务).其他文章讲: 此处用多线程实现,同时取款的模拟实现,未进行线程同步

线程同步工具(六)控制并发阶段性任务的改变

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 控制并发阶段性任务的改变 Phaser 类提供每次phaser改变阶段都会执行的方法.它是 onAdvance() 方法.它接收2个参数:当前阶段数和注册的参与者数:它返回 Boolean 值,如果phaser继续它的执行,则为 false:否则为真,即phaser结束运行并进入 termination 状态. 如果注册参与者为0,此方法

线程同步工具(一)控制并发访问资源

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷     控制并发访问资源 这个指南,你将学习怎样使用Java语言提供的Semaphore机制.Semaphore是一个控制访问多个共享资源的计数器. Semaphore的内容是由Edsger Dijkstra引入并在 THEOS操作系统上第一次使用. 当一个线程想要访问某个共享资源,首先,它必须获得semaphore.如果semaphor

线程同步工具(三)等待多个并发事件完成

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 等待多个并发事件完成 Java并发API提供这样的类,它允许1个或者多个线程一直等待,直到一组操作执行完成. 这个类就是CountDownLatch类.它初始一个整数值,此值是线程将要等待的操作数.当某个线程为了想要执行这些操作而等待时, 它要使用 await()方法.此方法让线程进入休眠直到操作完成. 当某个操作结束,它使用countD

线程同步工具(二)控制并发访问多个资源

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 控制并发访问多个资源 在并发访问资源的控制中,你学习了信号量(semaphores)的基本知识. 在上个指南,你实现了使用binary semaphores的例子.那种semaphores是用来保护访问一个共享资源的,或者说一个代码片段每次只能被一个线程执行.但是semaphores也可以用来保护多个资源的副本,也就是说当你有一个代码片段

线程同步工具(四)在同一个点同步任务

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 在同一个点同步任务 Java 并发 API 提供了可以允许2个或多个线程在在一个确定点的同步应用.它是 CyclicBarrier 类.此类与在此章节的等待多个并发事件完成指南中的 CountDownLatch 类相似,但是它有一些特殊性让它成为更强大的类. CyclicBarrier 类有一个整数初始值,此值表示将在同一点同步的线程数量