线程同步工具(六)控制并发阶段性任务的改变

声明:本文是《 Java 7 Concurrency Cookbook》的第三章, 作者: Javier Fernández González 译者:郑玉婷

控制并发阶段性任务的改变

Phaser 类提供每次phaser改变阶段都会执行的方法。它是 onAdvance() 方法。它接收2个参数:当前阶段数和注册的参与者数;它返回 Boolean 值,如果phaser继续它的执行,则为 false;否则为真,即phaser结束运行并进入 termination 状态。

如果注册参与者为0,此方法的默认的实现值为真,要不然就是false。如果你扩展Phaser类并覆盖此方法,那么你可以修改它的行为。通常,当你要从一个phase到另一个,来执行一些行动时,你会对这么做感兴趣的。

在这个指南,你将学习如何控制phaser的 phase的改变,通过实现自定义版本的 Phaser类并覆盖 onAdvance() 方法来执行一些每个phase 都会改变的行动。你将要实现一个模拟测验,有些学生要完成他们的练习。全部的学生都必须完成同一个练习才能继续下一个练习。

准备

指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 打开并创建一个新的java项目。

怎么做呢

按照这些步骤来实现下面的例子::

001 package tool;
002  
003 import java.util.Date;
004 import java.util.concurrent.Phaser;
005 import java.util.concurrent.TimeUnit;
006  
007 //1.   创建一个类,名为 MyPhaser,并特别的扩展 Phaser 类。
008 public class MyPhaser extends Phaser {
009  
010     // 2. 覆盖 onAdvance() 方法。根据 phase 的属性的值,我们将调用不同的辅助方法。如果 phase 等于 0,调用
011     // studentsArrived() 方法;又如果 phase 等于 1,调用 finishFirstExercise() 方法;又如果 phase
012     // 等于 2,调用 finishSecondExercise() 方法;再如果 phase 等于 3,调用 finishExam()
013     // 方法。否则,返回真值,表示phaser已经终结。
014     @Override
015     protected boolean onAdvance(int phase, int registeredParties) {
016         switch (phase) {
017         case 0:
018             return studentsArrived();
019         case 1:
020             return finishFirstExercise();
021         case 2:
022             return finishSecondExercise();
023         case 3:
024             return finishExam();
025         default:
026             return true;
027         }
028     }
029  
030     // 3. 实现辅助方法 studentsArrived()。它在操控台写2条信息,并返回false值来表明phaser将继续执行。
031     private boolean studentsArrived() {
032         System.out.printf("Phaser: The exam are going to start. The students are ready.\n");
033         System.out.printf("Phaser: We have %d students.\n",
034                 getRegisteredParties());
035         return false;
036     }
037  
038     // 4. 实现辅助方法 finishFirstExercise()。它在操控台写2条信息,并返回false值来表明phaser将继续执行。
039     private boolean finishFirstExercise() {
040         System.out.printf("Phaser: All the students have finished the first exercise.\n");
041         System.out.printf("Phaser: It's time for the second one.\n");
042         return false;
043     }
044  
045     // 5. 实现辅助方法 finishSecondExercise()。它在操控台写2条信息,并返回false值来表明phaser将继续执行。
046     private boolean finishSecondExercise() {
047         System.out.printf("Phaser: All the students have finished the second exercise.\n");
048         System.out.printf("Phaser: It's time for the third one.\n");
049         return false;
050     }
051  
052     // 6. 实现辅助方法 finishExam()。它在操控台写2条信息,并返回false值来表明phaser将继续执行。
053     private boolean finishExam() {
054         System.out.printf("Phaser: All the students have finished the exam.\n");
055         System.out.printf("Phaser: Thank you for your time.\n");
056         return true;
057     }
058  
059     // 7. 创建一个类,名为 Student,并一定实现 Runnable 接口。这个类将模拟测验的学生。
060     public class Student implements Runnable {
061  
062         // 8. 声明 a Phaser 对象,名为 phaser.
063         private Phaser phaser;
064  
065         // 9. 实现类的构造函数,初始 Phaser 对象。
066         public Student(Phaser phaser) {
067             this.phaser = phaser;
068         }
069  
070         // 10. 实现 run() 方法,模拟真实测验。
071         @Override
072         public void run() {
073  
074             // 11. 首先,方法写一条信息到操控台表明学生到达考场并调用 phaser 的 arriveAndAwaitAdvance()
075             // 方法来等待其他线程们。
076             System.out.printf("%s: Has arrived to do the exam. %s\n", Thread
077                     .currentThread().getName(), new Date());
078             phaser.arriveAndAwaitAdvance();
079  
080             // 12. 然后,写信息到操控台,调用私有 doExercise1() 方法模拟第一场测验,写另一条信息到操控台并调用 phaser
081             // 的 arriveAndAwaitAdvance() 方法来等待其他学生结束第一场测验。
082             System.out.printf("%s: Is going to do the first exercise. %s\n",
083                     Thread.currentThread().getName(), new Date());
084             doExercise1();
085             System.out.printf("%s: Has done the first exercise. %s\n", Thread
086                     .currentThread().getName(), new Date());
087             phaser.arriveAndAwaitAdvance();
088  
089             // 13. 为第二场和第三场实现相同的代码。
090             System.out.printf("%s: Is going to do the second exercise.%s\n",
091                     Thread.currentThread().getName(), new Date());
092             doExercise2();
093             System.out.printf("%s: Has done the second exercise. %s\n", Thread
094                     .currentThread().getName(), new Date());
095             phaser.arriveAndAwaitAdvance();
096             System.out.printf("%s: Is going to do the third exercise. %s\n",
097                     Thread.currentThread().getName(), new Date());
098             doExercise3();
099             System.out.printf("%s: Has finished the exam. %s\n", Thread
100                     .currentThread().getName(), new Date());
101             phaser.arriveAndAwaitAdvance();
102         }
103  
104         // 14. 实现辅助方法 doExercise1()。此方法让线程随机休眠一段时间。
105         private void doExercise1() {
106             try {
107                 long duration = (long) (Math.random() * 10);
108                 TimeUnit.SECONDS.sleep(duration);
109             } catch (InterruptedException e) {
110                 e.printStackTrace();
111             }
112         }
113  
114         // 15. 实现辅助方法 doExercise2()。此方法让线程随机休眠一段时间。
115         private void doExercise2() {
116             try {
117                 long duration = (long) (Math.random() * 10);
118                 TimeUnit.SECONDS.sleep(duration);
119             } catch (InterruptedException e) {
120                 e.printStackTrace();
121             }
122         }
123  
124         // 16. 实现辅助方法 doExercise3()。此方法让线程随机休眠一段时间。
125         private void doExercise3() {
126             try {
127                 long duration = (long) (Math.random() * 10);
128                 TimeUnit.SECONDS.sleep(duration);
129             } catch (InterruptedException e) {
130                 e.printStackTrace();
131             }
132         }
133     }
134 }

实现例子的main类,创建名为Main的类并添加main() 方法。

01 package tool;
02  
03 import tool.MyPhaser.Student;
04  
05 //17.  实现例子的main类,创建名为Main的类并添加main() 方法。
06 public class Main {
07  
08     public static void main(String[] args) {
09  
10         // 18. 创建 MyPhaser对象。
11         MyPhaser phaser = new MyPhaser();
12  
13         // 19. 创建5个 Student 对象并使用register()方法在phaser中注册他们。
14         MyPhaser.Student students[] = new Student[5];
15         for (int i = 0; i < students.length; i++) {
16             students[i] = phaser.new Student(phaser);
17             phaser.register();
18         }
19  
20         // 20. 创建5个线程来运行students并开始它们。
21         Thread threads[] = new Thread[students.length];
22         for (int i = 0; i < students.length; i++) {
23             threads[i] = new Thread(students[i], "Student " + i);
24             threads[i].start();
25         }
26  
27         // 21. 等待5个线程的终结。
28         for (int i = 0; i < threads.length; i++) {
29             try {
30                 threads[i].join();
31             } catch (InterruptedException e) {
32                 e.printStackTrace();
33             }
34         }
35  
36         // 22. 调用isTerminated()方法来写一条信息表明phaser是在termination状态。
37         System.out.printf("Main: The phaser has finished: %s.\n",
38                 phaser.isTerminated());
39     }
40 }

它是怎么工作的…

这个练习模拟了有3个测验的真实测试。全部的学生必须都完成同一个测试才能开始下一个测试。为了实现这个必须使用同步,我们使用了Phaser类,但是你实现了你自己的phaser通过扩展原来的类,并覆盖onAdvance() 方法.

在阶段改变之前和在唤醒 arriveAndAwaitAdvance() 方法中休眠的全部线程们之前,此方法被 phaser 调用。这个方法接收当前阶段数作为参数,0是第一个phase ,还有注册的参与者数。最有用的参数是actual phase。如果你要基于不同的当前阶段执行不同的操作,那么你必须使用选择性结构(if/else 或 switch)来选择你想执行的操作。例子里,我们使用了 switch 结构来为每个phase的改变选择不同的方法。

onAdvance() 方法返回 Boolean 值表明 phaser 终结与否。如果返回 false 值,表示它还没有终结,那么线程将继续执行其他phases。如果phaser 返回真值,那么phaser将叫醒全部待定的线程们,并且转移phaser到terminated 状态,所以之后的任何对phaser的方法的调用都会被立刻返回,还有isTerminated() 方法将返回真值。

在核心类,当你创建 MyPhaser 对象,在phaser中你不用表示参与者的数量。你为每个 Student 对象调用了 register() 方法创建了phaser的参与者的注册。这个调用不会在Student 对象或者执行它的线程与phaser之间这个建立任何关系。 说真的,phaser的参与者数就是个数字而已。phaser与参与者之间没有任何关系。

下面的裁图展示了例子的执行结果:

你可以发现学生们结束第一个练习的时间是不同的。当全部都结束练习时,phaser 调用onAdvance() 方法写信息到操控台,接着全部的学生在同一时间开始第二场测试。

时间: 2024-09-17 03:15:27

线程同步工具(六)控制并发阶段性任务的改变的相关文章

线程同步工具(五)运行阶段性并发任务

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 运行阶段性并发任务 Java 并发 API 提供的一个非常复杂且强大的功能是,能够使用Phaser类运行阶段性的并发任务.当某些并发任务是分成多个步骤来执行时,那么此机制是非常有用的.Phaser类提供的机制是在每个步骤的结尾同步线程,所以除非全部线程完成第一个步骤,否则线程不能开始进行第二步. 相对于其他同步应用,我们必须初始化Phas

线程同步工具(一)控制并发访问资源

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷     控制并发访问资源 这个指南,你将学习怎样使用Java语言提供的Semaphore机制.Semaphore是一个控制访问多个共享资源的计数器. Semaphore的内容是由Edsger Dijkstra引入并在 THEOS操作系统上第一次使用. 当一个线程想要访问某个共享资源,首先,它必须获得semaphore.如果semaphor

线程同步工具(二)控制并发访问多个资源

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 控制并发访问多个资源 在并发访问资源的控制中,你学习了信号量(semaphores)的基本知识. 在上个指南,你实现了使用binary semaphores的例子.那种semaphores是用来保护访问一个共享资源的,或者说一个代码片段每次只能被一个线程执行.但是semaphores也可以用来保护多个资源的副本,也就是说当你有一个代码片段

线程同步工具(七)在并发任务间交换数据

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 在并发任务间交换数据 Java 并发 API 提供了一种允许2个并发任务间相互交换数据的同步应用.更具体的说,Exchanger 类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的. 这个类在遇到类似生产者和消费者问题时,是非常有用的.来一个

第三章-线程同步工具(引言)

章节提要: 并发地访问资源的控制 并发地访问多个副本资源的控制 等待多个并发事件 在一个相同点同步任务 并发的阶段性任务的运行 并发地阶段性任务的阶段改变的控制 在并发任务间改变数据 介绍 在第二章基本的线程同步中,我们学习了同步和critical section的内容.基本上,当多个并发任务共享一个资源时就称为同步,例如:一个对象或者一个对象的属性.访问这个资源的代码块称为:临界区. 如果机制没有使用恰当,那么可能会导致错误的结果,或者数据不一致,又或者出现异常情况.所以必须采取java语言提

基本线程同步(六)使用读/写锁同步数据访问

使用读/写锁同步数据访问 锁所提供的最重要的改进之一就是ReadWriteLock接口和唯一 一个实现它的ReentrantReadWriteLock类.这个类提供两把锁,一把用于读操作和一把用于写操作.同时可以有多个线程执行读操作,但只有一个线程可以执行写操作.当一个线程正在执行一个写操作,不可能有任何线程执行读操作. 在这个指南中,你将会学习如何使用ReadWriteLock接口实现一个程序,使用它来控制访问一个存储两个产品价格的对象. 准备工作- 你应该事先阅读使用Lock同步代码块的指南

线程同步工具(三)等待多个并发事件完成

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 等待多个并发事件完成 Java并发API提供这样的类,它允许1个或者多个线程一直等待,直到一组操作执行完成. 这个类就是CountDownLatch类.它初始一个整数值,此值是线程将要等待的操作数.当某个线程为了想要执行这些操作而等待时, 它要使用 await()方法.此方法让线程进入休眠直到操作完成. 当某个操作结束,它使用countD

线程同步工具(四)在同一个点同步任务

声明:本文是< Java 7 Concurrency Cookbook>的第三章, 作者: Javier Fernández González 译者:郑玉婷 在同一个点同步任务 Java 并发 API 提供了可以允许2个或多个线程在在一个确定点的同步应用.它是 CyclicBarrier 类.此类与在此章节的等待多个并发事件完成指南中的 CountDownLatch 类相似,但是它有一些特殊性让它成为更强大的类. CyclicBarrier 类有一个整数初始值,此值表示将在同一点同步的线程数量

四种进程或线程同步互斥的控制方法

进程|控制 很想整理一下自己对进程线程同步互斥的理解.正巧周六一个刚刚回到学校的同学请客吃饭.在吃饭的过程中,有两个同学,为了一个问题争论的面红耳赤.一个认为.Net下的进程线程控制模型更加合理.一个认为Java下的线程池策略比.Net的好.大家的话题一下转到了进程线程同步互斥的控制问题上.回到家,想了想就写了这个东东.  现在流行的进程线程同步互斥的控制机制,其实是由最原始最基本的4种方法实现的.由这4种方法组合优化就有了.Net和Java下灵活多变的,编程简便的线程进程控制手段.  这4种方