Fork/Join框架(四)异步运行任务

异步运行任务

当你在ForkJoinPool中执行ForkJoinTask时,你可以使用同步或异步方式来实现。当你使用同步方式时,提交任务给池的方法直到提交的任务完成它的执行,才会返回结果。当你使用异步方式时,提交任务给执行者的方法将立即返回,所以这个任务可以继续执行。

你应该意识到这两个方法有很大的区别,当你使用同步方法,调用这些方法(比如:invokeAll()方法)的任务将被阻塞,直到提交给池的任务完成它的执行。这允许ForkJoinPool类使用work-stealing算法,分配一个新的任务给正在执行睡眠任务的工作线程。反之,当你使用异步方法(比如:fork()方法),这个任务将继续它的执行,所以ForkJoinPool类不能使用work-stealing算法来提高应用程序的性能。在这种情况下,只有当你调用join()或get()方法来等待任务的完成时,ForkJoinPool才能使用work-stealing算法。

在这个指南中,你将学习如何使用ForkJoinPool和ForkJoinTask类提供的异步方法来管理任务。你将实现一个程序,在一个文件夹及其子文件夹内查找确定扩展名的文件。你将实现ForkJoinTask类来处理文件夹的内容。对于文件夹里的每个子文件夹,它将以异步的方式提交一个新的任务给ForkJoinPool类。对于文件夹里的每个文件,任务将检查文件的扩展名,如果它被处理,并把它添加到结果列表。

如何做…

按以下步骤来实现这个例子:

1.创建FolderProcessor类,指定它继承RecursiveTask类,并参数化为List<String>类型。

1 public class FolderProcessor extends RecursiveTask<List<String>> {

2.声明这个类的序列号版本UID。这个元素是必需的,因为RecursiveTask类的父类,ForkJoinTask类实现了Serializable接口。

1 private static final long serialVersionUID = 1L;

3.声明一个私有的、String类型的属性path。这个属性将存储任务将要处理的文件夹的全路径。

1 private String path;

4.声明一个私有的、String类型的属性extension。这个属性将存储任务将要查找的文件的扩展名。

1 private String extension;

5.实现这个类的构造器,初始化它的属性。

1 public FolderProcessor (String path, String extension) {
2 this.path=path;
3 this.extension=extension;
4 }

6.实现compute()方法。正如你用List<String>类型参数化RecursiveTask类,这个方法将返回这个类型的一个对象。

1 @Override
2 protected List<String> compute() {

7.声明一个String对象的数列,用来保存存储在文件夹中的文件。

1 List<String> list=new ArrayList<>();

8.声明一个FolderProcessor任务的数列,用来保存将要处理存储在文件夹内的子文件夹的子任务。

1 List<FolderProcessor> tasks=new ArrayList<>();

9.获取文件夹的内容。

1 File file=new File(path);
2 File content[] = file.listFiles();

10.对于文件夹里的每个元素,如果是子文件夹,则创建一个新的FolderProcessor对象,并使用fork()方法异步地执行它。

1 if (content != null) {
2 for (int i = 0; i < content.length; i++) {
3 if (content[i].isDirectory()) {
4 FolderProcessor task=new FolderProcessor(content[i].
5 getAbsolutePath(), extension);
6 task.fork();
7 tasks.add(task);

11.否则,使用checkFile()方法比较这个文件的扩展名和你想要查找的扩展名,如果它们相等,在前面声明的字符串数列中存储这个文件的全路径。

1 } else {
2 if (checkFile(content[i].getName())){
3 list.add(content[i].getAbsolutePath());
4 }
5 }
6 }

12.如果FolderProcessor子任务的数列超过50个元素,写入一条信息到控制台表明这种情况。

1 if (tasks.size()>50) {
2 System.out.printf("%s: %d tasks ran.\n",file.
3 getAbsolutePath(),tasks.size());
4 }

13.调用辅助方法addResultsFromTask(),将由这个任务发起的子任务返回的结果添加到文件数列中。传入参数:字符串数列和FolderProcessor子任务数列。

1 addResultsFromTasks(list,tasks);

14.返回字符串数列。

1 return list;

15.实现addResultsFromTasks()方法。对于保存在tasks数列中的每个任务,调用join()方法,这将等待任务执行的完成,并且返回任务的结果。使用addAll()方法将这个结果添加到字符串数列。

1 private void addResultsFromTasks(List<String> list,
2 List<FolderProcessor> tasks) {
3 for (FolderProcessor item: tasks) {
4 list.addAll(item.join());
5 }
6 }

16.实现checkFile()方法。这个方法将比较传入参数的文件名的结束扩展是否是你想要查找的。如果是,这个方法返回true,否则,返回false。

1 private boolean checkFile(String name) {
2 return name.endsWith(extension);
3 }

17.实现这个例子的主类,通过创建Main类,并实现main()方法。

1 public class Main {
2 public static void main(String[] args) {

18.使用默认构造器创建ForkJoinPool。

1 ForkJoinPool pool=new ForkJoinPool();

19.创建3个FolderProcessor任务。用不同的文件夹路径初始化每个任务。

1 FolderProcessor system=new FolderProcessor("C:\\Windows",
2 "log");
3 FolderProcessor apps=new
4 FolderProcessor("C:\\Program Files","log");
5 FolderProcessor documents=new FolderProcessor("C:\\Documents
6 And Settings","log");

20.在池中使用execute()方法执行这3个任务。

1 pool.execute(system);
2 pool.execute(apps);
3 pool.execute(documents);

21.将关于池每秒的状态信息写入到控制台,直到这3个任务完成它们的执行。

01 do {
02 System.out.printf("******************************************\n");
03 System.out.printf("Main: Parallelism: %d\n",pool.
04 getParallelism());
05 System.out.printf("Main: Active Threads: %d\n",pool.
06 getActiveThreadCount());
07 System.out.printf("Main: Task Count: %d\n",pool.
08 getQueuedTaskCount());
09 System.out.printf("Main: Steal Count: %d\n",pool.
10 getStealCount());
11 System.out.printf("*****************************************
12 *\n");
13 try {
14 TimeUnit.SECONDS.sleep(1);
15 } catch (InterruptedException e) {
16 e.printStackTrace();
17 }
18 } while((!system.isDone())||(!apps.isDone())||(!documents.
19 isDone()));

22.使用shutdown()方法关闭ForkJoinPool。

1 pool.shutdown();

23.将每个任务产生的结果数量写入到控制台。

1 List<String> results;
2 results=system.join();
3 System.out.printf("System: %d files found.\n",results.size());
4 results=apps.join();
5 System.out.printf("Apps: %d files found.\n",results.size());
6 results=documents.join();
7 System.out.printf("Documents: %d files found.\n",results.
8 size());

它是如何工作的…

以下截图显示了这个例子执行的一部分:

这个例子的关键是FolderProcessor类。每个任务处理文件夹的内容。如你所知,这个内容有以下两种元素:

  • 文件
  • 其他文件夹

如果任务找到一个文件夹,它创建另一个Task对象来处理这个文件夹,并使用fork()方法把它(Task对象)提交给池。这个方法提交给池的任务将被执行,如果池中有空闲的工作线程或池可以创建一个新的工作线程。这个方法会立即返回,所以这个任务可以继续处理文件夹的内容。对于每个文件,任务将它的扩展与所想要查找的(扩展)进行比较,如果它们相等,将文件名添加到结果数列。

一旦这个任务处理完指定文件夹的所有内容,它将使用join()方法等待已提交到池的所有任务的结束。这个方法在一个任务等待其执行结束时调用,并返回compute()方法返回的值。这个任务将它自己发送的所有任务的结果和它自己的结果分组,并返回作为compute()方法的一个返回值的数组。

ForkJoinPool类同时允许任务的执行以异步的方式。你已经使用execute()方法,提交3个初始任务给池。在Main类中,你也使用shutdown()方法结束池,并打印关于正在池中运行任务的状态和变化的信息。ForkJoinPool类包含更多方法,可用于这个目的(异步执行任务)。参见监控一个Fork/Join池指南,看这些方法完整的列表。

不止这些…

在这个示例中,你已经使用了join()方法来等待任务的结束,并获得它们的结果。对于这个目的,你也可以使用get()方法的两个版本之一:

  • get():这个版本的get()方法,如果ForkJoinTask已经结束它的执行,则返回compute()方法的返回值,否则,等待直到它完成。
  • get(long timeout, TimeUnit unit):这个版本的get()方法,如果任务的结果不可用,则在指定的时间内等待它。如果超时并且任务的结果仍不可用,这个方法返回null值。TimeUnit类是一个枚举类,包含以下常量:DAYS,HOURS,MICROSECONDS, MILLISECONDS,MINUTES, NANOSECONDS 和 SECONDS。

get()和join()有两个主要的区别:

  • join()方法不能被中断。如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。
  • 如果任务抛出任何未受检异常,get()方法将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。

参见

  • 在第5章,Fork/Join框架中的创建一个Fork/Join池指南
  • 在第8章,测试并发应用程序中的监控一个Fork/Join池指南
时间: 2025-01-19 07:14:42

Fork/Join框架(四)异步运行任务的相关文章

Java Fork Join 框架(四)性能

原文 http://gee.cs.oswego.edu/dl/papers/fj.pdf   作者:Doug Lea   译者:萧欢 4性能 如今,随着编译器与Java虚拟机性能的不断提升,性能测试结果也仅仅只能适用一时.但是,本节中所提到的测试结果数据却能揭示Fork/join框架的基本特性. 下面表格中简单介绍了在下文将会用到的一组fork/join测试程序.这些程序是从util.concurrent包里的示例代码改编而来,用来展示fork/join框架在解决不同类型的问题模型时所表现的差异

定制并发类(八)自定义在 Fork/Join 框架中运行的任务

声明:本文是< Java 7 Concurrency Cookbook>的第七章, 作者: Javier Fernández González 译者:郑玉婷 自定义在 Fork/Join 框架中运行的任务 执行者框架分开了任务的创建和运行.这样,你只要实现 Runnable 对象来使用 Executor 对象.你可以发送 Runnable 任务给执行者,然后它会创建,管理,并终结必要的线程来执行这些任务. Java 7 在 Fork/Join 框架中提供了特殊的执行者.这个框架是设计用来解决那

聊聊并发(八)——Fork/Join框架介绍

本文首发于InfoQ 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果.比如计算1+2+..+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇

Fork/Join框架(一)引言

在这个章节中,我们将覆盖: 创建一个Fork/Join池 加入任务的结果 异步运行任务 任务抛出异常 取消任务 引言 通常,当你实现一个简单的并发应用程序,你实现一些Runnable对象和相应的 Thread对象.在你的程序中,你控制这些线程的创建.执行和状态.Java 5引入了Executor和ExecutorService接口及其实现类进行了改进(比如:ThreadPoolExecutor类). 执行者框架将任务的创建与执行分离.有了它,你只要实现Runnable对象和使用Executor对

《Java 7并发编程实战手册》第五章Fork/Join框架

感谢人民邮电大学授权并发网发布此书样章,新书已上市,购买请进当当网 本章内容包含: 创建Fork/Join线程池 合并任务的结果 异步运行任务 在任务中抛出异常 取消任务 5.1 简介 通常,使用Java来开发一个简单的并发应用程序时,会创建一些Runnable对象,然后创建对应的Thread 对象来控制程序中这些线程的创建.执行以及线程的状态.自从Java 5开始引入了Executor和ExecutorService接口以及实现这两个接口的类(比如ThreadPoolExecutor)之后,使

Fork/Join框架(二)创建一个Fork/Join池

创建一个Fork/Join池 在这个指南中,你将学习如何使用Fork/Join框架的基本元素.它包括: 创建一个ForkJoinPool对象来执行任务. 创建一个ForkJoinPool执行的ForkJoinTask类. 你将在这个示例中使用Fork/Join框架的主要特点,如下: 你将使用默认构造器创建ForkJoinPool. 在这个任务中,你将使用Java API文档推荐的结构: 1 If (problem size < default size){ 2 tasks=divide(task

Fork/Join框架(六)取消任务

取消任务 当你在一个ForkJoinPool类中执行ForkJoinTask对象,在它们开始执行之前,你可以取消执行它们.ForkJoinTask类提供cancel()方法用于这个目的.当你想要取消一个任务时,有一些点你必须考虑一下,这些点如下: ForkJoinPool类并没有提供任何方法来取消正在池中运行或等待的所有任务. 当你取消一个任务时,你不能取消一个已经执行的任务. 在这个指南中,你将实现取消ForkJoinTask对象的例子.你将查找数在数组中的位置.第一个找到这个数的任务将取消剩

Fork/Join框架简介

1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务 ,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务 并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果.比如计算1+2+..+ 10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务

Java Fork/Join框架_java

Fork/Join框架是ExecutorService接口的一个实现,通过它我们可以实现多进程.Fork/Join可以用来将一个大任务递归的拆分为多个小任务,目标是充分利用所有的资源尽可能增强应用的性能. 和任何ExecutorService接口的实现一样,Fork/Join也会使用线程池来分布式的管理工作线程.Fork/Join框架的独特之处在于它使用了work-stealing(工作窃取)算法.通过这个算法,工作线程在无事可做时可以窃取其它正在繁忙的线程的任务来执行. Fork/Join框架