Fork/Join框架(六)取消任务

取消任务

当你在一个ForkJoinPool类中执行ForkJoinTask对象,在它们开始执行之前,你可以取消执行它们。ForkJoinTask类提供cancel()方法用于这个目的。当你想要取消一个任务时,有一些点你必须考虑一下,这些点如下:

  • ForkJoinPool类并没有提供任何方法来取消正在池中运行或等待的所有任务。
  • 当你取消一个任务时,你不能取消一个已经执行的任务。

在这个指南中,你将实现取消ForkJoinTask对象的例子。你将查找数在数组中的位置。第一个找到这个数的任务将取消剩下的任务(未找到这个数的任务)。由于Fork/Join框架并没有提供这种功能,所以,你将实现一个辅助类来做这个取消功能。

准备工作…

这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。

如何做…

按以下步骤来实现这个例子:

1.创建ArrayGenerator类。这个类将产生一组随机的、指定大小的整数数字。实现generateArray()方法。它将产生一组数字,它接收数组大小作为参数。

1 public class ArrayGenerator {
2 public int[] generateArray(int size) {
3 int array[]=new int[size];
4 Random random=new Random();
5 for (int i=0; i<size; i++){
6 array[i]=random.nextInt(10);
7 }
8 return array;
9 }

2.创建一个TaskManager类。我们将使用这个类来存储在ForkJoinPool中执行的所有任务。由于ForkJoinPool和ForkJoinTask类的局限性,你将使用这个类来取消ForkJoinPool类的所有任务。

1 public class TaskManager {

3.声明一个对象参数化为ForkJoinTask类型的数列,其中ForkJoinTask类参数化为Integer类型。

1 private List<ForkJoinTask<Integer>> tasks;

4.实现这个类的构造器,它初始化任务数列。

1 public TaskManager(){
2 tasks=new ArrayList<>();
3 }

5.实现addTask()方法。它添加ForkJoinTask对象到任务数列。

1 public void addTask(ForkJoinTask<Integer> task){
2 tasks.add(task);
3 }

6.实现cancelTasks()方法。它将使用cancel()方法取消在数列中的所有ForkJoinTask对象。它接收一个想要取消剩余任务的ForkJoinTask对象作为参数。这个方法取消所有任务。

1 public void cancelTasks(ForkJoinTask<Integer> cancelTask){
2 for (ForkJoinTask<Integer> task :tasks) {
3 if (task!=cancelTask) {
4 task.cancel(true);
5 ((SearchNumberTask)task).writeCancelMessage();
6 }
7 }
8 }

7.实现SearchNumberTask类,指定它继承参数化为Integer类型的RecursiveTask类。这个类将查找在整数数组的元素块中的数。

1 public class SearchNumberTask extends RecursiveTask<Integer> {

8.声明一个私有的、int类型的数字数组。

1 private int numbers[];

9.声明两个私有的、int类型的属性start和end。这些属性将决定任务要处理的数组的元素。

1 private int start, end;

10.声明一个私有的、int类型的属性number,它将存储你将要查找的数。

1 private int number;

11.声明一个私有的、TaskManager类型的属性manager。你将使用这个对象来取消所有任务。

1 private TaskManager manager;

12.声明一个私有的、int类型的常量并初始化它为值-1。当任务没有找到这个数时,它将作为任务的返回值。

1 private final static int NOT_FOUND=-1;

13.实现这个类的构造器来初始化它的属性。

1 public Task(int numbers[], int start, int end, int number,
2 TaskManager manager){
3 this.numbers=numbers;
4 this.start=start;
5 this.end=end;
6 this.number=number;
7 this.manager=manager;
8 }

14.实现compute()方法。写入一条信息(start和end属性值)到控制台表明这个方法的开始。

1 @Override
2 protected Integer compute() {
3 System.out.println("Task: "+start+":"+end);

15.如果start和end之差大于10(这个任务将处理超过10个元素的数组),调用launchTasks()方法,将这个任务的工作拆分成两个任务。

1 int ret;
2 if (end-start>10) {
3 ret=launchTasks();

16.否则,这个任务调用lookForNumber()方法来查找在数组块中的数。

1 } else {
2 ret=lookForNumber();
3 }

17.返回任务的结果。

1 return ret;

18.实现lookForNumber()方法。

1 private int lookForNumber() {

19.对于任务要处理的元素块中的所有元素,将你想要查找的数与存储在元素中的值进行比较。如果他们相等,写入一条信息到控制台表明这种情形,使用TaskManager对象的cancelTasks()方法来取消所有任务,并返回你已经找到的这个数对应元素的位置。

1 for (int i=start; i<end; i++){
2 if (array[i]==number) {
3 System.out.printf("Task: Number %d found in position
4 %d\n",number,i);
5 manager.cancelTasks(this);
6 return i;
7 }

20.在循环的内部,令任务睡眠1秒。

1 try {
2 TimeUnit.SECONDS.sleep(1);
3 } catch (InterruptedException e) {
4 e.printStackTrace();
5 }
6 }

21.最后,返回值-1。

1 return NOT_FOUND;
2 }

22.实现launchTasks()方法。首先,将这个任务要处理的数块分成两个部分,然后,创建两个Task对象来处理它们。

1 private int launchTasks() {
2 int mid=(start+end)/2;
3 Task task1=new Task(array,start,mid,number,manager);
4 Task task2=new Task(array,mid,end,number,manager);

23.添加这个任务到TaskManager对象中。

1 manager.addTask(task1);
2 manager.addTask(task2);

24.使用fork()方法异步执行这两个任务。

1 task1.fork();
2 task2.fork();

25.等待这个任务的结束,返回第一个任务的结果(如果它不等于1),或第二个任务的结果。

1 int returnValue;
2 returnValue=task1.join();
3 if (returnValue!=-1) {
4 return returnValue;
5 }
6 returnValue=task2.join();
7 return returnValue;

26.实现writeCancelMessage()方法,当任务取消时,写一条信息到控制台。

1 public void writeCancelMessage(){
2 System.out.printf("Task: Canceled task from %d to
3 %d",start,end);
4 }

27.实现这个例子的主类,通过创建Main类,并实现main()方法。

1 public class Main {
2 public static void main(String[] args) {

28.使用ArrayGenerator类,创建一个有1000个数字的数组。

1 ArrayGenerator generator=new ArrayGenerator();
2 int array[]=generator.generateArray(1000);

29.创建一个TaskManager对象。

1 TaskManager manager=new TaskManager();

30.使用默认的构造器创建一个ForkJoinPool对象。

1 ForkJoinPool pool=new ForkJoinPool();

31.创建一个Task对象来处理前面生成的数组。

1 Task task=new Task(array,0,1000,5,manager);

32.使用execute()方法,在池中异步执行任务。

1 pool.execute(task);

33.使用shutdown()方法关闭这个池。

1 pool.shutdown();

34.使用ForkJoinPool类的awaitTermination()方法,等待任务的结束。

1 try {
2 pool.awaitTermination(1, TimeUnit.DAYS);
3 } catch (InterruptedException e) {
4 e.printStackTrace();
5 }

35.写入一条信息到控制台,表明程序的结束。

1 System.out.printf("Main: The program has finished\n");

它是如何工作的…

ForkJoinTask提供cancel()方法,允许你取消一个还未执行的任务。这是一个非常重要的点。如果任务已经开始它的执行,那么调用cancel()方法对它没有影响。这个方法接收一个Boolean值,名为mayInterruptIfRunning的参数。这个名字可能让你觉得,如果你传入一个true值给这个方法,这个任务将被取消,即使它正在运行。

Java API文档指出,在ForkJoinTask类的默认实现中,这个属性不起作用。任务只能在它们还未开始执行时被取消。一个任务的取消不会影响到已经提到到池的(其他)任务。它们继续它们的执行。 Fork/Join框架的一个局限性是,它不允许取消在ForkJoinPool中的所有任务。为了克服这个限制,你实现了TaskManager类。它存储被提到池中的所有任务。它有一个方法取消它存储的所有任务。如果一个任务由于它正在运行或已经完成而不能被取消,cancel()方法返回false值,所以,你可以尝试取消所有任务,而不用担心可能有间接的影响。 在这个例子中,你已经实现一个任务,用来在一个数字数组中查找一个数。如Fork/Join框架所推荐的,你将问题分解成更小的子问题。你只关心这个数的出现,所以当你找到它,你取消了其他任务。 以下截图显示这个例子执行的一部分: 参见

  • 在第5章,Fork/Join框架中的创建一个Fork/Join池指南
时间: 2024-09-23 08:22:21

Fork/Join框架(六)取消任务的相关文章

Fork/Join框架(一)引言

在这个章节中,我们将覆盖: 创建一个Fork/Join池 加入任务的结果 异步运行任务 任务抛出异常 取消任务 引言 通常,当你实现一个简单的并发应用程序,你实现一些Runnable对象和相应的 Thread对象.在你的程序中,你控制这些线程的创建.执行和状态.Java 5引入了Executor和ExecutorService接口及其实现类进行了改进(比如:ThreadPoolExecutor类). 执行者框架将任务的创建与执行分离.有了它,你只要实现Runnable对象和使用Executor对

《Java 7并发编程实战手册》第五章Fork/Join框架

感谢人民邮电大学授权并发网发布此书样章,新书已上市,购买请进当当网 本章内容包含: 创建Fork/Join线程池 合并任务的结果 异步运行任务 在任务中抛出异常 取消任务 5.1 简介 通常,使用Java来开发一个简单的并发应用程序时,会创建一些Runnable对象,然后创建对应的Thread 对象来控制程序中这些线程的创建.执行以及线程的状态.自从Java 5开始引入了Executor和ExecutorService接口以及实现这两个接口的类(比如ThreadPoolExecutor)之后,使

聊聊并发(八)——Fork/Join框架介绍

本文首发于InfoQ 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果.比如计算1+2+..+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇

Fork/Join框架简介

1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务 ,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务 并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果.比如计算1+2+..+ 10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务

Java Fork/Join框架_java

Fork/Join框架是ExecutorService接口的一个实现,通过它我们可以实现多进程.Fork/Join可以用来将一个大任务递归的拆分为多个小任务,目标是充分利用所有的资源尽可能增强应用的性能. 和任何ExecutorService接口的实现一样,Fork/Join也会使用线程池来分布式的管理工作线程.Fork/Join框架的独特之处在于它使用了work-stealing(工作窃取)算法.通过这个算法,工作线程在无事可做时可以窃取其它正在繁忙的线程的任务来执行. Fork/Join框架

Fork/Join框架(三)加入任务的结果

加入任务的结果 Fork/Join框架提供了执行返回一个结果的任务的能力.这些任务的类型是实现了RecursiveTask类.这个类继承了ForkJoinTask类和实现了执行者框架提供的Future接口. 在任务中,你必须使用Java API方法推荐的结构: 1 If (problem size < size){ 2 tasks=Divide(task); 3 execute(tasks); 4 groupResults() 5 return result; 6 } else { 7 reso

定制并发类(八)自定义在 Fork/Join 框架中运行的任务

声明:本文是< Java 7 Concurrency Cookbook>的第七章, 作者: Javier Fernández González 译者:郑玉婷 自定义在 Fork/Join 框架中运行的任务 执行者框架分开了任务的创建和运行.这样,你只要实现 Runnable 对象来使用 Executor 对象.你可以发送 Runnable 任务给执行者,然后它会创建,管理,并终结必要的线程来执行这些任务. Java 7 在 Fork/Join 框架中提供了特殊的执行者.这个框架是设计用来解决那

定制并发类(七)实现ThreadFactory接口生成自定义的线程给Fork/Join框架

声明:本文是< Java 7 Concurrency Cookbook>的第七章,作者: Javier Fernández González     译者:许巧辉 实现ThreadFactory接口生成自定义的线程给Fork/Join框架 Fork/Join框架是Java7中最有趣的特征之一.它是Executor和ExecutorService接口的一个实现,允许你执行Callable和Runnable任务而不用管理这些执行线程. 这个执行者面向执行能被拆分成更小部分的任务.主要组件如下: 一

Fork/Join框架(四)异步运行任务

异步运行任务 当你在ForkJoinPool中执行ForkJoinTask时,你可以使用同步或异步方式来实现.当你使用同步方式时,提交任务给池的方法直到提交的任务完成它的执行,才会返回结果.当你使用异步方式时,提交任务给执行者的方法将立即返回,所以这个任务可以继续执行. 你应该意识到这两个方法有很大的区别,当你使用同步方法,调用这些方法(比如:invokeAll()方法)的任务将被阻塞,直到提交给池的任务完成它的执行.这允许ForkJoinPool类使用work-stealing算法,分配一个新