工作者引擎 IWorkerEngine -- ESBasic 可复用的.NET类库(05)

1.缘起:

    假设我们的系统在运行的过程中,源源不断的有新的任务需要处理(比如订单处理),而且这些任务的处理是相互独立的,没有前后顺序依赖性(顺序依赖性是指,必须在任务A处理结束后才可开始B任务),那么我们就可以使用多个线程来同时处理多个任务。每个处理任务的线程称为“工作者(线程)”。
      我设计了ESBasic.Threading.Engines.IWorkerEngine工作者引擎,其目的就是使用多个线程来并行处理任务,提高系统的吞吐能力。

      工作者引擎的形象示意图如下:
      
 

2.适用场合:

设计工作者引擎ESBasic.Threading.Engines.IWorkerEngine的主要目的是为了解决类似下面的问题:

(1)充分利用多CPU、多核计算资源。

(2)减少因高速设备与低速设备之间速度差而产生计算资源浪费。

(3)对于突发的大批量的任务(比如订单系统经常在其它时段接受的订单很少,但在某高峰期会有突发性的大量的订单进来)进行缓冲处理,并最大限度地利用现有资源进行处理。

 

3.设计思想与实现

       IWorkerEngine的设计思路是这样的:我们使用一个队列来存放需要处理的任务,新来的任务都会排队到这个队列中,然后有N个工作者线程不断地从队列中取出任务去处理,每个线程处理完当前任务后,又从队列中取出下一个任务……,如此循环。

       IWorkerEngine接口的源码对应如下:    

 

    public interface IWorkerEngine<T>
    {
        /// <summary>
        /// IdleSpanInMSecs 当没有工作要处理时,工作者线程休息的时间间隔。默认为10ms
        /// </summary>
        int IdleSpanInMSecs { get;set; }

        /// <summary>
        /// WorkerThreadCount 工作者线程的数量。默认值为1。
        /// </summary>
        int WorkerThreadCount { get; set; }

        /// <summary>
        /// WorkProcesser 用于处理任务的处理器。
        /// </summary>
        IWorkProcesser<T> WorkProcesser { set; }
        
        /// <summary>
        /// WorkCount 当前任务队列中的任务数。
        /// </summary>
        int WorkCount { get; }

        /// <summary>
        /// MaxWaitWorkCount 历史中最大的处于等待状态的任务数量。
        /// </summary>
        int MaxWaitWorkCount { get; }

        void Initialize();
        void Start();
        void Stop();

        /// <summary>
        /// AddWork 添加任务。
        /// </summary>       
        void AddWork(T work); 
    }

   

由于任务的类型不是固定的,所以我们使用的泛型参数T来表示要处理任务的类型。

所有的任务的具体执行都是由IWorkProcesser完成的:    

 

    public interface IWorkProcesser<T>
    {
        void Process(T work);
    }

      实现这个IWorkerEngine接口的时候要注意以下几点:

(1)AddWork方法会在多线程的环境中被调用,所以必须保证其是线程安全的。

(2)每个工作者线程实际上就是一个我们前面介绍的循环引擎ICycleEngine,只不过将其DetectSpanInSecs设为0即可,表示不间断地执行任务。WorkerEngine便是使用了N个AgileCycleEngine实例来作为工作者的。这些AgileCycleEngine实例在Initialize方法中被实例化。

(3)所有的工作者最终都是执行私有的DoWork方法,这个方法就是从任务队列中取出任务并且调用IWorkProcesser来处理任务,如果任务队列为空,则等待IdleSpanInMSecs秒钟后再重试。

(4)MaxWaitWorkCount属性用于记录自从引擎运行以来最大的等待任务的数量,通过这个属性我们可以推测任务量与任务处理速度之间的差距。

(5)通过Start、Stop方法我们可以随时停止、启动工作者引擎,并可重复调用。

4. 使用时的注意事项

(1)     当引擎已经启动并正在运行时,如果要修改WorkerThreadCount的值并使其生效,则必须先调用Stop方法停止引擎,然后重新调用Initialize方法初始化引擎,再调用Start方法启动引擎。

(2)     关于工作者线程的个数N的设置的问题。这个数字不是越大越好,因为使用的线程越多,而CPU跟不上的话,那么消耗在线程切换上的浪费就越严重。所以,为了达到最好的性能,需要为工作者线程个数设置一个合适的值。
通常,这个值跟CPU的个数、CPU核的个数、任务的复杂度、慢速设备与快速设备之间的速度差以及它们的吞吐量有关。我们可以通过足够的测试来发现适合我们系统的N值。

      一般情况下的推荐值为:CPU个数*单个CPU的核数*2 + 1。
 

5.扩展

(1)“一次性”的工作者引擎:BriefWorkerEngine

    假设我们的系统可能会偶尔有一批任务要处理(也许永远也不会有这样的任务出现),我们希望只有当任务到来时,才使用一个工作者引擎实例来多线程处理它,处理完后,该引擎就可以释放掉。

      ESBasic.Threading.Engines.BriefWorkerEngine,精简的工作者引擎,便是为这一目的而设计的。它使用多线程处理一批任务,当这批任务处理结束后,工作者线程会被自动释放,而该引擎实例也就可以被结束了。

    为了方便使用,我将BriefWorkerEngine设计为从构造函数注入引擎运行所需要的参数,包括任务处理器、工作者线程个数、以及要处理的任务集合。在引擎实例被构造成功的同时,内部的循环引擎已经准备好了。注意,BriefWorkerEngine实现了IDisposable接口,这表明当引擎被释放时,内部所有的循环引擎都会停止运行,从而不再占有后台线程池中的线程。

我们可以这样来使用BriefWorkerEngine:

 

            IWorkProcesser<MyTask> processer = ...;
            IList<MyTask> taskList = ...;
            BriefWorkerEngine<MyTask> engine = new BriefWorkerEngine<MyTask>(processer, 5, taskList);
            engine.Start();
            while (!engine.IsFinished())
            {
                System.Threading.Thread.Sleep(100);
            }
            engine.Dispose();
            //执行到这里,表示所有任务已经处理完毕,引擎实例即将被释放。

       我们可以通过它的IsFinished方法来检测执行是否已经完成。当IsFinished方法返回true时,引擎实例就可以被销毁了。

(2)永不停止的工作者引擎

我们同样可以考虑一个类似于循环引擎的扩展的情况,假设我们的系统要求在启动时就将工作者引擎运行起来,而且在整个运行的生命周期中,都不需要停止引擎,那么我们就不想将Start方法、Stop方法暴露出来以免意外的调用Stop方法而导致引擎停止运行,那这个时候我们可以使用相同的技巧来做到:

 

    public sealed class MyWorkerEngine
    {
        private IWorkerEngine<MyTask> workerEngine;

        public void Initialize()
        {
            this.workerEngine = new WorkerEngine<MyTask>();
            this.workerEngine.WorkerThreadCount = 5;
            //this.workerEngine.WorkProcesser = .. 赋值
            this.workerEngine.Initialize();
            this.workerEngine.Start();
        }
    }

    public class MyTask   {    }

其道理与循环引擎的扩展是一样的。

注: ESBasic已经开源,点击这里下载源码。
    ESBasic开源前言

时间: 2024-10-27 13:54:14

工作者引擎 IWorkerEngine -- ESBasic 可复用的.NET类库(05)的相关文章

循环引擎 ICycleEngine --ESBasic 可复用的.NET类库(04)

 1.缘起: 有些系统需要每隔一段时间就执行一下某个动作,比如,一个监控系统每隔10秒钟就要检测一下被监控对象的状态是否正常,那这时我们就可以用到循环引擎了.     有人说可以使用.NET框架自带定时器如System.Threading.Timer,嗯,没错.但是若这个类使用不当可能会引发后台池线程耗尽的后果.因为Timer的定时事件触发实在后台线程池中的某个线程中处理的.也就是说Timer的每次定时事件触发都会用到一个线程,如果定时的时间间隔小于事件处理的时间,则后台线程池中将会有越来越多的

Round缓存管理器RoundCacheManager--ESBasic 可复用的.NET类库(26)

1.缘起:     在增量自动获取器章节的缘起部分,我们曾提到增量缓存,本节我们将深入探讨它以及用于管理增量缓存的管理器.我们还是以增量自动获取器章节提到的例子作为基础,并做更进一步的讨论.       OK,现在让我们开始这有趣的旅程. 首先,基于前面例子给出的上下文,我们知道IIncreaseAutoRetriever获取的增量是用于累积当天的已成交订单报表的."当天已成交报表"就是一个典型的增量缓存,每当有新的增量到来,都会累加到上面. 我们假设今天是2009.07.08,那么我

循环任务切换器 CircleTaskSwitcher -- ESBasic 可复用的.NET类库(06)

 1.缘起:     假设我的订单处理系统有这样的需求:将一天24小时分为4个时段,凌晨2:15到8:30采用A类型的处理器处理接收到的订单,8:30到14:00采用B类型的处理器,14:00到20:00采用C类型的处理器,20:00到第二天凌晨2:15采用D类型的处理器.     即我们的订单处理器需要在任一天的2:15.8:30.14:00.20:00这四个时刻发生切换,这就是一个循环切换器所要做的工作.     我设计了ESBasic.Threading.Application. ICir

ESBasic 可复用的.NET类库(00) -- 开源前言(附下载)

自从03年正式使用.NET开发以来,已经走过了6个年头,这期间我积累了几套类库和框架,ESBasic便是其中最基础的一个类库.ESBasic是Enterprise Service Basic的缩写,虽然也简写为ESB,但是它和Enterprise Service Bus(企业服务总线)没有任何关系.ESBasic是我能够快速和高效开发应用程序的利器之一,开这个专门的blog是想将它介绍给大家,希望能对大家有所启发. ESBasic覆盖的内容包括:对象管理.插件.网络(Socket).多线程.Em

回调定时器ICallbackTimer --ESBasic 可复用的.NET类库(07)

 1.缘起:     举个例子也许就能够说清楚回调定时器的用途.假设我的订单系统接收各种不同类型的订单,当订单A进来时,系统根据订单的类型和其它特征进行综合判断后,决定A订单要在2秒之后被方法M1处理:接下来收到的B订单经过同样的判断后,决定要在10秒后被方法M2处理,--.这时候就可以用回调定时器来管理这些将要被延迟一定时间再执行的任务.     当然,我们可以使用定时器或前面介绍的循环引擎来实现这样的功能,只不过我们自己需要手动管理注册的定时回调任务,并且定时检查每一个未处理订单是否已经到了

优先级管理器 IPriorityManager -- ESBasic 可复用的.NET类库(14)

1.缘起:     假设我们的订单处理系统所要处理的订单是有优先级的,也就是说,不同的订单类型所要求被处理的紧迫程度不同,对那些优先级高的注单要先处理,对于优先级低的注单可稍后处理.对于处于同一优先级的订单了,就按照其到达的先后顺序进行处理.     这是一个典型的管理具有优先级的对象的需求,注单就是具有优先级(With Priority)的对象.我设计了ESBasic.ObjectManagement.Managers.IPriorityManager优先级管理器(确切地说,应该称之为"具有优

心跳监测器 IHeartBeatChecker -- ESBasic 可复用的.NET类库(09)

1.缘起:     假设我们的C/S系统中服务端与客户端之间采用UDP进行通信,那么服务端如何知道每个客户端当前是否仍然在线了?有可能某个客户端一直没有退出,但是在很长一段时间内都没有与服务端作任何通信,那么服务端就应该认为这个客户端已经离线了吗?为了能让服务端掌握每个客户端是否在线的状态,我们可以这样做,只要客户端一启动起来,就每隔一段时间间隔(如10秒)就向服务端发一个"我还在线"的消息,以表明自己的状态.而服务端如果在一个更大的时间间隔内(如20秒)都没有收到某个客户端的任何消息

时刻 ShortTime --ESBasic 可复用的.NET类库(01)

          (如果您能对照着源码来阅读本文,效果会更好.) 1.缘起:        假设我们的员工打卡系统,需要设定公司规定的上班时间.下班时间.以及还要对员工是否迟到早退等这些情况进行判断.        我们以什么方式来记录类似上下班时间这样只有时分秒没有年月日的时间了?你说可以使用DateTime,但是合适吗?总是觉得用DateTime来表示上下班的时间很别扭,因为我们的上下班时间并需要指定到具体的哪一天啊.        我设计了ESBasic.ShortTime来对类似上下班

热缓存 IHotCache --ESBasic 可复用的.NET类库(19)

1.缘起:     假设我们有一个订单系统,现在这个系统要增加一个功能――允许客人查核他认为有问题的订单的详细信息.当客人觉得自己的某个订单不对劲时,他首先会从订单系统查询这个订单的详细信息,然后打电话告诉我们的客服有问题的订单的编号,客服再去查核,如果属实,客服还要进一步上报,如果该订单非常重要,则可能需要更进一步上报复查等.     从这个需求我们看到,同一个订单可能会在比较短的时间内查询数次甚至数十次,所以我们可以称这个订单为"热点"订单.而其它的成千上万的订单可能在一个月内都不