可伸缩多线程任务队列

在我们的工作中,我们经常需要异步执行一些任务,下面介绍的这个可伸缩多线程队列,可满足我们的需求。

  出自:http://www.codeproject.com/Articles/4148/Multithreaded-Job-Queue,主要有以下几个功能:

    1、任务队列是多线程,许多任务可以异步进行,任务队列使用线程池来执行任务。

    2、任务队列支持优先级,优先级高的任务优先执行(即使是后来添加的)

    3、任务队列可以被暂停,但是用户还是可以添加任务,当任务队列被唤醒时,任务可以继续执行下去

    4、在运行过程中,任务队列使用的线程池,用户可以自行增加和减少

  大体框架主要由3个类构成

    1、CJob,任务类,用户需要从该类派生来实现自身需要完成的任务

    2、CJobExecuter,任务执行类,任务均由该类来调用执行,每一个类相当于对应一个线程

    3、CMThreadedJobQ,多线程任务队列,添加任务已经任务的分发均由该类完成,该类维护一个任务队列和一个完成队列的线程池。

  类图如下:

  该例子中,CJobExecuter和CMThreadJobQ这两个类的调用关系是非常值得我们学习的,同时,CJob作为一个基类,子类派生可以实现不同的任务,可扩展性也不错。源代码解析如下:

  Job.h文件:

class CJob
{
public:
    CJob();
    virtual ~CJob();

    BOOL m_Completed;         //任务是否完成:TRUE 完成,FALSE 未完成
    static long lastUsedID;   //最后的ID

    //================================================================================================
    //函数名:                  setPriority
    //函数描述:                设置任务优先级
    //输入:                    [in] priority 优先级别
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void setPriority(int priority);

    //================================================================================================
    //函数名:                  getPriority
    //函数描述:                返回任务优先级
    //输入:                    无
    //输出:                    无
    //返回:                    任务优先级
    //================================================================================================
    int getPriority();

    //================================================================================================
    //函数名:                  getID
    //函数描述:                返回任务ID
    //输入:                    无
    //输出:                    无
    //返回:                    任务ID
    //================================================================================================
    long getID();

    //================================================================================================
    //函数名:                  setAutoDelete
    //函数描述:                设置完成任务后是否删除任务
    //输入:                    [in] autoDeleteFlag
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void setAutoDelete(BOOL autoDeleteFlag = TRUE);

    //================================================================================================
    //函数名:                  AutoDelete
    //函数描述:                返回删除任务标记
    //输入:                    无
    //输出:                    无
    //返回:                    任务标记
    //================================================================================================
    BOOL AutoDelete();

    //================================================================================================
    //函数名:                  execute
    //函数描述:                任务真正工作的函数,纯虚函数,需要子类化实现
    //输入:                    无
    //输出:                    无
    //返回:                    任务ID
    //================================================================================================
    virtual void execute() = 0;
private:
    long m_ID;               //任务ID
    BOOL m_autoDeleteFlag;   //是否自动删除任务标记,TRUE 删除,FALSE 不删除,默认为TRUE
    int m_priority;          //任务优先级,默认为5

};

  Job.cpp文件:

long CJob::lastUsedID = 0;

CJob::CJob()
{
    this->m_ID = InterlockedIncrement(&lastUsedID);
    this->m_autoDeleteFlag = TRUE;
    this->m_priority = 5;
    this->m_Completed= FALSE;
}

CJob::~CJob()
{
}

BOOL CJob::AutoDelete()
{
    return m_autoDeleteFlag;
}

void CJob::setAutoDelete(BOOL autoDeleteFlag)
{
    m_autoDeleteFlag = autoDeleteFlag;
}

long CJob::getID()
{
    return this->m_ID;
}

int CJob::getPriority()
{
    return this->m_priority;
}

void CJob::setPriority(int priority)
{
    this->m_priority = priority;
}

  JobExecuter.h文件:

//一个对象对应一个线程,执行任务Job
class CJobExecuter
{
public:
    CJobExecuter(CMThreadedJobQ *pJobQ);
    virtual ~CJobExecuter();

    //================================================================================================
    //函数名:                  stop
    //函数描述:                停止执行任务
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void stop();

    //================================================================================================
    //函数名:                  execute
    //函数描述:                执行一个任务
    //输入:                    [in] pJob 任务指针
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void execute(CJob* pJob);

    static UINT ThreadFunction(LPVOID pParam); //线程函数

    CMThreadedJobQ* m_pJobQ;                   //指向线程任务队列指针
    CJob* m_pJob2Do;                           //指向正在执行任务的指针
    int m_flag;                                //线程执行标记
    CWinThread* m_pExecuterThread;             //线程标识符
};

  JobExecuter.cpp文件:

#define STOP_WORKING -1
#define KEEP_WORKING  0

CJobExecuter::CJobExecuter(CMThreadedJobQ *pJobQ)
{
    this->m_pJobQ= pJobQ;
    this->m_pExecuterThread= AfxBeginThread(ThreadFunction,this);
    this->m_pJob2Do = NULL;
    this->m_flag = KEEP_WORKING;
}

CJobExecuter::~CJobExecuter()
{
    if(this->m_pExecuterThread!= NULL )
    {
        this->m_pExecuterThread->ExitInstance();
        delete m_pExecuterThread;
    }
}

UINT CJobExecuter::ThreadFunction(LPVOID pParam)
{
    CJobExecuter *pExecuter = (CJobExecuter *)pParam;
    pExecuter->m_flag = 1;
    ::Sleep(1);
    CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);
    while(pExecuter->m_flag !=STOP_WORKING )
    {
        if(pExecuter->m_pJob2Do!=  NULL)
        {
            pExecuter->m_pJob2Do->execute();
            pExecuter->m_pJob2Do->m_Completed = TRUE;
            if(pExecuter->m_pJob2Do->AutoDelete())
                delete pExecuter->m_pJob2Do;
            pExecuter->m_pJob2Do = NULL;
        }

        if(pExecuter->m_pJobQ == NULL) break;

        CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);
        singleLock.Lock();
        if(pExecuter->m_pJobQ->getNoOfExecuter() > pExecuter->m_pJobQ->getMaxNoOfExecuter()) //CJobExecuter个数大于最大值,自动销毁
        {
            pExecuter->stop();
            singleLock.Unlock();
        }
        else
        {
            pExecuter->m_pJobQ->addFreeJobExecuter(pExecuter);      //完成任务后,添加到CMThreadedJobQ的空闲队列中
            singleLock.Unlock();
            pExecuter->m_pJobQ->m_pObserverThread->ResumeThread();
            pExecuter->m_pExecuterThread->SuspendThread();
        }
    }

    if(pExecuter->m_pJobQ != NULL)
    {
        pExecuter->m_pJobQ->deleteJobExecuter(pExecuter);
    }
    else
    {
        delete pExecuter;
    }

    return 0;
}

void CJobExecuter::execute(CJob* pJob)
{
    this->m_pJob2Do = pJob;
    ::Sleep(0);
    this->m_pExecuterThread->ResumeThread();
}

void CJobExecuter::stop()
{
    this->m_flag = STOP_WORKING;
    this->m_pExecuterThread->ResumeThread();
}

  MThreadedJobQ.h文件:

typedef CTypedPtrList< CPtrList ,CJob*>CJobQList;

//线程池任务队列
class CMThreadedJobQ
{

public:
    typedef struct THNODE
    {
        CJobExecuter* pExecuter;
        THNODE * pNext ;
    } THNODE;

    CMThreadedJobQ();
    virtual ~CMThreadedJobQ();

    //================================================================================================
    //函数名:                  deleteJobExecuter
    //函数描述:                删除一个JobExecuter对象
    //输入:                    [in] pEx
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void deleteJobExecuter(CJobExecuter *pEx);

    //================================================================================================
    //函数名:                  setMaxNoOfExecuter
    //函数描述:                设置CJobExecuter的个数
    //输入:                    [in] value
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void setMaxNoOfExecuter(int value);

    //================================================================================================
    //函数名:                  addJobExecuter
    //函数描述:                添加一个CJobExecuter
    //输入:                    [in] pEx
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void addJobExecuter(CJobExecuter *pEx);

    //================================================================================================
    //函数名:                  getJobExecuter
    //函数描述:                返回一个CJobExecuter
    //输入:                    无
    //输出:                    无
    //返回:                    处理任务的指针
    //================================================================================================
    CJobExecuter* getJobExecuter();

    //================================================================================================
    //函数名:                  addFreeJobExecuter
    //函数描述:                添加一个CJobExecuter
    //输入:                    [in] pEx
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void addFreeJobExecuter(CJobExecuter *pEx);

    //================================================================================================
    //函数名:                  addJob
    //函数描述:                添加一个任务
    //输入:                    [in] pJob
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void addJob(CJob *pJob);

    //================================================================================================
    //函数名:                  getMaxNoOfExecuter
    //函数描述:                获取CJobExecuter个数的最大值
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    int getMaxNoOfExecuter();

    //================================================================================================
    //函数名:                  getNoOfExecuter
    //函数描述:                获取当前CJobExecuter的个数
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    int getNoOfExecuter();

    static UINT JobObserverThreadFunction(LPVOID);

    //================================================================================================
    //函数名:                  pause
    //函数描述:                挂起JobObserverThread线程
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void pause();

    //================================================================================================
    //函数名:                  resume
    //函数描述:                唤醒JobObserverThread线程
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void resume();    

    CWinThread* m_pObserverThread; //向空闲的executer线程添加任务的线程
    CCriticalSection m_cs;         //关键代码段,用于互斥
    CJobQList m_jobQList;          //任务队列
private :
    BOOL m_pause;                  //JobObserverThread线程运行标记
    int m_MaxNoOfExecuter;         //CJobExecuter最大个数
    int m_NoOfExecuter;            //当前CJobExecuter个数
    THNODE* m_pFreeEList;          //维护空闲处理任务线程的队列
    THNODE* m_pAllEList;           //维护所有处理任务线程的队列
};

  MThreadedJobQ.cpp文件:

CMThreadedJobQ::CMThreadedJobQ()
{
    m_MaxNoOfExecuter = 2;
    m_pause = FALSE;
    m_pObserverThread = AfxBeginThread(JobObserverThreadFunction,this);
    m_pFreeEList =NULL;
    m_NoOfExecuter =0;
    m_pAllEList = NULL;
}

CMThreadedJobQ::~CMThreadedJobQ()
{
    THNODE* pTempNode;
    while (m_pAllEList != NULL)
    {
        pTempNode = m_pAllEList->pNext;
        delete m_pAllEList->pExecuter;
        delete m_pAllEList;
        m_pAllEList = pTempNode;
    }    

    while (m_pFreeEList != NULL)
    {    pTempNode = m_pFreeEList->pNext;
        delete m_pFreeEList;
        m_pFreeEList = pTempNode;
    }    

    m_pObserverThread->ExitInstance();
    delete m_pObserverThread;
}

void CMThreadedJobQ::pause()
{
    this->m_pause = TRUE;
}

void CMThreadedJobQ::resume()
{
    this->m_pause = FALSE;
    this->m_pObserverThread->ResumeThread();
}

UINT CMThreadedJobQ::JobObserverThreadFunction(LPVOID pParam)
{
    CMThreadedJobQ *pMTJQ = (CMThreadedJobQ *)pParam;
    CJobExecuter *pJExecuter;

    while(TRUE)
    {
        Sleep(100);
        if(pMTJQ->m_pause != TRUE)
        {
            while(!pMTJQ->m_jobQList.IsEmpty() )
            {
                pJExecuter = pMTJQ->getJobExecuter();
                if( pJExecuter!=NULL)
                {
                    pMTJQ->m_cs.Lock();
                    pJExecuter->execute(pMTJQ->m_jobQList.GetHead());
                    pMTJQ->m_jobQList.RemoveHead();
                    AfxGetApp()->m_pMainWnd->PostMessage(REFRESH_LIST);
                    pMTJQ->m_cs.Unlock();
                }
                else
                {
                    break;
                }
                if(pMTJQ->m_pause == TRUE)
                    break;
            }
        }
        pMTJQ->m_pObserverThread->SuspendThread();
    }
    return 0;
}

int CMThreadedJobQ::getNoOfExecuter()
{
    return this->m_NoOfExecuter;
}

int CMThreadedJobQ::getMaxNoOfExecuter()
{
    return this->m_MaxNoOfExecuter;
}

void CMThreadedJobQ::addJob(CJob *pJob)
{
    CJob * pTempJob;
    CSingleLock sLock(&this->m_cs);
    sLock.Lock();
    POSITION pos,lastPos;
    pos = this->m_jobQList.GetHeadPosition();
    lastPos = pos;
    if(pos != NULL)
        pTempJob =this->m_jobQList.GetHead();
    while(pos != NULL )
    {
        if( pJob->getPriority() > pTempJob->getPriority())
            break;
        lastPos = pos;
        pTempJob =     this->m_jobQList.GetNext(pos);
    }
    if(pos == NULL)
        this->m_jobQList.AddTail(pJob);
    else
        this->m_jobQList.InsertBefore(lastPos,pJob);
    this->m_pObserverThread->ResumeThread();
    sLock.Unlock();
}

void CMThreadedJobQ::addFreeJobExecuter(CJobExecuter *pEx)
{
    m_cs.Lock();
    THNODE* node = new THNODE;
    node->pExecuter = pEx;
    node->pNext = this->m_pFreeEList;
    this->m_pFreeEList = node;
    m_cs.Unlock();
}

CJobExecuter* CMThreadedJobQ::getJobExecuter()
{
    THNODE *pTemp;
    CJobExecuter *pEx=NULL;
    m_cs.Lock();

    if(this->m_pFreeEList != NULL)  //有空闲CJobExecuter,就返回
    {
        pTemp = this->m_pFreeEList;
        this->m_pFreeEList = this->m_pFreeEList->pNext;
        pEx = pTemp->pExecuter;
        delete pTemp ;
        m_cs.Unlock();
        return pEx;
    }

    if(this->m_NoOfExecuter < this->m_MaxNoOfExecuter) //没有空闲CJobExecuter,并且当前CJobExecuter小于最大值,就生成一个新的CJobExecuter
    {
        pEx =  new CJobExecuter(this);
        this->addJobExecuter(pEx);
        this->m_NoOfExecuter++;
        m_cs.Unlock();
        return pEx;
    }
    m_cs.Unlock();
    return NULL;
}

void CMThreadedJobQ::addJobExecuter(CJobExecuter *pEx)
{
    m_cs.Lock();
    THNODE* node = new THNODE;
    node->pExecuter= pEx;
    node->pNext = this->m_pAllEList;
    this->m_pAllEList = node;
    m_cs.Unlock();
}

void CMThreadedJobQ::setMaxNoOfExecuter(int value)
{
    this->m_cs.Lock();
    if(value >1 && value <11)
        this->m_MaxNoOfExecuter = value;
    m_pObserverThread->ResumeThread();
    this->m_cs.Unlock();
}

void CMThreadedJobQ::deleteJobExecuter(CJobExecuter *pEx)
{
    THNODE* pNode,*pNodeP;
    CSingleLock singleLock(&m_cs);
    singleLock.Lock();
    if(this->m_pAllEList != NULL)
    {
        pNode = this->m_pAllEList;
        if(pNode->pExecuter == pEx )
        {
          this->m_pAllEList = pNode->pNext;
          delete pNode;
        }
        else
        {
            pNodeP =pNode;
            pNode  = pNode->pNext ;
            while(pNode != NULL )
            {
                if(pNode->pExecuter== pEx ) break;
                pNodeP = pNode;
                pNode  = pNode->pNext ;
            }
            if(pNode!= NULL)
            {
                pNodeP->pNext = pNode->pNext;
                delete pNode;
            }
        }
    }
    this->m_NoOfExecuter--;
    singleLock.Unlock();
    pEx->stop();
    Sleep(1);
    delete pEx;
}

  以上,就是该可伸缩多线程任务的主体框架,当我们工作需要实现类似这样的需要:异步执行多个不同的任务时,这个例子就是一个很好的参考例子,我研究这些代码只是为了让我在遇到这种问题的时候,可以有一个思路去思考,而不至于无从下手,仅此而已。

时间: 2024-12-21 05:00:54

可伸缩多线程任务队列的相关文章

基于ArcGIS的JLKEngine中间件平台可带框架源码购买并提供免费升级服务!

问题描述 基于ArcGIS的JLKEngine中间件平台底层类库结构简介1,系统总体上划分为如下几个层次:JLKEngine核心库:它是JLKEnigne的运行基础,定义了框架运行模型.框架扩展模式.并对ArcGISEngine中可用的操作进行了封装,提供了一组函数实现对空间数据的快速处理,以便于在基于ArcGISEngine的应用系统开发中应用以简化开发工作难度.GIS基础功能库:它是在ArcGISEngine基础上按地图制图.数据编辑.网络分析.三维分析等对数据处理划分的一组类库.在这组

你想找的Python资料这里全都有!没有你找不到!史上最全资料合集

GitHub 上有一个 Awesome - XXX 系列的资源整理,资源非常丰富,涉及面非常广.awesome-python 是 vinta 发起维护的 Python 资源列表,内容包括:Web框架.网络爬虫.网络内容提取.模板引擎.数据库.数据可视化.图片处理.文本处理.自然语言处理.机器学习.日志.代码分析等.在给大家分享之前呢,小编推荐一下一个挺不错的交流宝地,里面都是一群热爱并在学习Python的小伙伴们,大几千了吧,各种各样的人群都有,特别喜欢看到这种大家一起交流解决难题的氛围,群资料

多核时代多线程编程(一)基本策略

1.1问题分析 1.2分工原则 1.2.1确定线程数 1.2.2确定任务的数量 1.3共享可变性 1.4小结 1.5参考资料 大家对并发(concurrency).多线程(Multi-Threading)程序设计肯定不陌生,因为在当今多核CPU时代到处可见,从底层的操作系统(OS)到上层的应用程序,从服务端到客户端,从低级语言到高级程序语言.从分布式集群到大数据处理等等,都可以看到并发程序的身影.可以这样说,只要有计算机软件的地方,就会存在并发程序.大家肯定知道,为何到处都有并发程序?就是因为它

java多线程总结一:线程的两种创建方式及优劣比较

http://blog.csdn.net/touch_2011/article/details/6891026 1.通过实现Runnable接口线程创建 (1).定义一个类实现Runnable接口,重写接口中的run()方法.在run()方法中加入具体的任务代码或处理逻辑. (2).创建Runnable接口实现类的对象. (3).创建一个Thread类的对象,需要封装前面Runnable接口实现类的对象.(接口可以实现多继承) (4).调用Thread对象的start()方法,启动线程 示例代码

javascript 多线程异步队列

首先,你得知道 jQuery.Deferred 的大致用法,然后,我们进入正题吧: 库代码: /*!  * 多线程异步队列  * 依赖 jQuery 1.8+ (如果你用的是 1.6或1.7, 只要将源码中的 then方法替换为pipe方法 即可)  */ /**  * @n {Number} 正整数, 线程数量  */ function Queue (n) {     n = parseInt(n 1, 10);     return new Queue.prototype.init( (n

Java 5.0多线程编程实践

Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供 了丰富的API多线程编程在Java 5中更加容易,灵活.本文通过一个网络服务器 模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列 ,可重入锁等,还实践了Callable, Future等接口,并使用了Java 5的另外一 个新特性泛型. 简介 本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一 个新线程为该连接服务,服务内容为往客户端输送一些字符信息.一个

JavaScript能不能多线程?

JavaScript的setTimeout与setInterval是两个很容易欺骗别人感情的方法,因为我们开始常常以为调用了就会按既定的方式执行, 我想不少人都深有同感, 例如 [javascript] setTimeout( function(){ alert('你好!'); } , 0); setInterval( callbackFunction , 100); setTimeout( function(){ alert('你好!'); } , 0); setInterval( callb

iOS多线程编程之三——GCD的应用

iOS多线程编程之三--GCD的应用 一.引言 在软件开发中使用多线程可以大大的提升用户体验度,增加工作效率.iOS系统中提供了多种分线程编程的方法,在前两篇博客都有提及: NSThread类进行多线程编程:http://my.oschina.net/u/2340880/blog/416524. NSOperation进行多线程操作编程:http://my.oschina.net/u/2340880/blog/416782. 上两个进行多线程编程的机制都是封装于Object-C的类与方法.这篇博

Java多线程知识小抄集(一)

本文主要整理博主遇到的Java多线程的相关知识点,适合速记,故命名为"小抄集".本文没有特别重点,每一项针对一个多线程知识做一个概要性总结,也有一些会带一点例子,习题方便理解和记忆. 1. interrupted与isInterrupted的区别 interrupted():测试当前线程是否已经是中断状态,执行后具有状态标志清除为false的功能. isInterrupted():测试线程Thread对象是否已经是中断状态,但不清除状态标志. 方法: public static boo