在前面的几篇博客中,我探讨过消息队列的设计和定时器的设计,可以说在这方面下了很多功夫,做了很多参考,当然也写了很多测试。
就拿消息队列来说吧,为了更好的吞吐量,更加快速的相应,我再减少加锁粒度,减少加锁频率方面做了不少工作;曾经设计了一个三个队列作为缓冲的TripleList(参考博文消息队列),原理为:一个读队列,一个写队列,一个缓冲队列,三个队列分别用数组形式体现,枷锁只要保证缓冲队列一致既可,即写队列向缓冲队列提交数据的时候,需要枷锁,读队列再向缓冲队列交换数据的时候,需要枷锁!可是在消息很少的时候,需要主动的将写队列同步到缓冲队列中;后来利用条件变量和读写锁相结合做了一个基于条件变量的消息队列,效率也不错,只不过需要在读取的时候比较麻烦,需要做一次一毫秒延迟的同步。
昨天忽然心血来潮,决定从新设计,只要保证性能,并且使用简单,于是消息队列编程了这个样子:
#ifndef DUPLEXQUEUE_H_
#define DUPLEXQUEUE_H_
#include <pthread.h>
#include <list>
#include "Mutex.h"
typedef void* CommonItem;
class DuplexQueue
{
public:
DuplexQueue();
virtual ~DuplexQueue();
int append(CommonItem item);
std::list<CommonItem>* serial_read(std::list<CommonItem>* list);
private:
std::list<CommonItem> _write_list;
Mutex _write_mutex;
};
#endif /* DUPLEXQUEUE_H_ */
#include <DuplexQueue.h>
DuplexQueue::DuplexQueue()
{
this->_write_list.clear();
}
DuplexQueue::~DuplexQueue()
{
}
int DuplexQueue::append(CommonItem item)
{
Auto_Mutex auto_mutex(this->_write_mutex);
this->_write_list.push_back(item);
return 0;
}
std::list<CommonItem>* DuplexQueue::serial_read(std::list<CommonItem>* list)
{
Auto_Mutex auto_mutex(this->_write_mutex);
std::swap(this->_write_list,*list);
return list;
}
其实只需要不足百余行代码就足以实现一个性能不错的消息队列,而且在使用上也很简单,不必担心写同步问题,不必担心读同步。使用方法:
std::list<void*> list_to_process;
this->_recv_db_msgqueue->serial_read(&list_to_process);
JUDGE_RETURN(!list_to_process.empty(),0);
for(std::list<void*>::iterator iter = list_to_process.begin();iter != list_to_process.end();
iter ++)
{
//do something
}
当然这个只是作为一个1:N的设计,如果需要可以将writelist作为交换switchlist,做成m:n的设计;将自己的writelist追加到switchlist中,只要追加的时候加锁就可以保证同步的问题!
我们再看下定时器的设计,在两篇博客里面有体现,一篇是定时器的设计,另一篇是stl优先级队列,前者在效率上存在一定的问题,因为每次都是遍历定时器。后者在效率上不存在问题,只是在删除上存在诸多不足,删除要遍历所有。参考了陈硕老师的设计,并且灵活运用了std::set,将定时器设计成这样子:
首先是Timer的设计:
#ifndef TIMER_H_
#define TIMER_H_
#include "GameDefine.h"
using namespace std;
class Timer
{
public:
Timer();
virtual ~Timer();
virtual VOID open(int sec);
/**
* Call to change the timer's interval.
* @a sec represents the new interval
*/
virtual int schedule_timer(INT sec,INT usec = 0);
// 重新设置计时器
virtual VOID set_timer(INT sec);
VOID register_timer();
VOID cancel_timer();
/**
* Called when timer expires. @a current_time represents the current
* time that the Event_Handler was selected for timeout
* dispatching and @a act is the asynchronous completion token that
* was passed in when <schedule_timer> was invoked.
*/
virtual INT handle_timeout(const LONG ¤t_time,const VOID *act = 0);
bool operator < (Timer _timer)
{
return this->_expires_time < _timer._expires_time;
}
bool operator == (Timer _timer)
{
return _timer._expires_time == this->_expires_time;
}
LONG interval()
{
return this->_interval;
}
LONG get_expires_time()
{
return this->_expires_time;
}
VOID set_expires_time(LONG usec)
{
this->_expires_time = usec;
}
BOOL valid() const
{
return this->_valid;
}
private:
LONG _interval;
LONG _now;
LONG _expires_time;
BOOL _valid; // 是否有效
};
#endif /* TIMER_H_ */
Timer::Timer()
{
this->_valid = false;
}
Timer::~Timer()
{
}
VOID Timer::open(int sec)
{
}
VOID Timer::set_timer(INT sec)
{
}
int Timer::schedule_timer(INT sec,INT usec)
{
this->_now = get_os_system_time();
this->_interval = sec * 1000;
this->_expires_time = _now + _interval;
return 0;
}
INT Timer::handle_timeout(const LONG ¤t_time,const VOID *act )
{
return 0;
}
VOID Timer::register_timer()
{
this->_valid = true;
}
VOID Timer::cancel_timer()
{
this->_valid = false;
}
TimerQueue的设计:
#ifndef TIMERQUEUE_H_
#define TIMERQUEUE_H_
#include <set>
class Timer;
class TimerQueue
{
public:
typedef std::pair<long,Timer*> ACTIVETIMER;
TimerQueue();
virtual ~TimerQueue();
virtual int add_timer(Timer* timer);
virtual int cancel_timer(Timer* timer);
virtual std::vector<ACTIVETIMER> process_expire();
private:
std::set<ACTIVETIMER> _timer_queue;
};
#endif /* TIMERQUEUE_H_ */
#include "Timer.h"
#include <TimerQueue.h>
#include "GameUtil.h"
#include "GameStruct.h"
TimerQueue::TimerQueue()
{
}
TimerQueue::~TimerQueue()
{
}
int TimerQueue::add_timer(Timer* timer)
{
JUDGE_RETURN(timer!=NULL,-1);
this->_timer_queue.insert(std::make_pair(timer->get_expires_time(),timer));
return 0;
}
int TimerQueue::cancel_timer(Timer* timer)
{
JUDGE_RETURN(timer!=NULL,-1);
ACTIVETIMER antry(timer->get_expires_time(),timer);
std::set<ACTIVETIMER>::iterator iter = this->_timer_queue.find(antry);
if(iter != this->_timer_queue.end())
{
this->_timer_queue.erase(iter);
}
return 0;
}
std::vector<TimerQueue::ACTIVETIMER> TimerQueue::process_expire()
{
long now = get_os_system_time();
std::vector<ACTIVETIMER> expired;
ACTIVETIMER antry(now,NULL);
std::set<ACTIVETIMER>::iterator end = this->_timer_queue.upper_bound(antry);
std::copy(this->_timer_queue.begin(),end,back_inserter(expired));
this->_timer_queue.erase(this->_timer_queue.begin(),end);
return expired;
}
测试代码:
TimerQueue _queue;
class TestTimer : public Timer
{
public:
virtual INT handle_timeout(const LONG ¤t_time,const VOID *act = 0)
{
printf("interval:%d,timer expired\n",(int)this->interval());
return this->schedule_timer(this->interval()/1000,0);
}
};
int main(int argc,char** argv)
{
TestTimer timer[100];
for(int i = 1;i<100 ;i++)
{
timer[i].schedule_timer(i,0);
_queue.add_timer(&timer[i]);
}
while(true)
{
std::vector<TimerQueue::ACTIVETIMER> expired = _queue.process_expire();
for(std::vector<TimerQueue::ACTIVETIMER>::iterator iter = expired.begin();
iter != expired.end();
iter ++)
{
iter->second->handle_timeout(0,NULL);
_queue.add_timer(iter->second);
}
}
return 0;
}
这个定时器的设计不仅解决了性能的问题,再使用起来也比较方便。
在设计一些基础组件的时候,确实是不应该因为性能作为第一标准,而是应该将易用考虑进来。比如很多选择linux+C++作为服务器的设计,可是有没有考虑过开发的需求,难道每个开发者都要用一台win作为办公机器,再用一个linux作为开发机器? 为什么不退而求其次,用java考虑;或者将Xmanager和虚拟机引入,这样保证保证一台电脑一个员工办公开发两不误,而且保证了成本!