Java多线程编程简明教程(2) - ForkJoin模式

Fork-Join模式

说起Fork-Join模式,我们不免联想起了Map-Reduce.它们的原理都是分治法,就是将一个大问题划分成若干个小问题,如果这些小问题之间互相不影响的话,就可以并发去执行. 最后,统一将各小问题的结果汇总起来,就是这个大问题的结果.
这个任务最适合处理像一棵树一样的问题.

ForkJoinPool

Fork-Join模式不再是只管一个后台作务,而是有多个任务并发执行. 这时我们前面学到的简单的线程池执行器的功能就显得不足了.这时候JDK 7开始为我们提供了ForkJoinPool.
ForkJoinPool不但自动计算开多大的线程池合适,而且提供了称为工作窃取算法的算法来管理这些任务. 如果有的线程空闲, ForkJoinPool会从其它线程的队列尾中窃取一个任务给空闲线程来运行.而正常的线程是从任务队列头中取任务,二者不会有冲突.

RecusiveTask

如同FutureTask一样,Fork-Join模式也有自己的Task类ForkJoinTask. 不过一般我们都是从ForkJoinTask的子类RecursiveTask来继承. 通过重载RecursiveTask类的compute方法,来实现Fork-Join的逻辑.
在compute方法里, 要实现两件事, 顾名思义, Fork-Join就是要先fork出RecursiveTask对象的子任务,然后将它们join在一起.

Fork-Join模式10分钟速成教程

我们先写个copy二叉树结构的简单任务学习一下如何利用Fork-Join框架来实现功能.

先实现一个最简单的二叉树节点,带左右孩子,一个字符串吧:

public class BinaryTree {
    public static class Node{
        public Node leftChild;
        public Node rightChild;
        public String content;
        public Node(String ct){
            content = ct;
        }
    }

然后实现一个RecursiveTask的子类,重载它的compute方法.

    public static class NodeCopyTask extends RecursiveTask<Node>{
        Node mNode;
        public NodeCopyTask(Node node){
            mNode = node;
        }
        @Override
        protected Node compute() {
            if(mNode==null)
                return null;

下面我们开始实现分叉, 对于左右子树分别fork出一个子任务. 这两个子任务又会分叉出它的的子任务,直至结束.

            NodeCopyTask taskLeft = new NodeCopyTask(mNode.leftChild);
            taskLeft.fork();
            NodeCopyTask taskRight = new NodeCopyTask(mNode.rightChild);
            taskRight.fork();

fork之后, 任务就在后台开始运行了. 这时候我们开始构造我们的左右子树的父节点:

            Node node = new Node(mNode.content);

实际问题中一般不会这么简单.主线任务完成了之后,就是等待子任务交活儿,将它们组装在一起:

            node.leftChild = taskLeft.join();
            node.rightChild = taskRight.join();
            return node;
        }
    }

核心功能实现完了,下面我们写个主函数让它运行起来吧. 先构造一个被复制的对象.

    public static void main(String[] args){
        Node node = new Node("Hello,Fork-Join");
        node.leftChild = new Node("Left");
        node.rightChild = new Node("Right");

下面我们前面介绍的主角之一 - ForkJoinPool粉墨登场. 没什么复杂的设置,直接new一个就好:

        ForkJoinPool forkJoinPool = new ForkJoinPool();

ForkJoinPool有了之后, 再创建一个我们的RecursiveTask的对象, 然后调用ForkJoinPool的submit方法将其提交, 这又是一个Future模式了. 最后我们通过这个FutureTask的get方法获取结果就一切OK了.

        NodeCopyTask task = new NodeCopyTask(node);
        Future<Node> future = forkJoinPool.submit(task);
        try {
            Node node2 = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

整理一下上面的步骤:
1. 实现一个RecursiveTask的子类,重载compute方法实现fork-join逻辑
2. 合理划分任务,调用递归的RecursiveTask子类,fork出每个子任务
3. 通过join方法获取子任务的值,并将它们组合到一起
4. 构造ForkJoinPool线程池
5. 创建第一步的子类的对象,通过Future模式,提交到ForkJoinPool线程中运行
6. 获取Future的值,即可得到Fork-Join的结果.

总结一下,把刚才拆散的代码整合在一起:

public class BinaryTree {
    public static class Node{
        public Node leftChild;
        public Node rightChild;
        public String content;
        public Node(String ct){
            content = ct;
        }
    }

    public static class NodeCopyTask extends RecursiveTask<Node>{
        Node mNode;
        public NodeCopyTask(Node node){
            mNode = node;
        }
        @Override
        protected Node compute() {
            if(mNode==null)
                return null;

            NodeCopyTask taskLeft = new NodeCopyTask(mNode.leftChild);
            taskLeft.fork();
            NodeCopyTask taskRight = new NodeCopyTask(mNode.rightChild);
            taskRight.fork();

            Node node = new Node(mNode.content);
            node.leftChild = taskLeft.join();
            node.rightChild = taskRight.join();
            return node;
        }
    }

    public static void main(String[] args){
        //TODO: construct a real tree
        Node node = new Node("Hello,Fork-Join");
        node.leftChild = new Node("Left");
        node.rightChild = new Node("Right");

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        NodeCopyTask task = new NodeCopyTask(node);
        Future<Node> future = forkJoinPool.submit(task);
        try {
            Node nodeNew = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

注意事项

  1. 现在这个阶段,暂时先不要共享内存,通过复制成不变的对象传递给子任务.返回值也创建新对象,当然可以使用对象池等技术.
  2. 暂时不要使用跨任务的容器,我们还没有经过相关的训练,时机还不成熟.
  3. 暂时不要使用其他的同步机制,我们的知识储备暂时还不够.
  4. 要注意任务中的异常会接收不到,一定在任务中处理好可能出现的异常. 否则发生了异常,在主任务中却收不到,会感到很奇怪.
  5. 注意I/O操作,建议目前阶段在Fork-Join之前将I/O操作提前做好.

尽管有一些限制,但是Fork-Join框架还是给我们带来了很大的便利. 按照Fork-Join设计好的代码,在将来计算核数增加时,会自动给我们的代码获得性能提高.

不变模式

在结束这个快餐教程之前,我们得再次强调一下内存共享的风险. 请初学的同学们一定要重视起来.目前我们还没有学习Java对象模型和容器的安全用法, 所以目前阶段最安全的就是不共享任何状态.
只读的对象是不会引起线程安全问题的.我们所有的跨任务的数据传递,暂时都只传递不变的对象.
这样的限制可能会带来一些不便和一些性能损失.但是,它是线程安全的,对于开发人员是种投入小见效快的好事情. 如果暂时还不能满意你的需求,我们会继续学习,从此开始,没有快餐式的速成教程了,我们要经过一段非常扎实的训练.

Android的特别注意事项

请大家注意,Java中的Fork-Join并没有办法处理Android的UI线程等问题, 如果需要运行在UI线程, 区分主线程和工作线程等, 还请参考上节我们分析AsyncTask中的做法, 该使用Handler的还是要用Handler. 后面我们还会详情说细节.

时间: 2024-11-17 09:09:58

Java多线程编程简明教程(2) - ForkJoin模式的相关文章

Java多线程编程简明教程(1) - Future模式与AsyncTask

Java多线程编程简明教程 缘起 关于多线程编程的教程汗牛充栋了,比如阿里集团内部就有一粟.高铁等大牛的讲座,更不用说有清英同学专门创建了并发编程网站来专注于这件事情.专门讲Java并发开发的书籍也是相当丰富了. 我们举个例子,典型的一本Java多线程开发的教材是这样写的,上来就是介绍如何创建线程,然后再讲线程安全,线程之间如何做同步和通信,接着才是线程池和执行器,最后是线程安全的数据结构. 这样写当然从技术上讲是没问题的,不过问题在于,门槛太高了.假如读者的时间短,只看完创建线程这一章就开始照

C++多任务编程简明教程 (1) - C++的多任务其实很简单

C++多任务编程简明教程 (1) - C++的多任务其实很简单 用库的方式无法实现彻底的线程安全!我们需要C++11 与很多同学交流的时候发现,一想到用C++写多线程,还是想到pthread这样的库的方法实现. 但是,十几年前的研究就证明了,线程安全是无法用库的形式来提供的,有兴趣的同学可以参见原文:http://www.hpl.hp.com/techreports/2004/HPL-2004-209.pdf 解释需要大量的篇幅,作为快餐式的教程,我们只讲结论. 十几年过去了,CPU的乱序执行,

《JAVA多线程编程实战指南》之Two-phase Termination(两阶段终止)模式

本文是<JAVA多线程编程实战指南>的样章,感谢作者授权并发网(ifeve.com)发表此文.感谢demochen整理此文. 5.1Two-phase Termination模式简介 停止线程是一个目标简单而实现却不那么简单的任务.首先,Java没有提供直接的API用于停止线程.此外,停止线程还有一些额外的细节需要考虑,如停止的线程处于阻塞(如等待锁)或者等待状态(等待其他线程),尚有未处理完的任务等. Two-phase Termination模式通过将停止线程这个动作分解为准备阶段和执行阶

Java多线程编程中synchronized线程同步的教程_java

0.关于线程同步 (1)为什么需要同步多线程?线程的同步是指让多个运行的线程在一起良好地协作,达到让多线程按要求合理地占用释放资源.我们采用Java中的同步代码块和同步方法达到这样的目的.比如这样的解决多线程无固定序执行的问题: public class TwoThreadTest { public static void main(String[] args) { Thread th1= new MyThread1(); Thread th2= new MyThread2(); th1.sta

Java多线程编程详解

编程|多线程|详解 一:理解多线程多线程是这样一种机制,它允许在程序中并发执行多个指令流,每个指令流都称为一个线程,彼此间互相独立. 线程又称为轻量级进程,它和进程一样拥有独立的执行控制,由操作系统负责调度,区别在于线程没有独立的存储空间,而是和所属进程中的其它线程共享一个存储空间,这使得线程间的通信远较进程简单.多个线程的执行是并发的,也就是在逻辑上"同时",而不管是否是物理上的"同时".如果系统只有一个CPU,那么真正的"同时"是不可

Java多线程编程实战之不提倡的方法

不提倡使用的方法是为支持向后兼容性而保留的那些方法,它们在以后的版本中可能出现,也可能不出现.Java 多线程支持在版本 1.1 和版本 1.2 中做了重大修订,stop().suspend() 和 resume() 函数已不提倡使用.这些函数在 JVM 中可能引入微妙的错误.虽然函数名可能听起来很诱人,但请抵制诱惑不要使用它们. 调试线程化的程序 在线程化的程序中,可能发生的某些常见而讨厌的情况是死锁.活锁.内存损坏和资源耗尽. 死锁 死锁可能是多线程程序最常见的问题.当一个线程需要一个资源而

JAVA学习(九):JAVA多线程编程

本文详细解释JAVA多线程编程,首先对进程和线程做了区别,其次介绍线程的两种实现方式,即继承Thread类和实现Runnable接口,然后讨论了线程常用的方法和优先级,最后介绍了线程的同步和死锁以及线程的生命周期.   1.进程与线程的区别与联系 进程:是应用程序的运行实例,是应用程序的一次动态执行.进程是由进程控制块.程序段和数据段3部分组成的.进程是操作系统进行资源分配的单位. 线程:是进程中的一个实体,其本身依靠程序进行运行,是程序中的顺序控制流,只能使用分配给程序的资源和环境.线程是被系

详解Java多线程编程中线程的启动、中断或终止操作_java

线程启动: 1.start() 和 run()的区别说明start() : 它的作用是启动一个新线程,新线程会执行相应的run()方法.start()不能被重复调用. run() : run()就和普通的成员方法一样,可以被重复调用.单独调用run()的话,会在当前线程中执行run(),而并不会启动新线程! 下面以代码来进行说明. class MyThread extends Thread{ public void run(){ ... } }; MyThread mythread = new

Java多线程编程中使用Condition类操作锁的方法详解_java

Condition的作用是对锁进行更精确的控制.Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法.不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的:而Condition是需要与"互斥