浅谈Java的Fork/Join并发框架

1. Fork/Join是什么

  Oracle的官方给出的定义是:Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。

 
Fork/Join实现了ExecutorService,所以它的任务也需要放在线程池中执行。它的不同在于它使用了工作窃取算法,空闲的线程可以从满负荷的线程中窃取任务来帮忙执行。(我个人理解的工作窃取大意就是:由于线程池中的每个线程都有一个队列,而且线程间互不影响。那么线程每次都从自己的任务队列的头部获取一个任务出来执行。如果某个时候一个线程的任务队列空了,而其余的线程任务队列中还有任务,那么这个线程就会从其他线程的任务队列中取一个任务出来帮忙执行。就像偷取了其他人的工作一样)

Fork/Join框架的核心是继承了AbstractExecutorService的ForkJoinPool类,它保证了工作窃取算法和ForkJoinTask的正常工作。

下面是引用Oracle官方定义的原文:

The fork/join framework is an implementation of the ExecutorService
interface that helps you take advantage of multiple processors. It is
designed for work that can be broken into smaller pieces recursively.
The goal is to use all the available processing power to enhance the
performance of your application.

As with any ExecutorService implementation, the fork/join framework
distributes tasks to worker threads in a thread pool. The fork/join
framework is distinct because it uses a work-stealing algorithm. Worker
threads that run out of things to do can steal tasks from other threads
that are still busy.

The center of the fork/join framework is the ForkJoinPool class, an
extension of the AbstractExecutorService class. ForkJoinPool implements
the core work-stealing algorithm and can execute ForkJoinTask processes.

2. Fork/Join的基本用法

(1)Fork/Join基类

  上文已经提到,Fork/Join就是要讲一个大的任务分割成若干小的任务,所以第一步当然是要做任务的分割,大致方式如下:


  1. if (这个任务足够小){ 
  2.   执行要做的任务 
  3. } else { 
  4.   将任务分割成两小部分 
  5.   执行两小部分并等待执行结果 

要实现FrokJoinTask我们需要一个继承了RecursiveTask或RecursiveAction的基类,并根据自身业务情况将上面的代码放入基类的coupute方法中。RecursiveTask和RecursiveAction都继承了FrokJoinTask,它俩的区别就是RecursiveTask有返回值而RecursiveAction没有。下面是我做的一个选出字符串列表中还有"a"的元素的Demo:


  1. @Override 
  2. protected List<String> compute() { 
  3.     // 当end与start之间的差小于阈值时,开始进行实际筛选 
  4.     if (end - this.start < threshold) { 
  5.         List<String> temp = list.subList(this.start, end); 
  6.         return temp.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList()); 
  7.     } else { 
  8.         // 如果当end与start之间的差大于阈值时 
  9.         // 将大任务分解成两个小任务。 
  10.         int middle = (this.start + end) / 2; 
  11.         ForkJoinTest left = new ForkJoinTest(list, this.start, middle, threshold); 
  12.         ForkJoinTest right = new ForkJoinTest(list, middle, end, threshold); 
  13.         // 并行执行两个“小任务” 
  14.         left.fork(); 
  15.         right.fork(); 
  16.         // 把两个“小任务”的结果合并起来 
  17.         List<String> join = left.join(); 
  18.         join.addAll(right.join()); 
  19.         return join; 
  20.     } 

(2)执行类

做好了基类就可以开始调用了,调用时首先我们需要Fork/Join线程池ForkJoinPool,然后向线程池中提交一个ForkJoinTask并得到结果。ForkJoinPool的submit方法的入参是一个ForkJoinTask,返回值也是一个ForkJoinTask,它提供一个get方法可以获取到执行结果。

代码如下:


  1. ForkJoinPool pool = new ForkJoinPool(); 
  2. // 提交可分解的ForkJoinTask任务 
  3. ForkJoinTask<List<String>> future = pool.submit(forkJoinService); 
  4. System.out.println(future.get()); 
  5. // 关闭线程池 
  6. pool.shutdown(); 

就这样我们就完成了一个简单的Fork/Join的开发。

提示:Java8中java.util.Arrays的parallelSort()方法和java.util.streams包中封装的方法也都用到了Fork/Join。(细心的读者可能注意到我在Fork/Join中也有用到stream,所以其实这个Fork/Join是多余的,因为stream已经实现了Fork/Join,不过这只是一个Demo展示,没有任何实际用处也就无所谓了)

引用官方原文:

One such implementation, introduced in Java SE 8, is used by the
java.util.Arrays class for its parallelSort() methods. These methods are
similar to sort(), but leverage concurrency via the fork/join
framework. Parallel sorting of large arrays is faster than sequential
sorting when run on multiprocessor systems.

Another implementation of the fork/join framework is used by methods
in the java.util.streams package, which is part of Project Lambda
scheduled for the Java SE 8 release.

附完整代码以便以后参考:

1. 定义抽象类(用于拓展,此例中没有实际作用,可以不定义此类):


  1. import java.util.concurrent.RecursiveTask; 
  2.  
  3. /** 
  4.  * Description: ForkJoin接口 
  5.  * Designer: jack 
  6.  * Date: 2017/8/3 
  7.  * Version: 1.0.0 
  8.  */ 
  9. public abstract class ForkJoinService<T> extends RecursiveTask<T>{ 
  10.     @Override 
  11.     protected abstract T compute(); 

2. 定义基类


  1. import java.util.List; 
  2. import java.util.stream.Collectors; 
  3.  
  4. /** 
  5.  * Description: ForkJoin基类 
  6.  * Designer: jack 
  7.  * Date: 2017/8/3 
  8.  * Version: 1.0.0 
  9.  */ 
  10. public class ForkJoinTest extends ForkJoinService<List<String>> { 
  11.  
  12.     private static ForkJoinTest forkJoinTest; 
  13.     private int threshold;  //阈值 
  14.     private List<String> list; //待拆分List 
  15.  
  16.     private ForkJoinTest(List<String> list, int threshold) { 
  17.         this.list = list; 
  18.         this.threshold = threshold; 
  19.     } 
  20.  
  21.     @Override 
  22.     protected List<String> compute() { 
  23.         // 当end与start之间的差小于阈值时,开始进行实际筛选 
  24.         if (list.size() < threshold) { 
  25.             return list.parallelStream().filter(s -> s.contains("a")).collect(Collectors.toList()); 
  26.         } else { 
  27.             // 如果当end与start之间的差大于阈值时,将大任务分解成两个小任务。 
  28.             int middle = list.size() / 2; 
  29.             List<String> leftList = list.subList(0, middle); 
  30.             List<String> rightList = list.subList(middle, list.size()); 
  31.             ForkJoinTest left = new ForkJoinTest(leftList, threshold); 
  32.             ForkJoinTest right = new ForkJoinTest(rightList, threshold); 
  33.             // 并行执行两个“小任务” 
  34.             left.fork(); 
  35.             right.fork(); 
  36.             // 把两个“小任务”的结果合并起来 
  37.             List<String> join = left.join(); 
  38.             join.addAll(right.join()); 
  39.             return join; 
  40.         } 
  41.     } 
  42.  
  43.     /** 
  44.      * 获取ForkJoinTest实例 
  45.      * @param list  待处理List 
  46.      * @param threshold 阈值 
  47.      * @return ForkJoinTest实例 
  48.      */ 
  49.     public static ForkJoinService<List<String>> getInstance(List<String> list, int threshold) { 
  50.         if (forkJoinTest == null) { 
  51.             synchronized (ForkJoinTest.class) { 
  52.                 if (forkJoinTest == null) { 
  53.                     forkJoinTest = new ForkJoinTest(list, threshold); 
  54.                 } 
  55.             } 
  56.         } 
  57.         return forkJoinTest; 
  58.     } 

3. 执行类 


  1. import java.util.ArrayList; 
  2. import java.util.Arrays; 
  3. import java.util.List; 
  4. import java.util.concurrent.ExecutionException; 
  5. import java.util.concurrent.ForkJoinPool; 
  6. import java.util.concurrent.ForkJoinTask; 
  7.  
  8. /** 
  9.  * Description: Fork/Join执行类 
  10.  * Designer: jack 
  11.  * Date: 2017/8/3 
  12.  * Version: 1.0.0 
  13.  */ 
  14. public class Test { 
  15.  
  16.     public static void main(String args[]) throws ExecutionException, InterruptedException { 
  17.  
  18.         String[] strings = {"a", "ah", "b", "ba", "ab", "ac", "sd", "fd", "ar", "te", "se", "te", 
  19.                 "sdr", "gdf", "df", "fg", "gh", "oa", "ah", "qwe", "re", "ty", "ui"}; 
  20.         List<String> stringList = new ArrayList<>(Arrays.asList(strings)); 
  21.  
  22.         ForkJoinPool pool = new ForkJoinPool(); 
  23.         ForkJoinService<List<String>> forkJoinService = ForkJoinTest.getInstance(stringList, 20); 
  24.         // 提交可分解的ForkJoinTask任务 
  25.         ForkJoinTask<List<String>> future = pool.submit(forkJoinService); 
  26.         System.out.println(future.get()); 
  27.         // 关闭线程池 
  28.         pool.shutdown(); 
  29.  
  30.     } 
  31.  

作者:珂jack

来源:51CTO

时间: 2024-10-30 11:53:08

浅谈Java的Fork/Join并发框架的相关文章

浅谈java中异步多线程超时导致的服务异常_java

在项目中为了提高大并发量时的性能稳定性,经常会使用到线程池来做多线程异步操作,多线程有2种,一种是实现runnable接口,这种没有返回值,一种是实现Callable接口,这种有返回值. 当其中一个线程超时的时候,理论上应该不 影响其他线程的执行结果,但是在项目中出现的问题表明一个线程阻塞,其他线程返回的接口都为空.其实是个很简单的问题,但是由于第一次碰到,还是想了一些时间的.很简单,就是因为阻塞的那个线 程没有释放,并发量一大,线程池数量就满了,所以其他线程都处于等待状态. 附上一段自己写的调

浅谈java 执行jar包中的main方法_java

浅谈java 执行jar包中的main方法 通过 OneJar 或 Maven 打包后 jar 文件,用命令: java -jar ****.jar 执行后总是运行指定的主方法,如果 jar 中有多个 main 方法,那么如何运行指定的 main 方法呢? 用下面的命令试试看: java -classpath ****.jar ****.****.className [args] "****.****"表示"包名": "className"表示&

浅谈java异常链与异常丢失_java

1.在java的构造方法中提供了 异常链.. 也就是我们可以通过构造方法不断的将 异常串联成一个异常链...   之所以需要异常连,是因为处于代码的可理解性,以及阅读和程序的可维护性...  我们知道我们每抛出一个异常都需要进行try catch ...那么岂不是代码很臃肿... 我们如果可以将异常串联成一个异常连,然后我们只捕获我们的包装 异常,我们知道 RuntimeException 以及其派生类可以不进行try catch 而被jvm自动捕获并处理.. 当然了我们可以自己定义自己的异常类

浅谈Java 对于继承的初级理解_java

概念:继承,是指一个类的定义可以基于另外一个已存在的类,即子类继承父类,从而实现父类的代码的重用.两个类的关系:父类一般具有各个子类共性的特征,而子类可以增加一些更具个性的方法.类的继承具有传递性,即子类还可以继续派生子类,位于上层的类概念更加抽象,位于下层的类的概念更加具体. 1.定义子类: 语法格式 [修饰符] class 子类名 extends 父类名{ 子类体 } 修饰符:public private protected default 子类体是子类在继承父类的内容基础上添加的新的特有内

浅谈java异常处理(父子异常的处理)_java

我当初学java异常处理的时候,对于父子异常的处理,我记得几句话"子类方法只能抛出父类方法所抛出的异常或者是其子异常,子类构造器必须要抛出父类构造器的异常或者其父异常".那个时候还不知道子类方法为什么要这样子抛出异常,后来通过学习<Thinking in Java>,我才明白其中的道理,现在我再来温习一下. 一.子类方法只能抛出父类方法的异常或者是其子异常 对于这种限制,主要是因为子类在做向上转型的时候,不能正确地捕获异常 package thinkinginjava; p

浅谈java异常处理之空指针异常_java

听老师说,在以后的学习中大部分的异常都是空指针异常.所以抽点打游戏的时间来查询一下什么是空指针异常 一:空指针异常产生的主要原因如下: (1)当一个对象不存在时又调用其方法会产生异常obj.method() // obj对象不存在 (2)当访问或修改一个对象不存在的字段时会产生异常obj.method() // method方法不存在 (3)字符串变量未初始化: (4)接口类型的对象没有用具体的类初始化,比如: List lt:会报错 List lt = new ArrayList():则不会报

浅谈Java反射与代理_java

Java反射机制与动态代理,使得Java更加强大,Spring核心概念IoC.AOP就是通过反射机制与动态代理实现的. 1 Java反射 示例: User user = new User(); user.setTime5Flag("test"); Class<?> cls = Class.forName("com.test.User"); //接口必须public,无论是否在本类内部使用!或者使用cls.getDeclaredMethod(),或者遍历修

浅谈java中BigDecimal的equals与compareTo的区别_java

这两天在处理支付金额校验的时候出现了点问题,有个金额比较我用了BigDecimal的equals方法来比较两个金额是否相等,结果导致金额比较出现错误(比如3.0与3.00的比较等). [注:以下所讲都是以sun jdk 1.4.2版本为例,其他版本实现未必一致,请忽略] 首先看一下BigDecimal的equals方法: public boolean equals(Object x){ if (!(x instanceof BigDecimal)) return false; BigDecima

浅谈java+内存分配及变量存储位置的区别_java

Java内存分配与管理是Java的核心技术之一,之前我们曾介绍过Java的内存管理与内存泄露以及Java垃圾回收方面的知识,今天我们再次深入Java核心,详细介绍一下Java在内存分配方面的知识.一般Java在内存分配时会涉及到以下区域: ◆寄存器:我们在程序中无法控制 ◆栈:存放基本类型的数据和对象的引用,但对象本身不存放在栈中,而是存放在堆中(new 出来的对象) ◆堆:存放用new产生的数据 ◆静态域:存放在对象中用static定义的静态成员 ◆常量池:存放常量 ◆非RAM存储:硬盘等永久