linux中编写并发队列类

 这篇文章主要介绍了linux中编写并发队列类,功能有:并发阻塞队列、有超时限制、有大小限制

设计并发队列
 
代码如下:
#include <pthread.h>
#include <list>
using namespace std;
 
template <typename T>
class Queue 

public: 
    Queue( ) 
    { 
        pthread_mutex_init(&_lock, NULL); 
    } 
    ~Queue( ) 
    { 
        pthread_mutex_destroy(&_lock);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
};
 
template <typename T>
void Queue<T>::push(const T& value ) 

    pthread_mutex_lock(&_lock);
    _list.push_back(value);
    pthread_mutex_unlock(&_lock);
}
 
template <typename T>
T Queue<T>::pop( ) 

    if (_list.empty( )) 
    { 
        throw "element not found";
    }
    pthread_mutex_lock(&_lock); 
    T _temp = _list.front( );
    _list.pop_front( );
    pthread_mutex_unlock(&_lock);
    return _temp;
}
 
 
 
上述代码是有效的。但是,请考虑这样的情况:您有一个很长的队列(可能包含超过 100,000 个元素),而且在代码执行期间的某个时候,从队列中读取数据的线程远远多于添加数据的线程。因为添加和取出数据操作使用相同的互斥锁,所以读取数据的速度会影响写数据的线程访问锁。那么,使用两个锁怎么样?一个锁用于读取操作,另一个用于写操作。给出修改后的 Queue 类。
 
代码如下:
template <typename T>
class Queue 

public: 
    Queue( ) 
    { 
        pthread_mutex_init(&_rlock, NULL); 
        pthread_mutex_init(&_wlock, NULL);
    } 
    ~Queue( ) 
    { 
        pthread_mutex_destroy(&_rlock);
        pthread_mutex_destroy(&_wlock);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    list<T> _list; 
    pthread_mutex_t _rlock, _wlock;
};
 
 
template <typename T>
void Queue<T>::push(const T& value ) 

    pthread_mutex_lock(&_wlock);
    _list.push_back(value);
    pthread_mutex_unlock(&_wlock);
}
 
template <typename T>
T Queue<T>::pop( ) 

    if (_list.empty( )) 
    { 
        throw "element not found";
    }
    pthread_mutex_lock(&_rlock);
    T _temp = _list.front( );
    _list.pop_front( );
    pthread_mutex_unlock(&_rlock);
    return _temp;
}
 
 
 
设计并发阻塞队列
 
目前,如果读线程试图从没有数据的队列读取数据,仅仅会抛出异常并继续执行。但是,这种做法不总是我们想要的,读线程很可能希望等待(即阻塞自身),直到有数据可用时为止。这种队列称为阻塞的队列。如何让读线程在发现队列是空的之后等待?一种做法是定期轮询队列。但是,因为这种做法不保证队列中有数据可用,它可能会导致浪费大量 CPU 周期。推荐的方法是使用条件变量,即 pthread_cond_t 类型的变量。
 
 代码如下:
template <typename T>
class BlockingQueue 

public: 
    BlockingQueue ( ) 
    { 
        pthread_mutexattr_init(&_attr); 
        // set lock recursive
        pthread_mutexattr_settype(&_attr,PTHREAD_MUTEX_RECURSIVE_NP); 
        pthread_mutex_init(&_lock,&_attr);
        pthread_cond_init(&_cond, NULL);
    } 
    ~BlockingQueue ( ) 
    { 
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    } 
    void push(const T& data);
    bool push(const T& data, const int seconds); //time-out push
    T pop( );
    T pop(const int seconds); // time-out pop
 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
    pthread_mutexattr_t _attr;
    pthread_cond_t _cond;
};
 
template <typename T>
T BlockingQueue<T>::pop( ) 

    pthread_mutex_lock(&_lock);
    while (_list.empty( )) 
    { 
        pthread_cond_wait(&_cond, &_lock) ;
    }
    T _temp = _list.front( );
    _list.pop_front( );
    pthread_mutex_unlock(&_lock);
    return _temp;
}
 
template <typename T>
void BlockingQueue <T>::push(const T& value ) 

    pthread_mutex_lock(&_lock);
    const bool was_empty = _list.empty( );
    _list.push_back(value);
    pthread_mutex_unlock(&_lock);
    if (was_empty) 
        pthread_cond_broadcast(&_cond);
}
 
 
 
并发阻塞队列设计有两个要注意的方面:
 
1.可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 会释放至少一个等待条件变量的线程,这个线程不一定是等待时间最长的读线程。尽管使用 pthread_cond_signal 不会损害阻塞队列的功能,但是这可能会导致某些读线程的等待时间过长。
 
2.可能会出现虚假的线程唤醒。因此,在唤醒读线程之后,要确认列表非空,然后再继续处理。强烈建议使用基于 while 循环的 pop()。
 
设计有超时限制的并发阻塞队列
 
在许多系统中,如果无法在特定的时间段内处理新数据,就根本不处理数据了。例如,新闻频道的自动收报机显示来自金融交易所的实时股票行情,它每 n 秒收到一次新数据。如果在 n 秒内无法处理以前的一些数据,就应该丢弃这些数据并显示最新的信息。根据这个概念,我们来看看如何给并发队列的添加和取出操作增加超时限制。这意味着,如果系统无法在指定的时间限制内执行添加和取出操作,就应该根本不执行操作。
 
 
代码如下:
template <typename T>
bool BlockingQueue <T>::push(const T& data, const int seconds) 
{
    struct timespec ts1, ts2;
    const bool was_empty = _list.empty( );
    clock_gettime(CLOCK_REALTIME, &ts1);
    pthread_mutex_lock(&_lock);
    clock_gettime(CLOCK_REALTIME, &ts2);
    if ((ts2.tv_sec – ts1.tv_sec) <seconds) 
    {
        was_empty = _list.empty( );
        _list.push_back(value);
    }
    pthread_mutex_unlock(&_lock);
    if (was_empty) 
        pthread_cond_broadcast(&_cond);
}
 
template <typename T>
T BlockingQueue <T>::pop(const int seconds) 

    struct timespec ts1, ts2; 
    clock_gettime(CLOCK_REALTIME, &ts1); 
    pthread_mutex_lock(&_lock);
    clock_gettime(CLOCK_REALTIME, &ts2);
 
    // First Check: if time out when get the _lock 
    if ((ts1.tv_sec – ts2.tv_sec) < seconds) 
    { 
        ts2.tv_sec += seconds; // specify wake up time
        while(_list.empty( ) && (result == 0)) 
        { 
            result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ;
        }
        if (result == 0) // Second Check: if time out when timedwait  
        {
            T _temp = _list.front( );
            _list.pop_front( );
            pthread_mutex_unlock(&_lock);
            return _temp;
        }
    }
    pthread_mutex_unlock(&lock);
    throw "timeout happened";
}
 
 
 
设计有大小限制的并发阻塞队列
 
最后,讨论有大小限制的并发阻塞队列。这种队列与并发阻塞队列相似,但是对队列的大小有限制。在许多内存有限的嵌入式系统中,确实需要有大小限制的队列。
对于阻塞队列,只有读线程需要在队列中没有数据时等待。对于有大小限制的阻塞队列,如果队列满了,写线程也需要等待。
 
 代码如下:
template <typename T>
class BoundedBlockingQueue 

public: 
    BoundedBlockingQueue (int size) : maxSize(size) 
    { 
        pthread_mutex_init(&_lock, NULL); 
        pthread_cond_init(&_rcond, NULL);
        pthread_cond_init(&_wcond, NULL);
        _array.reserve(maxSize);
    } 
    ~BoundedBlockingQueue ( ) 
    { 
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_rcond);
        pthread_cond_destroy(&_wcond);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    vector<T> _array; // or T* _array if you so prefer
    int maxSize;
    pthread_mutex_t _lock;
    pthread_cond_t _rcond, _wcond;
};
 
template <typename T>
void BoundedBlockingQueue <T>::push(const T& value ) 

    pthread_mutex_lock(&_lock);
    const bool was_empty = _array.empty( );
    while (_array.size( ) == maxSize) 
    { 
        pthread_cond_wait(&_wcond, &_lock);
    } 
    _array.push_back(value);
    pthread_mutex_unlock(&_lock);
    if (was_empty) 
        pthread_cond_broadcast(&_rcond);
}
 
template <typename T>
T BoundedBlockingQueue<T>::pop( ) 

    pthread_mutex_lock(&_lock);
    const bool was_full = (_array.size( ) == maxSize);
    while(_array.empty( )) 
    { 
        pthread_cond_wait(&_rcond, &_lock) ;
    }
    T _temp = _array.front( );
    _array.erase( _array.begin( ));
    pthread_mutex_unlock(&_lock);
    if (was_full)
        pthread_cond_broadcast(&_wcond);
    return _temp;
}
 
 
 
要注意的第一点是,这个阻塞队列有两个条件变量而不是一个。如果队列满了,写线程等待 _wcond 条件变量;读线程在从队列中取出数据之后需要通知所有线程。同样,如果队列是空的,读线程等待 _rcond 变量,写线程在把数据插入队列中之后向所有线程发送广播消息。如果在发送广播通知时没有线程在等待 _wcond 或 _rcond,会发生什么?什么也不会发生;系统会忽略这些消息。还要注意,两个条件变量使用相同的互斥锁。
 

时间: 2024-12-24 12:47:33

linux中编写并发队列类的相关文章

在Hibernate框架中编写持久对象类实现外键关联的几点注意事项

关系数据库系统本身就比较复杂,加上Hibernate的O/R映射层,复杂度加重了,很容易出现问题,本人将最近遇到的问题和解决方法做一个总结,整理在下面的一系列文章中 正确理解Hibernate的聚合类型(collection)的使用 在Hibernate中正确实现关联关系中的级联操作(cascading) 在Hibernate框架中编写持久对象类实现外键关联的几点注意事项 本文是第三篇,讲解在one-to-many(一对多)关联关系中的对象类的几个关键方法的实现.主要是equals(),hash

《Linux C编程从入门到精通》一第2章 在Linux中编写C语言代码2.1 Linux中C语言程序开发流程和工具介绍

第2章 在Linux中编写C语言代码 Linux C编程从入门到精通 Linux作为一个操作系统,一项重要的功能就是要支持用户编程.传统的UNIX下的程序开发语言是C语言,C语言是一种平台适应性强.易于移植的语言.Linux是用C语言写成的.反过来,Linux又为C语言提供了很好的支持,C语言编译工具gcc.调试工具gdb属于最早开发出来的一批自由软件.因此Linux与C语言形成了完美的结合,为用户提供了一个强大的编程环境,本章将介绍在Linux中编写C语言程序的流程和具体方法. 2.1 Lin

《Linux C编程从入门到精通》——第 2 章 在Linux中编写C语言代码 2.1Linux中C语言程序开发流程和工具介绍

第 2 章 在Linux中编写C语言代码 Linux作为一个操作系统,一项重要的功能就是要支持用户编程.传统的UNIX下的程序开发语言是C语言,C语言是一种平台适应性强.易于移植的语言.Linux是用C语言写成的.反过来,Linux又为C语言提供了很好的支持,C语言编译工具gcc.调试工具gdb属于最早开发出来的一批自由软件.因此Linux与C语言形成了完美的结合,为用户提供了一个强大的编程环境,本章将介绍在Linux中编写C语言程序的流程和具体方法. 2.1 Linux中C语言程序开发流程和工

linux中编写自己的并发队列类(Queue 并发阻塞队列)_linux shell

设计并发队列 复制代码 代码如下: #include <pthread.h>#include <list>using namespace std; template <typename T>class Queue { public:     Queue( )     {         pthread_mutex_init(&_lock, NULL);     }     ~Queue( )     {         pthread_mutex_destroy

Linux下设计并发队列

设计并发队列 #include <pthread.h> #include <list> using namespace std; template <typename T> class Queue { public: Queue( ) { pthread_mutex_init(&_lock, NULL); } ~Queue( ) { pthread_mutex_destroy(&_lock); } void push(const T& data)

linux中Shell并发编程示例

在Python中,有很多模块都可以实现并发编程,比如 threading, processing, eventlet 与 Stackless Python 等. 那么对于Shell而言,又如何实现呢?其实原理很简单,我采用的方法是: 1. 将需要执行的任务分批放入后台执行: 2. 将后台执行的命令结果汇总到指定的文件中; 3. 使用wait命令来等待所有任务执行结束. 下面的脚本就用到了这样的并发编程方法,实现的功能是: 快速(3-4秒内)对相同C网内的所有IP(255个)通过命令ping进行测

Linux进程间通信——使用消息队列

下面来说说如何用不用消息队列来进行进程间的通信,消息队列与命名管道有很多相似之处.有关命名管道的更多内容可以参阅我的另一篇文章:Linux进程间通信--使用命名管道   一.什么是消息队列 消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法.  每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构.我们可以通过发送消息来避免命名管道的同步和阻塞问题.但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制.   Linux用宏MSGMAX和MSGMNB来限制

Linux中gcc g++常用编译选项以及makefile的编写

Linux中gcc,g++常用编译选项 -x language filename 设定文件所使用的语言,使后缀名无效,对以后的多个有效.也就是根据约定,C语言的后缀名称是.c的,而C++的后缀名是.C或者.cpp,如果你很个性,决定你的C代码文件的后缀名是. pig 哈哈,那你就要用这个参数,这个参数对他后面的文件名都起作用,除非到了下一个参数的使用.可以使用的参数有下面的这些: `c', `objective-c', `c-header', `c++', `cpp-output', `asse

操作系统-mint17写类新的linux中没有sys.c吗

问题描述 mint17写类新的linux中没有sys.c吗 新版的linux中都没有sys.c文件吗?比如mint17.没有的话如果想要自己写一个系统调用怎么办?? 解决方案 linux编写系统调用,