Linux下设计并发队列

设计并发队列

#include <pthread.h>
#include <list>
using namespace std;

template <typename T>
class Queue
{
public:
    Queue( )
    {
        pthread_mutex_init(&_lock, NULL);
    }
    ~Queue( )
    {
        pthread_mutex_destroy(&_lock);
    }
    void push(const T& data);
    T pop( );
private:
    list<T> _list;
    pthread_mutex_t _lock;
};

template <typename T>
void Queue<T>::push(const T& value )
{
    pthread_mutex_lock(&_lock);
    _list.push_back(value);
    pthread_mutex_unlock(&_lock);
}

template <typename T>
T Queue<T>::pop( )
{
    if (_list.empty( ))
    {
        throw "element not found";
    }
    pthread_mutex_lock(&_lock);
    T _temp = _list.front( );
    _list.pop_front( );
    pthread_mutex_unlock(&_lock);
    return _temp;
}

上述代码是有效的。但是,请考虑这样的情况:您有一个很长的队列(可能包含超过 100,000 个元素),而且在代码执行期间的某个时候,从队列中读取数据的线程远远多于添加数据的线程。因为添加和取出数据操作使用相同的互斥锁,所以读取数据的速度会影响写数据的线程访问锁。那么,使用两个锁怎么样?一个锁用于读取操作,另一个用于写操作。给出修改后的 Queue 类。

template <typename T>
class Queue
{
public:
    Queue( )
    {
        pthread_mutex_init(&_rlock, NULL);
        pthread_mutex_init(&_wlock, NULL);
    }
    ~Queue( )
    {
        pthread_mutex_destroy(&_rlock);
        pthread_mutex_destroy(&_wlock);
    }
    void push(const T& data);
    T pop( );
private:
    list<T> _list;
    pthread_mutex_t _rlock, _wlock;
};

template <typename T>
void Queue<T>::push(const T& value )
{
    pthread_mutex_lock(&_wlock);
    _list.push_back(value);
    pthread_mutex_unlock(&_wlock);
}

template <typename T>
T Queue<T>::pop( )
{
    if (_list.empty( ))
    {
        throw "element not found";
    }
    pthread_mutex_lock(&_rlock);
    T _temp = _list.front( );
    _list.pop_front( );
    pthread_mutex_unlock(&_rlock);
    return _temp;
}

设计并发阻塞队列

目前,如果读线程试图从没有数据的队列读取数据,仅仅会抛出异常并继续执行。但是,这种做法不总是我们想要的,读线程很可能希望等待(即阻塞自身),直到有数据可用时为止。这种队列称为阻塞的队列。如何让读线程在发现队列是空的之后等待?一种做法是定期轮询队列。但是,因为这种做法不保证队列中有数据可用,它可能会导致浪费大量 CPU 周期。推荐的方法是使用条件变量,即 pthread_cond_t 类型的变量。

template <typename T>
class BlockingQueue
{
public:
    BlockingQueue ( )
    {
        pthread_mutexattr_init(&_attr);
        // set lock recursive
        pthread_mutexattr_settype(&_attr,PTHREAD_MUTEX_RECURSIVE_NP);
        pthread_mutex_init(&_lock,&_attr);
        pthread_cond_init(&_cond, NULL);
    }
    ~BlockingQueue ( )
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }
    void push(const T& data);
    bool push(const T& data, const int seconds); //time-out push
    T pop( );
    T pop(const int seconds); // time-out pop

private:
    list<T> _list;
    pthread_mutex_t _lock;
    pthread_mutexattr_t _attr;
    pthread_cond_t _cond;
};

template <typename T>
T BlockingQueue<T>::pop( )
{
    pthread_mutex_lock(&_lock);
    while (_list.empty( ))
    {
        pthread_cond_wait(&_cond, &_lock) ;
    }
    T _temp = _list.front( );
    _list.pop_front( );
    pthread_mutex_unlock(&_lock);
    return _temp;
}

template <typename T>
void BlockingQueue <T>::push(const T& value )
{
    pthread_mutex_lock(&_lock);
    const bool was_empty = _list.empty( );
    _list.push_back(value);
    pthread_mutex_unlock(&_lock);
    if (was_empty)
        pthread_cond_broadcast(&_cond);
}

并发阻塞队列设计有两个要注意的方面:

1.可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 会释放至少一个等待条件变量的线程,这个线程不一定是等待时间最长的读线程。尽管使用 pthread_cond_signal 不会损害阻塞队列的功能,但是这可能会导致某些读线程的等待时间过长。

2.可能会出现虚假的线程唤醒。因此,在唤醒读线程之后,要确认列表非空,然后再继续处理。强烈建议使用基于 while 循环的 pop()。

设计有超时限制的并发阻塞队列

在许多系统中,如果无法在特定的时间段内处理新数据,就根本不处理数据了。例如,新闻频道的自动收报机显示来自金融交易所的实时股票行情,它每 n 秒收到一次新数据。如果在 n 秒内无法处理以前的一些数据,就应该丢弃这些数据并显示最新的信息。根据这个概念,我们来看看如何给并发队列的添加和取出操作增加超时限制。这意味着,如果系统无法在指定的时间限制内执行添加和取出操作,就应该根本不执行操作。

template <typename T>
bool BlockingQueue <T>::push(const T& data, const int seconds)
{
    struct timespec ts1, ts2;
    const bool was_empty = _list.empty( );
    clock_gettime(CLOCK_REALTIME, &ts1);
    pthread_mutex_lock(&_lock);
    clock_gettime(CLOCK_REALTIME, &ts2);
    if ((ts2.tv_sec – ts1.tv_sec) <seconds)
    {
        was_empty = _list.empty( );
        _list.push_back(value);
    }
    pthread_mutex_unlock(&_lock);
    if (was_empty)
        pthread_cond_broadcast(&_cond);
}

template <typename T>
T BlockingQueue <T>::pop(const int seconds)
{
    struct timespec ts1, ts2;
    clock_gettime(CLOCK_REALTIME, &ts1);
    pthread_mutex_lock(&_lock);
    clock_gettime(CLOCK_REALTIME, &ts2);

    // First Check: if time out when get the _lock
    if ((ts1.tv_sec – ts2.tv_sec) < seconds)
    {
        ts2.tv_sec += seconds; // specify wake up time
        while(_list.empty( ) && (result == 0))
        {
            result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ;
        }
        if (result == 0) // Second Check: if time out when timedwait
        {
            T _temp = _list.front( );
            _list.pop_front( );
            pthread_mutex_unlock(&_lock);
            return _temp;
        }
    }
    pthread_mutex_unlock(&lock);
    throw "timeout happened";
}

设计有大小限制的并发阻塞队列

最后,讨论有大小限制的并发阻塞队列。这种队列与并发阻塞队列相似,但是对队列的大小有限制。在许多内存有限的嵌入式系统中,确实需要有大小限制的队列。
对于阻塞队列,只有读线程需要在队列中没有数据时等待。对于有大小限制的阻塞队列,如果队列满了,写线程也需要等待。

template <typename T>
class BoundedBlockingQueue
{
public:
    BoundedBlockingQueue (int size) : maxSize(size)
    {
        pthread_mutex_init(&_lock, NULL);
        pthread_cond_init(&_rcond, NULL);
        pthread_cond_init(&_wcond, NULL);
        _array.reserve(maxSize);
    }
    ~BoundedBlockingQueue ( )
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_rcond);
        pthread_cond_destroy(&_wcond);
    }
    void push(const T& data);
    T pop( );
private:
    vector<T> _array; // or T* _array if you so prefer
    int maxSize;
    pthread_mutex_t _lock;
    pthread_cond_t _rcond, _wcond;
};

template <typename T>
void BoundedBlockingQueue <T>::push(const T& value )
{
    pthread_mutex_lock(&_lock);
    const bool was_empty = _array.empty( );
    while (_array.size( ) == maxSize)
    {
        pthread_cond_wait(&_wcond, &_lock);
    }
    _array.push_back(value);
    pthread_mutex_unlock(&_lock);
    if (was_empty)
        pthread_cond_broadcast(&_rcond);
}

template <typename T>
T BoundedBlockingQueue<T>::pop( )
{
    pthread_mutex_lock(&_lock);
    const bool was_full = (_array.size( ) == maxSize);
    while(_array.empty( ))
    {
        pthread_cond_wait(&_rcond, &_lock) ;
    }
    T _temp = _array.front( );
    _array.erase( _array.begin( ));
    pthread_mutex_unlock(&_lock);
    if (was_full)
        pthread_cond_broadcast(&_wcond);
    return _temp;
}

要注意的第一点是,这个阻塞队列有两个条件变量而不是一个。如果队列满了,写线程等待 _wcond 条件变量;读线程在从队列中取出数据之后需要通知所有线程。同样,如果队列是空的,读线程等待 _rcond 变量,写线程在把数据插入队列中之后向所有线程发送广播消息。如果在发送广播通知时没有线程在等待 _wcond 或 _rcond,会发生什么?什么也不会发生;系统会忽略这些消息。还要注意,两个条件变量使用相同的互斥锁。

 

来源《用于并行计算的多线程数据结构》
http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures1/index.html
http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/index.html

 

作者:阿凡卢

出处:http://www.cnblogs.com/luxiaoxun/

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

时间: 2024-09-16 04:45:26

Linux下设计并发队列的相关文章

linux中编写并发队列类

 这篇文章主要介绍了linux中编写并发队列类,功能有:并发阻塞队列.有超时限制.有大小限制 设计并发队列   代码如下: #include <pthread.h> #include <list> using namespace std;   template <typename T> class Queue  {  public:      Queue( )      {          pthread_mutex_init(&_lock, NULL); 

linux网络编程-Linux下epoll并发数量达到1987个后涨不上去

问题描述 Linux下epoll并发数量达到1987个后涨不上去 Linux下epoll并发数量达到1987个后涨不上去(达到1987个链接后,无法接受新链接,并非最大开文件句柄限制所导致) 我在linux下写来一个简单的epoll server程序,在局域网中另一台windows计算机采用多线程的形式链接server,但是大概epoll链接了1987个套接字后,再也不能增加新链接了(并非最大文件句柄数量所限制),不清楚所什么原因,跪求解答,谢谢各位好心人. server代码: #include

消息队列 聊天室-在Linux下用消息队列实现聊天室的问题:目前实现一问一答

问题描述 在Linux下用消息队列实现聊天室的问题:目前实现一问一答 我在Linux下写了一个消息队列实现聊天室的代码,也实现了: 目前只能一问一答的来进行,如果客户端连续提问几个问题一次发送,服务端只能处理一条后,然后才显示下一条消息再回答,逐条显示. 我想让服务端一起接收显示消息,并且能够多条一起发送,可以实现吗?怎么实现?

教你修改Linux下高并发socket最大连接数所受的各种限制

1.修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统为每个TCP连接都要创建一个socket句柄,每个socket句柄同时也是一个文件句柄).可使用ulimit命令查看系统允许当前用户进程打开的文件数限制: [speng@as4 ~]$ ulimit -n 1024 这表示当前用户的每个进程最多允许同时打开1024个文件,这1024个文件中还得除去每个进

Linux下高并发socket最大连接数所受的各种限制(详解)_Linux

1.修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统为每个TCP连接都要创建一个socket句柄,每个socket句柄同时也是一个文件句柄).可使用ulimit命令查看系统允许当前用户进程打开的文件数限制: [speng@as4 ~]$ ulimit -n 1024 这表示当前用户的每个进程最多允许同时打开1024个文件,这1024个文件中还得除去每个进

Linux下套接字详解(十)---epoll模式下的IO多路复用服务器

epoll模型简介 epoll可是当前在Linux下开发大规模并发网络程序的热门人选,epoll 在Linux2.6内核中正式引入,和select相似,其实都I/O多路复用技术而已,并没有什么神秘的. 其实在Linux下设计并发网络程序,向来不缺少方法,比如典型的Apache模型(Process Per Connection,简称PPC),TPC(Thread PerConnection)模型,以及select模型和poll模型,那为何还要再引入Epoll这个东东呢?那还是有得说说的- 常用模型

linux中编写自己的并发队列类(Queue 并发阻塞队列)_linux shell

设计并发队列 复制代码 代码如下: #include <pthread.h>#include <list>using namespace std; template <typename T>class Queue { public:     Queue( )     {         pthread_mutex_init(&_lock, NULL);     }     ~Queue( )     {         pthread_mutex_destroy

Linux集群和自动化维1.4.2 优化Linux下的内核TCP参数以提高系统性能

1.4.2 优化Linux下的内核TCP参数以提高系统性能  内核的优化跟服务器的优化一样,应本着稳定安全的原则.下面以Squid服务器为例来说明,待客户端与服务器端建立TCP/IP连接后就会关闭Socket,服务器端连接的端口状态也就变为TIME_WAIT了.那是不是所有执行主动关闭的Socket都会进入TIME_WAIT状态呢?有没有什么情况可使主动关闭的Socket直接进入CLOSED状态呢?答案是主动关闭的一方在发送最后一个ACK后就会进入TIME_WAIT状态,并停留2MSL(报文最大

优化Linux下的内核TCP参数以提高系统性能

内核的优化跟服务器的优化一样,应本着稳定安全的原则.下面以64位的Centos5.5下的Squid服务器为例来说明,待客户端与服务器端建立TCP/IP连接后就会关闭SOCKET,服务器端连接的端口状态也就变为TIME_WAIT了.那是不是所有执行主动关闭的SOCKET都会进入TIME_WAIT状态呢?有没有什么情况使主动关闭的SOCKET直接进入CLOSED状态呢?答案是主动关闭的一方在发送最后一个ACK后就会进入TIME_WAIT状态,并停留2MSL(Max Segment LifeTime)