Java7中的ForkJoin并发框架初探(下)

前两篇文章已经对Fork Join的设计和JDK中源码的简要分析。这篇文章,我们来简单地看看我们在开发中怎么对JDK提供的工具类进行应用,以提高我们的需求处理效率。

Fork Join这东西确实用好了能给我们的任务处理提高效率,也为开发带来方便。但Fork Join不是那么容易用好的,我们先来看几个例子(反例)。

0. 反例错误分析

我们先来看看这篇文章中提供的例子:http://www.iteye.com/topic/643724 (因为是反例,就不提供超链接了,只以普通文本给出URL)

这篇文章是我学习和整理Fork Join时搜索到的一篇文章,其实总的来说这篇文章前面分析得还是比较好的,只是给出的第一个例子(有返回结果的RecursiveTask应用的例子)没有正确地对Fork Join进行应用。为了方便分析,还是贴下这个例子中具体的的代码吧。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

public class Calculator extends RecursiveTask {

  

    private static final int THRESHOLD = 100;

    private int start;

    private int end;

  

    public Calculator(int start, int end) {

        this.start = start;

        this.end = end;

    }

  

    @Override

    protected Integer compute() {

        int sum = 0;

        if((start - end) < THRESHOLD){

            for(int i = start; i< end;i++){

                sum += i;

            }

        }else{

            int middle = (start + end) /2;

            Calculator left = new Calculator(start, middle);

            Calculator right = new Calculator(middle + 1, end);

            left.fork();

            right.fork();

  

            sum = left.join() + right.join();

        }

        return sum;

    }

  

}

我们看到其中一段已经高亮的代码,显示对两个子任务进行fork()调用,即分别提交给当前线程的任务队列,依次加到末尾。紧接着,又按照调用fork()的顺序执行两个子任务对象的join()方法。

其实,这样就有一个问题,在每次迭代中,第一个子任务会被放到线程队列的倒数第二个位置,第二个子任务是最后一个位置。当执行join()调用的时候,由于第一个子任务不在队列尾而不能通过执行ForkJoinWorkerThread的unpushTask()方法取出任务并执行,线程最终只能挂起阻塞,等待通知。而Fork Join本来的做法是想通过子任务的合理划分,避免过多的阻塞情况出现。这样,这个例子中的操作就违背了Fork Join的初衷,每次子任务的迭代,线程都会因为第一个子任务的join()而阻塞,加大了代码运行的成本,提高了资源开销,不利于提高程序性能。

除此之外,这段程序还是不能进入Fork Join的过程,因为还有一个低级错误。看下第15、16行代码的条件,就清楚了。按照逻辑,start必然是比end小的。这将导致所有任务都将以循环累加的方式完成,而不会执行fork()和join()。

由此可见,Fork Join的使用还是要注意对其本身的理解和对开发过程中细节的把握的。我们看下JDK中RecursiveAction和RecursiveTask这两个类。

1. RecursiveAction分析及应用实例

这两个类都是继承了ForkJoinTask,本身给出的实现逻辑并不多不复杂,在JDK的类文件中,它的注释比源码还要多。我们可以看下它的实现代码。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

public abstract class RecursiveAction extends ForkJoinTask<Void> {

    private static final long serialVersionUID = 5232453952276485070L;

  

    protected abstract void compute();

  

    public final Void getRawResult() { return null; }

  

    protected final void setRawResult(Void mustBeNull) { }

  

    protected final boolean exec() {

        compute();

        return true;

    }

}

我们看到其中两个方法是关于处理空返回值的方法。而exec方法则是调用了compute(),这个compute就是我们使用Fork Join时需要自己实现的逻辑。

我们可以看下API中给出的一个最简单最具体的例子:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

class IncrementTask extends RecursiveAction {

   final long[] array; final int lo; final int hi;

   IncrementTask(long[] array, int lo, int hi) {

     this.array = array; this.lo = lo; this.hi = hi;

   }

   protected void compute() {

     if (hi - lo < THRESHOLD) {

       for (int i = lo; i < hi; ++i)

         array[i]++;

     }

     else {

       int mid = (lo + hi) >>> 1;

       invokeAll(new IncrementTask(array, lo, mid),

                 new IncrementTask(array, mid, hi));

     }

   }

 }

大致的逻辑就是,对给定一个特定数组的某段,进行逐个加1的操作。我们看到else中的代码块,显示取一个lo和hi的中间值,此后分割成两个子任务,并进行invokeAll()调用。我们来看下继承自FutureTask的invokeAll()方法实现。很简单:


1

2

3

4

5

public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {

    t2.fork();

    t1.invoke();

    t2.join();

}

对于参数中的两个子任务,对第二个子任务进行fork(),即放入线程对应队列的结尾,然后执行第一个子任务,再调用第二个子任务的join(),实际上就是跳转到第二个子任务,进行执行(当然如果不能执行,就需要阻塞等待了)。

其实invokeAll()是个重载方法,同名的还有另外两个,基本逻辑都是一样的,我们拿出一个通用一点的来看一下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

public static void invokeAll(ForkJoinTask<?>... tasks) {

    Throwable ex = null;

    int last = tasks.length - 1;

    for (int i = last; i >= 0; --i) {

        ForkJoinTask<?> t = tasks[i];

        if (t == null) {

            if (ex == null)

                ex = new NullPointerException();

        }

        else if (i != 0)

            t.fork();

        else if (t.doInvoke() < NORMAL && ex == null)

            ex = t.getException();

    }

    for (int i = 1; i <= last; ++i) {

        ForkJoinTask<?> t = tasks[i];

        if (t != null) {

            if (ex != null)

                t.cancel(false);

            else if (t.doJoin() < NORMAL && ex == null)

                ex = t.getException();

        }

    }

    if (ex != null)

        UNSAFE.throwException(ex);

}

我们发现第一个子任务(i==0的情况)没有进行fork,而是直接执行,其余的统统先调用fork()放入任务队列,之后再逐一join()。其实我们注意到一个要点就是第一个任务不要fork()再join(),也就是上面中例子的错误所在,这样会造成阻塞,而不能充分利用Fork Join的特点,也就不能保证任务执行的性能。

Oracle的JavaSE7 API中在RecursiveAction里还有一个更复杂的例子,是计算double数组平方和的,由于代码较长,就不列在这里了。总体思路和上面是一样的,额外增加了动态阈值的判断,感兴趣的想深入理解的可以到这里去参考一下。

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/RecursiveAction.html

2. RecursiveTask简要说明

其实说完了RecursiveAction,RecursiveTask可以用“同理”来解释。实现代码也很简单:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {

    private static final long serialVersionUID = 5232453952276485270L;

  

    V result;

  

    protected abstract V compute();

  

    public final V getRawResult() {

        return result;

    }

  

    protected final void setRawResult(V value) {

        result = value;

    }

  

    protected final boolean exec() {

        result = compute();

        return true;

    }

  

}

我们看到唯一不同的是返回结果的处理,其余都可以和RecursiveAction一样使用。

3. Fork Join应用小结

Fork Join是为我们提供了一个非常好的“分而治之”思想的实现平台,并且在一定程度上实现了“变串行并发为并行”。但Fork Join不是万能的页不完全是通用的,对于可很好分解成子任务的场景,我们可以对其进行应用,更多时候要考虑需

特别说明:尊重作者的劳动成果,转载请注明出处哦~~~http://blog.yemou.net/article/query/info/tytfjhfascvhzxcytp86

时间: 2024-09-17 03:15:52

Java7中的ForkJoin并发框架初探(下)的相关文章

Java7中的ForkJoin并发框架初探(上)

这篇我们来简要了解一下JavaSE7中提供的一个新特性 -- Fork Join 框架. 0. 处理器发展和需求背景 回想一下并发开发的初衷,其实可以说是有两点,或者说可以从两个方面看. 对于单核的处理器来说,在进行IO操作等比较费时的操作进行时,如果执行任务的方式是单任务的,那么CPU将会"空转",知道IO操作结束.如果有多任务的调度机制,则在一个任务不需要CPU支持的时候,CPU可以被调度处理其他任务.简单地讲,并发可以提高CPU计算资源的利用率. 对于多核,或者多个计算资源的情况

Java7中的ForkJoin并发框架初探(中)

根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现.在Java SE 7的API中,多了ForkJoinTask.ForkJoinPool.ForkJoinWorkerThread.RecursiveAction.RecursiveTask这样5个类.本文就对JDK1.7中增加这5个工具类实现做简要分析. 0. JDK中ForkJoin实现概述 在JavaSE7的API和JDK1.7中,分别集成了支持ForkJoin的五个类: ForkJoinPool

无锁并发框架Disruptor

概述 在逛并发编程网的时候,看到了并发框架Disruptor译文这个系列文章. Martin Fowler在自己网站上写了一篇LMAX架构(译文)的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单.业务逻辑处理器完全是运行在内存中,使用事件源驱动方式.业务逻辑处理器的核心是Disruptor. Disruptor它是一个开源的并发框架,能够在无锁的情况下实现网络

J.U.C并发框架

 J.U.C并发框架 作者:Doug Lea SUNY Oswego Oswego NY 13126 dl@cs.oswego.edu 翻译:书卷多情 在J2SE1.5中,java.util.concurrent包下的大部分同步工具(锁.屏障等)以AbstractQueuedSynchronizer类为基础来构建.这个框架提供了一些常用机制用于自动管理并发状态.阻塞及非阻塞线程,以及队列.本论文描述了该框架的根源.设计.实现.用法及性能. 关键字:synchronized, java 1.介绍

并发框架Disruptor译文

Martin Fowler在自己网站上写了一篇LMAX架构的 文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在JVM平台上,其核心是一个业务逻辑处理 器,它能够在一个线程里每秒处理6百万订单.业务逻辑处理器完全是运行在内存中,使用事件源驱动方式.业务逻辑处理器的核心是Disruptor. Disruptor它是一个开源的并发框架,并获得2011 Duke's 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作.本文是Disru

分享ppt: java7里的fork-join

以前分享的ppt,介绍了java7里的fork-join框架: 从slideshare下载,或从微盘下载 work-stealing在很多框架里都出现过,从两张图能大致看明白: 文章转自 并发编程网-ifeve.com

C#开发微信门户及应用(48) - 在微信框架中整合CacheManager 缓存框架

 在我们的很多框架或者项目应用中,缓存在一定程度上可以提高程序的响应速度,以及减轻服务器的承载压力,因此在一些地方我们都考虑引入缓存模块,这篇随笔介绍使用开源缓存框架CacheManager来实现数据的缓存,在微信开发框架中,我们有一些常用的处理也需要应用到缓存,因此本随笔以微信框架为例介绍缓存的实际使用,实际上,在我们很多框架中,如混合式开发框架.Web开发框架.Bootstrap开发框架中,这个模块都是通用的. 1.框架的缓存设计 在我们的微信开发框架中,缓存作为数据库和对外接口之间的一个分

[Qt教程] 第20篇 2D绘图(十)图形视图框架(下)

[Qt教程] 第20篇 2D绘图(十)图形视图框架(下) 楼主  发表于 2013-5-4 15:43:02 | 查看: 861| 回复: 0 图形视图框架(下) 版权声明 该文章原创于Qter开源社区(www.qter.org),作者yafeilinux,转载请注明出处! 导语 环境:Windows Xp + Qt 4.8.4+QtCreator 2.6.2 目录 三.场景(QGraphicsScene) (一)场景层 (二)索引算法 (三)边界矩形 (四)图形项查找 (五)事件处理和传播 (

在Struts中使用Validator验证框架详解

Validatro框架以成为Jakarta的公共项目的一部分,可以从http://jakarta.apache.org/commons/下载单独的Validator框架,在Struts中已经呆了这个框架. Validator主要依赖两个jar包 Jakarta-oro.jar:-提供一组处理文本的类,具有文本替换.过滤.和分割功能. Commons-validator.jar:提供了一个简单.可扩展的验证框架,包含了通用的验证方法和验证规则. 在用Struts中用这个框架,需加入这两个包,用起来