Java7中的ForkJoin并发框架初探(中)

根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask这样5个类。本文就对JDK1.7中增加这5个工具类实现做简要分析。

0. JDK中ForkJoin实现概述

在JavaSE7的API和JDK1.7中,分别集成了支持ForkJoin的五个类:

  • ForkJoinPool 实现ForkJoin的线程池
  • ForkJoinWorkerThread  实现ForkJoin的线程
  • ForkJoinTask<V> 一个描述ForkJoin的抽象类
  • RecursiveAction 无返回结果的ForkJoinTask实现
  • RecursiveTask<V> 有返回结果的ForkJoinTask实现

ForkJoinPool维护了多个线程构成的数组,维护了任务提交队列,给出了多个线程之间工作窃取的实现。给出了任务类型适配,和提交任务逻辑的实现。需要和线程紧密配合。

而ForkJoinWorkerThread则继承了java.lang.Thread类,维护了线程自己的队列,同一个任务fork()操作原则上会添加到同一个线程队列中。而这个线程类需要和ForkJoinPool紧密合作,有指向对应ForkJoinPool对象的引用。

ForkJoinTask则实现了Future接口,除了对接口的实现外,主要是fork()和join()操作。注意,貌似fork()只有ForkJoinWorkerThread 中才能执行。

两个子类RecursiveAction和RecursiveTask则实现比较简单,区别就在于返回值的处理不同。

1. ForkJoinPool

ForkJoinPool是实现了 Fork Join 的线程池。看JDK源码我们知道ForkJoinPool是extends AbstractExecutorService的,也就是说间接地实现了Executor和ExecutorService接口。实际上也就意味着ForkJoinPool是继ThreadPoolExecutor后的又一个Executor(Service)的具体实现。

1.1. 构建初始化

我们先看ForkJoinPool的构造方法,一共有3个重载的实现。有一个单参数的默认实现,通常我们使用这个就足够了,这最终会以默认的参数调用3参数的构造方法。我们再来看3个参数的构造方法实现。其中:

  • int parallelism 第一个参数是并行度,这个参数简介影响着(会额外做一些运算)这个ForkJoinPool的ForkJoinWorkerThread 线程数。默认情况下,这个参数是任务运行环境的处理器个数,比如系统提供的处理器数目为4,初始化线程池会开启16个线程。
  • ForkJoinWorkerThreadFactory factory 这个是ForkJoinPool构建新线程ForkJoinWorkerThread 对象的工厂,类似于ThreadPoolExecutor中用到的ThreadFactory。
  • Thread.UncaughtExceptionHandler handler 这个前面并发的文章页提到过,是线程异常处理器,这里不多说了。

1.2. 任务提交

前面已经提到,ForkJoinPool也是Executor(Service)的实现,那么execute()和submit()这样向ThreadPoolExecutor提交任务的方法对于ForkJoinPool来说也是一样有效的。

需要说明的是,除了增加支持ForkJoinTask对象参数的重载实现外,还在Runnable和Callable参数的方法中对原始的Runnable和Callable对象做了到ForkJoinTask的适配,使用的分别是ForkJoinTask的静态内部类AdaptedRunnable和AdaptedCallable的对象。而这两个类型参数对应的方法最终都会调用ForkJoinTask参数的方法:


1

2

3

4

5

6

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {

    if (task == null)

        throw new NullPointerException();

    forkOrSubmit(task);

    return task;

}

我们接下来再看下任务提交中被调用到的forkOrSubmit()方法:


1

2

3

4

5

6

7

8

9

10

11

private <T> void forkOrSubmit(ForkJoinTask<T> task) {

    ForkJoinWorkerThread w;

    Thread t = Thread.currentThread();

    if (shutdown)

        throw new RejectedExecutionException();

    if ((t instanceof ForkJoinWorkerThread) &&

        (w = (ForkJoinWorkerThread)t).pool == this)

        w.pushTask(task);

    else

        addSubmission(task);

}

逻辑很容易理解,先判断ForkJoinPool的状态,若已停止,则抛异常返回。之后如果当前线程是ForkJoinWorkerThread类型的,则将任务追加到ForkJoinWorkerThread对象中维护的队列上,否则将新的任务放入ForkJoinPool的提交队列中,并通知线程工作。

1.3. 线程的启动和工作

前面已经强调过,ForkJoinPool和ForkJoinWorkerThread是紧密相关,耦合在一起的。Thread的start()会调用run(),而ForkJoinWorkerThread类重写了run()方法,会调用对应的线程池ForkJoinPool对象的work()方法。

我们来看一下work()方法的实现。


1

2

3

4

5

6

7

8

9

10

11

final void work(ForkJoinWorkerThread w) {

    boolean swept = false;                // true on empty scans

    long c;

    while (!w.terminate && (int)(c = ctl) >= 0) {

        int a;                            // active count

        if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)

            swept = scan(w, a);

        else if (tryAwaitWork(w, c))

            swept = false;

    }

}

里面主要是一个while循环体,只要当前的线程和线程池不是处于终止状态,则这个循环一直执行。执行的内容则是这样的,如果能够根据scan()方法得到任务,并执行,否则进入阻塞状态。

我们来看一下scan()方法的实现。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

private boolean scan(ForkJoinWorkerThread w, int a) {

    int g = scanGuard; // mask 0 avoids useless scans if only one active

    int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;

    ForkJoinWorkerThread[] ws = workers;

    if (ws == null || ws.length <= m)         // staleness check

        return false;

    for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {

        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;

        ForkJoinWorkerThread v = ws[k & m];

        if (v != null && (b = v.queueBase) != v.queueTop &&

            (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {

            long u = (i << ASHIFT) + ABASE;

            if ((t = q[i]) != null && v.queueBase == b &&

                UNSAFE.compareAndSwapObject(q, u, t, null)) {

                int d = (v.queueBase = b + 1) - v.queueTop;

                v.stealHint = w.poolIndex;

                if (d != 0)

                    signalWork();             // propagate if nonempty

                w.execTask(t);

            }

            r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);

            return false;                     // store next seed

        }

        else if (j < 0) {                     // xorshift

            r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;

        }

        else

            ++k;

    }

    if (scanGuard != g)                       // staleness check

        return false;

    else {                                    // try to take submission

        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;

        if ((b = queueBase) != queueTop &&

            (q = submissionQueue) != null &&

            (i = (q.length - 1) & b) >= 0) {

            long u = (i << ASHIFT) + ABASE;

            if ((t = q[i]) != null && queueBase == b &&

                UNSAFE.compareAndSwapObject(q, u, t, null)) {

                queueBase = b + 1;

                w.execTask(t);

            }

            return false;

        }

        return true;                         // all queues empty

    }

}

看起来很复杂,实际的原理则很简单,就是先尝试做任务窃取( Work Stealing ),如果不满足条件则到提交队列中获取任务。而ForkJoinWorkerThread线程本身也维护了线程内fork和join任务操作得到的队列,结合起来,总体执行任务的顺序就是:

  • 线程会先执行ForkJoinWorkerThread对象内维护的任务队列中的任务,即ForkJoinWorkerThread的execTask()方法中的循环实现。通常是LIFO,即去最新的任务。也有特殊情况,这个根据变量locallyFifo的值来判断。
  • 之后会尝试做任务窃取,尝试从其他线程中获取任务
  • 任务窃取条件不满足时,到提交队列中获取提交的任务

1.4. ForkJoinPool的其它属性

除了上述提到的操作,ForkJoin中还维护了

  • 线程数组和提交任务的队列,这是最基本的
  • 操作相关的锁和条件对象
  • volatile long ctl; 等线程池ForkJoinPool状态的属性
  • static final Random workerSeedGenerator; 等和任务窃取策略相关的一系列属性
  •  private volatile long stealCount; 等数据统计相关属性

等数据属性。

2. ForkJoinWorkerThread

ForkJoinWorkerThread扩展于Thread类,但提供了很多支持ForkJoin的特性。

上文在介绍ForkJoinPool的时候已经对这个类做了很多描述,也强调过线程类ForkJoinWorkerThread和ForkJoinPool相互依赖,放在一起才有意义。实际上,还要提到描述Fork Join任务的类ForkJoinTask。

除了上面提到的以外,对于ForkJoinWorkerThread这个类,再稍微提一下这样几个点:

  • ForkJoinTask<?>[] queue; 这是维护和ForkJoin相关的(子)任务队列,还有queueTop和queueBase属性,分别标记队列的尾部和头部
  • final ForkJoinPool pool; 指向线程池的引用,需要注意的是,这个属性被final修饰
  • 和ForkJoinTask的fork()和join()方法相关的方法——pushTask()和unpushTask(),分别负责在当前ForkJoinWorkerThread对象维护的队列中新增和取回任务
  • 其它与状态和统计相关的属性

3. ForkJoinTask及两个抽象子类

ForkJoinTask是ForkJoin框架中的主体,是ForkJoin中任务的体现。这个类实现了Future和Serializable接口。除了Futrue接口要满足的方法外,我想有这样3个方法是有必要知道的,分别是fork()、join()和exec()。

对于fork(),这个也许大家都很熟悉了,在这里也就是分解出子任务的执行。这个在实现上很简单那,就是在当前线程ForkJoinWorkerThread对象维护的队列中加入新的子任务。实现如下:



public final ForkJoinTask fork() {

    ((ForkJoinWorkerThread) Thread.currentThread())

        .pushTask(this);

    return this;

}

需要注意的是fork()方法的调用是在当前线程对象为ForkJoinWorkerThread的条件下。

我们再来看看对应的join()实现:


1

2

3

4

5

6

public final V join() {

    if (doJoin() != NORMAL)

        return reportResult();

    else

        return getRawResult();

}

显然,它有调用了doJoin()方法,我们再来深入了解下。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

private int doJoin() {

    Thread t; ForkJoinWorkerThread w; int s; boolean completed;

    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {

        if ((s = status) < 0)

            return s;

        if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {

            try {

                completed = exec();

            catch (Throwable rex) {

                return setExceptionalCompletion(rex);

            }

            if (completed)

                return setCompletion(NORMAL);

        }

        return w.joinTask(this);

    }

    else

        return externalAwaitDone();

}

大概的逻辑是这样的,在当前线程对象为ForkJoinWorkerThread的条件下,从队列中取回当前任务ForkJoinTask对象,并尝试在调用线程对其直接执行,否则当前线程调用wait()阻塞等待。更深入的理解可续继续查阅源码。

最后,我们再来看看exec()方法,这个是在ForkJoinTask中是没有给出实现的。

在JDK中,有ForkJoinTask的两个抽象子类RecursiveAction和RecursiveTask,他们分别给出了exec()的实现,这也是这两个子类主要做的事情,实际上是调用了各自的compute()方法,而在RecursiveAction和RecursiveTask中compute()又是未给出实现的。

实际上,compute()方法就是Fork Join要执行的内容,是Fork Join任务的实质,需要开发者给出。

而RecursiveAction和RecursiveTask就是方便开发者使用Fork Join的,RecursiveAction和RecursiveTask这两个类的区别仅仅是返回结果的情况不同。而这个compute()方法就是留给开发者继承扩展使用的。这个会在下篇文章详细讲述。

特别说明:尊重作者的劳动成果,转载请注明出处哦~~~http://blog.yemou.net/article/query/info/tytfjhfascvhzxcytp85

时间: 2024-09-11 04:48:40

Java7中的ForkJoin并发框架初探(中)的相关文章

Java7中的ForkJoin并发框架初探(上)

这篇我们来简要了解一下JavaSE7中提供的一个新特性 -- Fork Join 框架. 0. 处理器发展和需求背景 回想一下并发开发的初衷,其实可以说是有两点,或者说可以从两个方面看. 对于单核的处理器来说,在进行IO操作等比较费时的操作进行时,如果执行任务的方式是单任务的,那么CPU将会"空转",知道IO操作结束.如果有多任务的调度机制,则在一个任务不需要CPU支持的时候,CPU可以被调度处理其他任务.简单地讲,并发可以提高CPU计算资源的利用率. 对于多核,或者多个计算资源的情况

Java7中的ForkJoin并发框架初探(下)

前两篇文章已经对Fork Join的设计和JDK中源码的简要分析.这篇文章,我们来简单地看看我们在开发中怎么对JDK提供的工具类进行应用,以提高我们的需求处理效率. Fork Join这东西确实用好了能给我们的任务处理提高效率,也为开发带来方便.但Fork Join不是那么容易用好的,我们先来看几个例子(反例). 0. 反例错误分析 我们先来看看这篇文章中提供的例子:http://www.iteye.com/topic/643724 (因为是反例,就不提供超链接了,只以普通文本给出URL) 这篇

C#开发微信门户及应用(48) - 在微信框架中整合CacheManager 缓存框架

 在我们的很多框架或者项目应用中,缓存在一定程度上可以提高程序的响应速度,以及减轻服务器的承载压力,因此在一些地方我们都考虑引入缓存模块,这篇随笔介绍使用开源缓存框架CacheManager来实现数据的缓存,在微信开发框架中,我们有一些常用的处理也需要应用到缓存,因此本随笔以微信框架为例介绍缓存的实际使用,实际上,在我们很多框架中,如混合式开发框架.Web开发框架.Bootstrap开发框架中,这个模块都是通用的. 1.框架的缓存设计 在我们的微信开发框架中,缓存作为数据库和对外接口之间的一个分

怎么在Word中添加箭头等框架流程符号

  怎么在Word中添加箭头等框架流程符号?办公过程中经常会使用WORD工具制作文本,初入职场的你还在简单敲一堆又一堆的字儿就上交领导吗?怎样让文本更生动立体呢?下面给大家分享一下,在word文档中添加箭头框架等流程符号的方法,需要的朋友赶紧来看一下吧. 1.首先,打开自己创建的word文档 ,假如现在想插入一个箭头. 2.在上边的菜单栏找到"插入",在子菜单栏找到"形状"随即出来一堆形状,找到""点击. 3.随即会发现鼠标变成了一个十字星,如图

Python Web框架Pylons中使用MongoDB的例子

 这篇文章主要介绍了Python Web框架Pylons中使用MongoDB 的例子,大家参考使用 Pylons 经过漫长的开发,终于放出了 1.0 版本.对于正规的产品开发来说,1.0 版本的意义很大,这表明 Pylons 的 API 终于稳定下来了.   Pylons 虽是山寨 Rails 而生,但作为一个纯 Python 的 Web 框架,它有一个鲜明的特点:可定制性强.框架每一层都没重新发明轮子,而是尽量整合现有的 Python 库.在 MVC 的 Model 层,Pylons 默认支持

yui3在框架设计中的牺牲和让步

相信每个前端工程师都有自己喜爱的javascript框架,说情感也好,道信仰也罢,javascript框架带给人的不仅仅是便捷的开发,更有一种纯粹的逻辑美感,不管是jquery曼妙的简洁,还是yui魔术般的沙箱,都使我们产生无穷的想象.然而,js框架却又必然无法做到面面俱到的完美无瑕,比如jquery在OO方面做出的让步,以及yui在性能上做的牺牲,无不给人传达一种缺憾美.一种理想的现实主义.今天,我们来看看yui3在框架设计中的这些牺牲和让步,以便让我们更加深刻的理解yui3的全貌,并将其优势

MFC框架程序中全屏显示特性的实现

在开发图像显示程序以及视频应用程序时,常常需要全屏显示特性,比如ACD See和豪杰解霸等应用都有全屏显示功能.本文将介绍如何在MFC框架程序中实现视图的全屏显示,也就是说将标题.菜单.工具栏.状态栏以及窗口的所有边框全部被隐藏,视图充满整个屏幕.并提供全屏显示与框架窗口之间的快捷切换操作. 大家知道,在MFC框架中并没有提供现成的类或者函数来实现全屏显示特性,至少我到目前为止是没有发现.但是要实现这个特性也并不难.其基本思路是调整主窗口的大小和位置,使视图的显示充满屏幕.它需要以屏幕左上角为原

.Net框架类库中定时器类的使用技巧

不论在客户端应用程序还是服务器组件(包括窗口服务)定时器通常扮演一个重要的角色.写一个高效的定时器驱动型可管理代码要求对程序流程有一个清晰的理解及掌握.NET线程模型的精妙之处..NET框架类库提供了三种不同的定时器类:System.Windows.Forms.Timer, System.Timers.Timer, 和System.Threading.Timer.每个类为不同的场合进行设计和优化.本文章将研究这三个类并让你理解如何及何时应该使用哪一个类. Microsoft Windows里的定

DeepEarth中的几何图形基础框架模型

众所周知,DeepEarth是一套基于Silverlight的DeepZoom技术实现的开源地图开发组件,其内部提供 了常用的地图开发工具控件和通用的基础模型,如线条.多边形.不规则图形.图片等一系列图形图像的 基础架构模型,这为使用DeepEarth进行地图二次开发提供了非常方便的基础架构接口.本篇将介绍在 DeepEarth中的几何图形架构设计和所提供的基础架构模型框架. 在DeepEarth的几何图形架构中提供了最基础的几何图形应用开发模型,包括几何图层.点.线.多边 形.不规则图形等,在