Java Worker 设计模式

Worker模式

想解决的问题

异步执行一些任务,有返回或无返回结果

使用动机

有些时候想执行一些异步任务,如异步网络通信、daemon任务,但又不想去管理这任务的生命周。这个时候可以使用Worker模式,它会帮您管理与执行任务,并能非常方便地获取结果

结构

很多人可能为觉得这与executor很像,但executor是多线程的,它的作用更像是一个规划中心。而Worker则只是个搬运工,它自己本身只有一个线程的。每个worker有自己的任务处理逻辑,为了实现这个目的,有两种方式

1. 建立一个抽象的AbstractWorker,不同逻辑的worker对其进行不同的实现;

2. 对worker新增一个TaskProcessor不同的任务传入不同的processor即可。

第二种方式worker的角色可以很方便地改变,而且可以随时更换processor,可以理解成可”刷机”的worker
^ ^。这里我们使用第二种方式来介绍此模式的整体结构。

详细介绍一下几个角色:

  • ConfigurableWorker:顾名思义这个就是真正干活的worker了。要实现自我生命周期管理,需要实现Runable,这样其才能以单独的线程运行,需要注意的是work最好以daemon线程的方式运行。worker里面还包括几个其它成员:taskQueue,一个阻塞性质的queue,一般BlockingArrayList就可以了,这样任务是FIFO(先进先出)的,如果要考虑任务的优先级,则可以考虑使用PriorityBlockingQueue;listeners,根据事件进行划分的事件监听者,以便于当一个任务完成的时候进行处理,需要注意的是,为了较高效地进行listener遍历,这里我推荐使用CopyOnWriteArrayList,免得每次都复制。其对应的方法有addlistener、addTask等配套方法,这个都不多说了,更详细的可以看后面的示例代码。
  • WorkerTask:实际上这是一个抽象的工内容,其包括基本的id与,task的ID是Worker生成的,相当于递wtte后的一个执回,当数据执行完了的时候需要使用这个id来取结果。而后面真正实现的实体task则包含任务处理时需要的数据。
  • Processor:为了实现可”刷机”的worker,我们将处理逻辑与worker分开来,processor的本职工作很简单,只需要加工传入的task数据即可,加工完成后触发fireEvent(WorkerEvent.TASK_COMPLETE)事件,之后通过Future的get即可得到最终的数据。

另外再说一点,对于addTask,可以有一个overload的方法,即在输入task的同时,传入一个RejectPolice,这样可以在size过大的时候做出拒绝操作,有效避免被撑死。

适用性/问题

这种设计能自动处理任务,并能根据任务的优先级自动调节任务的执行顺序,一个完全独立的thread,你完全可以将其理解成一专门负责干某种活的”机器人”。它可以用于处理一些定时、请求量固定均匀且对实时性要求不是太高的任务,如日志记录,数据分析等。当然,如果想提高任务处理的数据,可以生成多个worker,就相当于雇佣更多的人来为你干活,非常直观的。当然这样一来,谁来维护这worker便成了一个问题,另外就目前这种设计下worker之间是没有通信与协同的,这些都是改进点。

那么对于多个worker,有什么组织方式呢?这里我介绍三种,算是抛砖引玉:

流水线式worker(assembly-line worker)

就像生产车间上的流水线工人一样,将任务切分成几个小块,每个worker负责自己的一部分,以提高整体的生产、产出效率,如下图:

假设完成任务 t 需要的时间为:W(t)=n,那么将任务分解成m份,流水线式的执行,每小份需要的时间便为 W(t/m)=n/m,那么执行1000条任务的时间,单个为1000n,流水线长度为L,则用这种方式所用的时间为(1000-1)*(m-L+1)*n/m+n
其中L<m,由此可见,流水线的worker越多、任务越细分,工作的效率将越高。这种主方式的问题在于,如果一个worker出现问题,那么整个流水线就将停止工作。而且任务的优先级不能动态调用,必须事先告知。

多级反馈队列(Multilevel Feedback Queue)

这是一个有Q1、Q2...Qn个多重流水线方式,从高到低分别代码不同的优先级,高优先级的worker要多于低优先级的,一般是2的倍数,即Q4有16个worker、Q3有8个,后面类推。任务根据预先估计好的优先级进入,如果任务在某步的执行过长,直接踢到下一级,让出最快的资源。

显然这种方式的好处就在于可以动态地调整任务的优级,及时做出反应。当然,为了实现更好的高度,我们可以在低级里增加一个阀值,使得放偶然放入低级的task可以有复活的机会^
^。

MapReduce式

流水线虽然有一定的并行性,但总体来说仍然是串行的,因为只要有一个节点出了问题,那都是致命的错误。MapReduce是Google率先实现的一个分布式算法,有非常好的并行执行效率。

只要我们将Map与Reduce都改成Worker就行了,如MapWorker与ReduceWorker。这样,可以看见,Map的过程是完全并行的,当然这样就需要在Map与Reduce上的分配与数据组合上稍稍下一点功夫了。

样例实现

这里我们实现一个PageURLMiningWorker,对给定的URL,打开页面后,采取所有的URL,并反回结果进行汇总输出。由于时间有限,这里我只实现了单worker与MapReduce worker集两种方式,有兴趣的同学可以实现其它类型,如多级反馈队列。注意!我这里只是向大家展示这种设计模式,URL
抓取的效率不在本次考虑之列。

所有的代码可以在这里获取:https://github.com/sefler1987/javaworker

单Worker实现样例

  1. 
    
    package com.alibaba.taobao.main;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.TimeUnit;
    
    import com.alibaba.taobao.worker.ConfigurableWorker;
    import com.alibaba.taobao.worker.SimpleURLComparator;
    import com.alibaba.taobao.worker.WorkerEvent;
    import com.alibaba.taobao.worker.WorkerListener;
    import com.alibaba.taobao.worker.WorkerTask;
    import com.alibaba.taobao.worker.linear.PageURLMiningProcessor;
    import com.alibaba.taobao.worker.linear.PageURLMiningTask;
    
    /**
     * Linear version of page URL mining. It's slow but simple.
     * Average time cost for 1000 URLs is: 3800ms
     *
     * @author xuanyin.zy E-mail:xuanyin.zy@taobao.com
     * @since Sep 16, 2012 5:35:40 PM
     */
    public class LinearURLMiningMain implements WorkerListener {
        private static final String EMPTY_STRING = "";
    
        private static final int URL_SIZE_TO_MINE = 10000;
    
        private static ConcurrentHashMap<String, WorkerTask<?>> taskID2TaskMap = new ConcurrentHashMap<String, WorkerTask<?>>();
    
        private static ConcurrentSkipListSet<String> foundURLs = new ConcurrentSkipListSet<String>(new SimpleURLComparator());
    
        public static void main(String[] args) throws InterruptedException {
            long startTime = System.currentTimeMillis();
    
            ConfigurableWorker worker = new ConfigurableWorker("W001");
            worker.setTaskProcessor(new PageURLMiningProcessor());
    
            addTask2Worker(worker, new PageURLMiningTask("http://www.taobao.com"));
            addTask2Worker(worker, new PageURLMiningTask("http://www.xinhuanet.com"));
            addTask2Worker(worker, new PageURLMiningTask("http://www.zol.com.cn"));
            addTask2Worker(worker, new PageURLMiningTask("http://www.163.com"));
    
            LinearURLMiningMain mainListener = new LinearURLMiningMain();
            worker.addListener(mainListener);
    
            worker.start();
    
            String targetURL = EMPTY_STRING;
            while (foundURLs.size() < URL_SIZE_TO_MINE) {
                targetURL = foundURLs.pollFirst();
    
                if (targetURL == null) {
                    TimeUnit.MILLISECONDS.sleep(50);
                    continue;
                }
    
                PageURLMiningTask task = new PageURLMiningTask(targetURL);
                taskID2TaskMap.putIfAbsent(worker.addTask(task), task);
    
                TimeUnit.MILLISECONDS.sleep(100);
            }
    
            worker.stop();
    
            for (String string : foundURLs) {
                System.out.println(string);
            }
    
            System.out.println("Time Cost: " + (System.currentTimeMillis() - startTime) + "ms");
        }
    
        private static void addTask2Worker(ConfigurableWorker mapWorker_1, PageURLMiningTask task) {
            String taskID = mapWorker_1.addTask(task);
            taskID2TaskMap.put(taskID, task);
        }
    
        @Override
        public List<WorkerEvent> intrests() {
            return Arrays.asList(WorkerEvent.TASK_COMPLETE, WorkerEvent.TASK_FAILED);
        }
    
        @Override
        public void onEvent(WorkerEvent event, Object... args) {
            if (WorkerEvent.TASK_FAILED == event) {
                System.err.println("Error while extracting URLs");
                return;
            }
    
            if (WorkerEvent.TASK_COMPLETE != event)
                return;
    
            PageURLMiningTask task = (PageURLMiningTask) args[0];
            if (!taskID2TaskMap.containsKey(task.getTaskID()))
                return;
    
            foundURLs.addAll(task.getMinedURLs());
    
            System.out.println("Found URL size: " + foundURLs.size());
    
            taskID2TaskMap.remove(task.getTaskID());
        }
    }
    

MapReduce实现样例

  1. 
    
    package com.alibaba.taobao.main;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.TimeUnit;
    
    import com.alibaba.taobao.worker.ConfigurableWorker;
    import com.alibaba.taobao.worker.SimpleURLComparator;
    import com.alibaba.taobao.worker.WorkerEvent;
    import com.alibaba.taobao.worker.WorkerListener;
    import com.alibaba.taobao.worker.WorkerTask;
    import com.alibaba.taobao.worker.mapreduce.Map2ReduceConnector;
    import com.alibaba.taobao.worker.mapreduce.MapReducePageURLMiningTask;
    import com.alibaba.taobao.worker.mapreduce.PageContentFetchProcessor;
    import com.alibaba.taobao.worker.mapreduce.URLMatchingProcessor;
    
    /**
     * MapReduce version of page URL mining. It's very powerful.
     *
     * @author xuanyin.zy E-mail:xuanyin.zy@taobao.com
     * @since Sep 16, 2012 5:35:40 PM
     */
    public class MapReduceURLMiningMain implements WorkerListener {
        private static final String EMPTY_STRING = "";
    
        private static final int URL_SIZE_TO_MINE = 10000;
    
        private static ConcurrentHashMap<String, WorkerTask<?>> taskID2TaskMap = new ConcurrentHashMap<String, WorkerTask<?>>();
    
        private static ConcurrentSkipListSet<String> foundURLs = new ConcurrentSkipListSet<String>(new SimpleURLComparator());
    
        public static void main(String[] args) throws InterruptedException {
            long startTime = System.currentTimeMillis();
    
            // four mapers
            List<ConfigurableWorker> mappers = new ArrayList<ConfigurableWorker>(4);
    
            ConfigurableWorker mapWorker_1 = new ConfigurableWorker("W_M1");
            ConfigurableWorker mapWorker_2 = new ConfigurableWorker("W_M2");
            ConfigurableWorker mapWorker_3 = new ConfigurableWorker("W_M3");
            ConfigurableWorker mapWorker_4 = new ConfigurableWorker("W_M4");
            mapWorker_1.setTaskProcessor(new PageContentFetchProcessor());
            mapWorker_2.setTaskProcessor(new PageContentFetchProcessor());
            mapWorker_3.setTaskProcessor(new PageContentFetchProcessor());
            mapWorker_4.setTaskProcessor(new PageContentFetchProcessor());
    
            mappers.add(mapWorker_1);
            mappers.add(mapWorker_2);
            mappers.add(mapWorker_3);
            mappers.add(mapWorker_4);
    
            // one reducer
            ConfigurableWorker reduceWorker_1 = new ConfigurableWorker("W_R1");
            reduceWorker_1.setTaskProcessor(new URLMatchingProcessor());
    
            // bind reducer to final result class
            MapReduceURLMiningMain main = new MapReduceURLMiningMain();
            reduceWorker_1.addListener(main);
    
            // initiate tasks
            addTask2Worker(mapWorker_1, new MapReducePageURLMiningTask("http://www.taobao.com"));
            addTask2Worker(mapWorker_2, new MapReducePageURLMiningTask("http://www.xinhuanet.com"));
            addTask2Worker(mapWorker_3, new MapReducePageURLMiningTask("http://www.zol.com.cn"));
            addTask2Worker(mapWorker_4, new MapReducePageURLMiningTask("http://www.sina.com.cn/"));
    
            // bind mapper to reduer
            Map2ReduceConnector connector = new Map2ReduceConnector(Arrays.asList(reduceWorker_1));
            mapWorker_1.addListener(connector);
            mapWorker_2.addListener(connector);
            mapWorker_3.addListener(connector);
            mapWorker_4.addListener(connector);
    
            // start all
            mapWorker_1.start();
            mapWorker_2.start();
            mapWorker_3.start();
            mapWorker_4.start();
            reduceWorker_1.start();
    
            String targetURL = EMPTY_STRING;
            int lastIndex = 0;
            while (foundURLs.size() < URL_SIZE_TO_MINE) {
                targetURL = foundURLs.pollFirst();
    
                if (targetURL == null) {
                    TimeUnit.MILLISECONDS.sleep(50);
                    continue;
                }
    
                lastIndex = ++lastIndex % mappers.size();
                MapReducePageURLMiningTask task = new MapReducePageURLMiningTask(targetURL);
                taskID2TaskMap.putIfAbsent(mappers.get(lastIndex).addTask(task), task);
    
                TimeUnit.MILLISECONDS.sleep(100);
            }
    
            // stop all
            mapWorker_1.stop();
            mapWorker_2.stop();
            mapWorker_3.stop();
            mapWorker_4.stop();
            reduceWorker_1.stop();
    
            for (String string : foundURLs) {
                System.out.println(string);
            }
    
            System.out.println("Time Cost: " + (System.currentTimeMillis() - startTime) + "ms");
        }
    
        private static void addTask2Worker(ConfigurableWorker mapWorker_1, MapReducePageURLMiningTask task) {
            String taskID = mapWorker_1.addTask(task);
            taskID2TaskMap.put(taskID, task);
        }
    
        @Override
        public List<WorkerEvent> intrests() {
            return Arrays.asList(WorkerEvent.TASK_COMPLETE, WorkerEvent.TASK_FAILED);
        }
    
        @Override
        public void onEvent(WorkerEvent event, Object... args) {
            if (WorkerEvent.TASK_FAILED == event) {
                System.err.println("Error while extracting URLs");
                return;
            }
    
            if (WorkerEvent.TASK_COMPLETE != event)
                return;
    
            MapReducePageURLMiningTask task = (MapReducePageURLMiningTask) args[0];
            if (!taskID2TaskMap.containsKey(task.getTaskID()))
                return;
    
            foundURLs.addAll(task.getMinedURLs());
    
            System.out.println("Found URL size: " + foundURLs.size());
    
            taskID2TaskMap.remove(task.getTaskID());
        }
    }
    

结果对比

Y轴为抓取X轴URL个数所用的时间

总结

我们可以看到,worker模式组合是非常灵活的,它真的就像一个活生生的工人,任你调配。使用worker,我们可以更方便地实现更复杂的结构。

时间: 2024-12-12 12:00:39

Java Worker 设计模式的相关文章

Java运用设计模式中的建造者模式构建项目的实例解析_java

1.建造者模式概念定义: 将一个复杂的对象构建与其表示相分离,使得同样的构建过程可以创建不同的表示: 核心 : 构建与表示分离,同构建不同表示 区别于 抽象工厂模式 : (1)与抽象工厂模式 相似,因为它也可以创建复杂对象.主要的区别是建造者模式着重于 一步步构造一个复杂对象,关注的是零件类型和装配工艺的顺序 .而抽象工厂模式着重于多个系列的产品对象(简单的或是复杂的).建造者模式在最后的一步返回产品,而对于抽象工厂来说,产品是立即返回的. (2)在建造者模式里,有个指导者,由指导者来管理建造者

Java经典设计模式之十一种行为型模式(附实例和详解)

版权声明:本文为博主原创文章,转载注明出处http://blog.csdn.net/u013142781 目录(?)[+] Java经典设计模式共有21中,分为三大类:创建型模式(5种).结构型模式(7种)和行为型模式(11种). 本文主要讲行为型模式,创建型模式和结构型模式可以看博主的另外两篇文章:Java经典设计模式之五大创建型模式(附实例和详解). Java经典设计模式之七大结构型模式(附实例和详解). 行为型模式细分为如下11种:策略模式.模板方法模式.观察者模式.迭代子模式.责任链模式

Java经典设计模式之七大结构型模式(附实例和详解)

版权声明:本文为博主原创文章,转载注明出处http://blog.csdn.net/u013142781 目录(?)[+] 博主在大三的时候有上过设计模式这一门课,但是当时很多都基本没有听懂,重点是也没有细听,因为觉得没什么卵用,硬是要搞那么复杂干嘛.因此设计模式建议工作半年以上的猿友阅读起来才会理解的比较深刻.当然,你没事做看看也是没有坏处的. 总体来说设计模式分为三大类:创建型模式.结构型模式和行为型模式. 博主的上一篇文章已经提到过创建型模式,此外该文章还有设计模式概况和设计模式的六大原则

Java使用设计模式中的工厂方法模式实例解析_java

工厂方法模式的定义工厂方法(Factory Method)模式的意义是定义一个创建产品对象的工厂接口,将实际创建工作推迟到子类当中.核心工厂类不再负责产品的创建,这样核心类成为一个抽象工厂角色,仅负责具体工厂子类必须实现的接口,这样进一步抽象化的好处是使得工厂方法模式可以使系统在不修改具体工厂角色的情况下引进新的产品. 它包含了如下角色: 抽象产品(Product) 具体产品(ConcreteProduct) 抽象工厂(Factory) 具体工厂(ConcreteFactory) 模式的UML类

讲故事,学(Java)设计模式—桥接模式

讲故事,学(Java)设计模式-桥接模式 2013/11/09 | 分类: 基础技术 | 0 条评论 | 标签: Java, 设计模式 分享到:12 本文由 ImportNew - 陈雅峰 翻译自 programcreek.欢迎加入翻译小组.转载请见文末要求. 本文由 @胡试之 校稿.如果你也希望参与类似的系列文章翻译,可以加入我们的Java开发 和 技术翻译 小组. 简单来讲,桥接模式是一个两层的抽象. 桥接模式是用于"把抽象和实现分开,这样它们就能独立变化". 桥接模式使用了封装.

解析Java的设计模式编程之解释器模式的运用_java

定义:给定一种语言,定义他的文法的一种表示,并定义一个解释器,该解释器使用该表示来解释语言中句子.类型:行为类模式类图: 解释器模式是一个比较少用的模式,本人之前也没有用过这个模式.下面我们就来一起看一下解释器模式.  解释器模式的结构抽象解释器:声明一个所有具体表达式都要实现的抽象接口(或者抽象类),接口中主要是一个interpret()方法,称为解释操作.具体解释任务由它的各个实现类来完成,具体的解释器分别由终结符解释器TerminalExpression和非终结符解释器Nontermina

Java观察者设计模式详解_java

   观察者模式(有时又被称为发布(publish )-订阅(Subscribe)模式.模型-视图(View)模式.源-收听者(Listener)模式或从属者模式)是软件设计模式的一种.在此种模式中,一个目标物件管理所有相依于它的观察者物件,并且在它本身的状态改变时主动发出通知.这通常透过呼叫各观察者所提供的方法来实现.此种模式通常被用来实现事件处理系统.   观察者模式(Observer)完美的将观察者和被观察的对象分离开.举个例子,用户界面可以作为一个观察者,业务数据是被观察者,用户界面观察

详解Java的设计模式编程中的原型模式_java

定义:用原型实例指定创建对象的种类,并通过拷贝这些原型创建新的对象.类型:创建类模式类图: 原型模式主要用于对象的复制,它的核心是就是类图中的原型类Prototype.Prototype类需要具备以下两个条件: 实现Cloneable接口.在java语言有一个Cloneable接口,它的作用只有一个,就是在运行时通知虚拟机可以安全地在实现了此接口的类上使用clone方法.在java虚拟机中,只有实现了这个接口的类才可以被拷贝,否则在运行时会抛出CloneNotSupportedException

Java命令设计模式详解_java

将来自客户端的请求传入一个对象,从而使你可用不同的请求对客户进行参数化.用于"行为请求者"与"行为实现者"解耦,可实现二者之间的松耦合,以便适应变化.分离变化与不变的因素. 一.角色Command 定义命令的接口,声明执行的方法.ConcreteCommand 命令接口实现对象,是"虚"的实现:通常会持有接收者,并调用接收者的功能来完成命令要执行的操作.Receiver 接收者,真正执行命令的对象.任何类都可能成为一个接收者,只要它能够实现命令要