线程安全的环形缓冲区实现

来源:http://blog.csdn.net/lezhiyong
    应用背景:线程1将每次数量不一的音频采样点(PCM音频数据)写入环形缓冲区,线程2每次取固定数量采样点送音频编码器,线程1线程2在平均时间内的读写数据量相等。(倒入桶中的水量有时大有时小,但每次取一瓢喝:)
   该环形缓冲区借鉴CoolPlayer音频播放器中的环形缓冲区代码实现,在读写操作函数中加了锁,允许多线程同时操作。CPs_CircleBuffer基于内存段的读写,比用模板实现的环形缓冲队列适用的数据类型更广些, CPs_CircleBuffer修改成C++中基于对象的实现,加上详细注释,m_csCircleBuffer锁变量为自用的lock类型(将CRITICAL_SECTION封装起来),调用lock()加锁,调用unlock()解锁。使用效果良好,分享出来。

CPs_CircleBuffer环形缓冲还不具备当待写数据量超出空余缓冲时自动分配内存的功能,这个将在后续进行优化。

CPs_CircleBuffer使用步骤:

 

[cpp] view plaincopy

  1. 1、创建对象  
  2. CPs_CircleBuffer* m_pCircleBuffer;  
  3. m_pCircleBuffer = new CPs_CircleBuffer(bufsize);  
  4. 2、写  
  5. if (m_pCircleBuffer->GetFreeSize() < CIC_READCHUNKSIZE)  
  6.  {  
  7.      Sleep(20);  
  8.      continue;  
  9.  }  
  10. m_pCircleBuffer->Write(internetbuffer.lpvBuffer,internetbuffer.dwBufferLength);  
  11. 3、读  
  12. m_pCircleBuffer->Read(pDestBuffer,iBytesToRead, piBytesRead);  
  13.    
  14. 4、其他调用  
  15. if(m_pCircleBuffer->IsComplete())  
  16.     break;          
  17. iUsedSpace =m_pCircleBuffer->GetUsedSize();  
  18. m_pCircleBuffer->SetComplete();  

 

CPs_CircleBuffer修改为类的定义:

[cpp] view plaincopy

  1. class  CPs_CircleBuffer  
  2. {  
  3. public:  
  4.        CPs_CircleBuffer(const unsigned int iBufferSize);  
  5.        ~CPs_CircleBuffer();  
  6. public:  
  7.         // Public functions  
  8.         void  Uninitialise();  
  9.         void  Write(const void* pSourceBuffer, const unsigned int iNumBytes);  
  10.         bool  Read(void* pDestBuffer, const size_t iBytesToRead, size_t* pbBytesRead);  
  11.         void  Flush();  
  12.         unsigned int GetUsedSize();  
  13.         unsigned int GetFreeSize();  
  14.         void  SetComplete();  
  15.         bool  IsComplete();  
  16.   
  17. private:         
  18.         unsigned char*  m_pBuffer;  
  19.         unsigned int    m_iBufferSize;  
  20.         unsigned int    m_iReadCursor;  
  21.         unsigned int    m_iWriteCursor;  
  22.         HANDLE          m_evtDataAvailable;  
  23.         Vlock           m_csCircleBuffer;  
  24.         bool            m_bComplete;        
  25. };  

CPs_CircleBuffer修改为类的实现:

[cpp] view plaincopy

  1. #define CIC_WAITTIMEOUT  3000  
  2.   
  3. CPs_CircleBuffer::CPs_CircleBuffer(const unsigned int iBufferSize)  
  4. {  
  5.     m_iBufferSize = iBufferSize;  
  6.     m_pBuffer = (unsigned char*)malloc(iBufferSize);  
  7.     m_iReadCursor = 0;  
  8.     m_iWriteCursor = 0;  
  9.     m_bComplete = false;  
  10.     m_evtDataAvailable = CreateEvent(NULL, FALSE, FALSE, NULL);  
  11. }  
  12.   
  13. CPs_CircleBuffer::~CPs_CircleBuffer()  
  14. {  
  15.     Uninitialise();  
  16. }  
  17.   
  18. // Public functions  
  19. void CPs_CircleBuffer::Uninitialise()//没有必要public这个接口函数,long120817  
  20. {  
  21.     CloseHandle(m_evtDataAvailable);  
  22.     free(m_pBuffer);  
  23. }  
  24.   
  25. //Write前一定要调用m_pCircleBuffer->GetFreeSize(),如果FreeSize不够需要等待,long120817  
  26.   
  27. void  CPs_CircleBuffer::Write(const void* _pSourceBuffer, const unsigned int _iNumBytes)  
  28. {  
  29.     unsigned int iBytesToWrite = _iNumBytes;  
  30.     unsigned char* pSourceReadCursor = (unsigned char*)_pSourceBuffer;  
  31.   
  32.     //CP_ASSERT(iBytesToWrite <= GetFreeSize());//修改为没有足够空间就返回,write前一定要加GetFreeSize判断,否则进入到这里相当于丢掉数据,         // long120817  
  33.     if (iBytesToWrite > GetFreeSize())  
  34.     {  
  35.         return;  
  36.     }  
  37.     _ASSERT(m_bComplete == false);  
  38.   
  39.     m_csCircleBuffer.Lock();  
  40.   
  41.     if (m_iWriteCursor >= m_iReadCursor)  
  42.     {  
  43.         //              0                                            m_iBufferSize  
  44.         //              |-----------------|===========|--------------|  
  45.         //                                pR->        pW->   
  46.         // 计算尾部可写空间iChunkSize,long120817  
  47.         unsigned int iChunkSize = m_iBufferSize - m_iWriteCursor;  
  48.   
  49.         if (iChunkSize > iBytesToWrite)  
  50.         {  
  51.             iChunkSize = iBytesToWrite;  
  52.         }  
  53.   
  54.         // Copy the data  
  55.         memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iChunkSize);  
  56.   
  57.         pSourceReadCursor += iChunkSize;  
  58.   
  59.         iBytesToWrite -= iChunkSize;  
  60.   
  61.         // 更新m_iWriteCursor  
  62.         m_iWriteCursor += iChunkSize;  
  63.   
  64.         if (m_iWriteCursor >= m_iBufferSize)//如果m_iWriteCursor已经到达末尾  
  65.             m_iWriteCursor -= m_iBufferSize;//返回到起点0位置,long120817  
  66.   
  67.     }  
  68.   
  69.     //剩余数据从Buffer起始位置开始写  
  70.     if (iBytesToWrite)  
  71.     {  
  72.         memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iBytesToWrite);  
  73.         m_iWriteCursor += iBytesToWrite;  
  74.         _ASSERT(m_iWriteCursor < m_iBufferSize);//这个断言没什么意思,应该_ASSERT(m_iWriteCursor <= m_iReadCursor);long20120817  
  75.     }  
  76.   
  77.     SetEvent(m_evtDataAvailable);//设置数据写好信号量  
  78.   
  79.     m_csCircleBuffer.UnLock();  
  80. }  
  81.   
  82. bool  CPs_CircleBuffer::Read(void* pDestBuffer, const size_t _iBytesToRead, size_t* pbBytesRead)  
  83. {  
  84.     size_t iBytesToRead = _iBytesToRead;  
  85.     size_t iBytesRead = 0;  
  86.     DWORD dwWaitResult;  
  87.     bool bComplete = false;  
  88.   
  89.     while (iBytesToRead > 0 && bComplete == false)  
  90.     {  
  91.         dwWaitResult = WaitForSingleObject(m_evtDataAvailable, CIC_WAITTIMEOUT);//等待数据写好,long120817  
  92.   
  93.         if (dwWaitResult == WAIT_TIMEOUT)  
  94.         {  
  95.             //TRACE_INFO2("Circle buffer - did not fill in time!");  
  96.             *pbBytesRead = iBytesRead;  
  97.             return FALSE;//等待超时则返回  
  98.         }  
  99.   
  100.         m_csCircleBuffer.Lock();  
  101.   
  102.         if (m_iReadCursor > m_iWriteCursor)  
  103.         {  
  104.             //              0                                                    m_iBufferSize  
  105.             //              |=================|-----|===========================|  
  106.             //                                pW->  pR->   
  107.             unsigned int iChunkSize = m_iBufferSize - m_iReadCursor;  
  108.   
  109.             if (iChunkSize > iBytesToRead)  
  110.                 iChunkSize = (unsigned int)iBytesToRead;  
  111.   
  112.             //读取操作  
  113.             memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);  
  114.   
  115.             iBytesRead += iChunkSize;  
  116.             iBytesToRead -= iChunkSize;  
  117.   
  118.             m_iReadCursor += iChunkSize;  
  119.   
  120.             if (m_iReadCursor >= m_iBufferSize)//如果m_iReadCursor已经到达末尾  
  121.                 m_iReadCursor -= m_iBufferSize;//返回到起点0位置,long120817  
  122.         }  
  123.   
  124.         if (iBytesToRead && m_iReadCursor < m_iWriteCursor)  
  125.         {  
  126.             unsigned int iChunkSize = m_iWriteCursor - m_iReadCursor;  
  127.   
  128.             if (iChunkSize > iBytesToRead)  
  129.                 iChunkSize = (unsigned int)iBytesToRead;  
  130.   
  131.             //读取操作  
  132.             memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);  
  133.   
  134.             iBytesRead += iChunkSize;  
  135.             iBytesToRead -= iChunkSize;  
  136.             m_iReadCursor += iChunkSize;  
  137.         }  
  138.   
  139.         //如果有更多的数据要写  
  140.         if (m_iReadCursor == m_iWriteCursor)  
  141.         {  
  142.             if (m_bComplete)//跳出下一个while循环,该值通过SetComplete()设置,此逻辑什么意思?long120817  
  143.                 bComplete = true;  
  144.         }  
  145.         else//还有数据可以读,SetEvent,在下一个while循环开始可以不用再等待,long120817  
  146.             SetEvent(m_evtDataAvailable);  
  147.   
  148.         m_csCircleBuffer.UnLock();  
  149.     }  
  150.   
  151.     *pbBytesRead = iBytesRead;  
  152.   
  153.     return bComplete ? false : true;  
  154.   
  155. }  
  156. //  0                                                m_iBufferSize  
  157. //  |------------------------------------------------|  
  158. //  pR  
  159. //  pW  
  160. //读写指针归零  
  161. void  CPs_CircleBuffer::Flush()  
  162. {  
  163.     m_csCircleBuffer.Lock();  
  164.     m_iReadCursor = 0;  
  165.     m_iWriteCursor = 0;  
  166.     m_csCircleBuffer.UnLock();  
  167.   
  168. }  
  169. //获取已经写的内存  
  170. unsigned int CPs_CircleBuffer::GetUsedSize()  
  171. {  
  172.      return m_iBufferSize - GetFreeSize();  
  173.   
  174. }  
  175.   
  176.   
  177. unsigned int CPs_CircleBuffer::GetFreeSize()  
  178. {  
  179.     unsigned int iNumBytesFree;  
  180.   
  181.     m_csCircleBuffer.Lock();  
  182.   
  183.     if (m_iWriteCursor < m_iReadCursor)  
  184.     {  
  185.         //              0                                                    m_iBufferSize  
  186.         //              |=================|-----|===========================|  
  187.         //                                pW->  pR->   
  188.         iNumBytesFree = (m_iReadCursor - 1) - m_iWriteCursor;  
  189.     }  
  190.     else if (m_iWriteCursor == m_iReadCursor)  
  191.     {  
  192.         iNumBytesFree = m_iBufferSize;  
  193.     }  
  194.     else  
  195.     {  
  196.         //              0                                                    m_iBufferSize  
  197.         //              |-----------------|=====|---------------------------|  
  198.         //                                pR->   pW->   
  199.         iNumBytesFree = (m_iReadCursor - 1) + (m_iBufferSize - m_iWriteCursor);  
  200.     }  
  201.   
  202.     m_csCircleBuffer.UnLock();  
  203.   
  204.     return iNumBytesFree;  
  205.   
  206. }  
  207. //该函数什么时候调用?long120817  
  208. void  CPs_CircleBuffer::SetComplete()  
  209. {  
  210.     m_csCircleBuffer.Lock();  
  211.     m_bComplete = true;  
  212.     SetEvent(m_evtDataAvailable);  
  213.     m_csCircleBuffer.UnLock();  
  214. }  

 

附自动初始化和摧毁的锁对象Vlock的实现:

[cpp] view plaincopy

    1. #ifdef WIN32  
    2. #include <windows.h>  
    3.   
    4. #define  V_MUTEX            CRITICAL_SECTION //利用临界区实现的锁变量  
    5. #define  V_MUTEX_INIT(m)        InitializeCriticalSection(m)  
    6. #define  V_MUTEX_LOCK(m)        EnterCriticalSection(m)  
    7. #define  V_MUTEX_UNLOCK(m)      LeaveCriticalSection(m)  
    8. #define  V_MUTEX_DESTORY(m)     DeleteCriticalSection(m)  
    9.   
    10. #else  
    11.   
    12. #define  V_MUTEX                pthread_mutex_t  
    13. #define  V_MUTEX_INIT(m)        pthread_mutex_init(m,NULL)  
    14. #define  V_MUTEX_LOCK(m)        pthread_mutex_Lock(m)  
    15. #define  V_MUTEX_UNLOCK(m)      pthread_mutex_unLock(m)  
    16. #define  V_MUTEX_DESTORY(m)     pthread_mutex_destroy(m)  
    17.   
    18. #endif  
    19.   
    20.   
    21. class  Vlock  
    22. {  
    23. public:  
    24.     Vlock(void)  
    25.     {  
    26.         V_MUTEX_INIT(&m_Lock);  
    27.     }  
    28.     ~Vlock(void)  
    29.     {  
    30.         V_MUTEX_DESTORY(&m_Lock);  
    31.     }  
    32. public:  
    33.     void Lock(){V_MUTEX_LOCK(&m_Lock);}  
    34.     void UnLock(){V_MUTEX_UNLOCK(&m_Lock);}  
    35. private:  
    36.     V_MUTEX m_Lock;  
    37. };  
时间: 2024-10-02 08:02:05

线程安全的环形缓冲区实现的相关文章

服务器公共组件实现 -- 环形缓冲区

消息队列锁调用太频繁的问题算是解决了,另一个让人有些苦恼的大概是这太多的内存分配和释放操作了.频繁的内存分配不但增加了系统开销,更使得内存碎片不断增多,非常不利于我们的服务器长期稳定运行.也许我们可以使用内存池,比如SGI STL中附带的小内存分配器.但是对于这种按照严格的先进先出顺序处理的,块大小并不算小的,而且块大小也并不统一的内存分配情况来说,更多使用的是一种叫做环形缓冲区的方案,mangos的网络代码中也有这么一个东西,其原理也是比较简单的. 就好比两个人围着一张圆形的桌子在追逐,跑的人

嵌入式 环形缓冲区的设计与实现

环形缓冲区是嵌入式系统中十分重要的一种数据结构,比如在一个视频处理的机制中,环形缓冲区就可以理解为数据码流的通道,每一个通道都对应着一个环形缓冲区,这样数据在读取和写入的时候都可以在这个缓冲区里循环进行,程序员可以根据自己需要的数据大小来决定自己使用的缓冲区大小.          环形缓冲区,顾名思义这个缓冲区是环形的,那么何谓环形这个意思也很好理解,就是用一个指针去访问该缓冲区的最后一个内存位置的的后一位置时回到环形缓冲区的起点.类似一个环一样.这样形容就很好理解了,当然有办法实现了.我在这

C 语言中实现环形缓冲区_C 语言

1.实现代码: #include #include #include #include #include #define BUFFSIZE 1024 * 1024 #define min(x, y) ((x) < (y) ? (x) : (y)) pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; struct cycle_buffer { unsigned char *buf; unsigned int size; unsigned int in

C#环形缓冲区(队列)完全实现_C#教程

公司项目中经常设计到串口通信,TCP通信,而且大多都是实时的大数据的传输,然后大家都知道协议通讯肯定涉及到什么,封包.拆包.粘包.校验--什么鬼的概念一大堆,说简单点儿就是要一个高效率可复用的缓存区.按照码农的惯性思维就是去百度.谷歌搜索看有没有现成的东西可以直接拿来用,然而我并没有找到,好吧不是很难的东西自己实现一个呗.开扯--  为什么要用环形队列?环形队列是在实际编程极为有用的数据结构,它有如下特点: 它是一个首尾相连的FIFO的数据结构,采用数组的线性空间,数据组织简单.能很快知道队列是

详细解读Java的串口编程_java

常见问题 JavaComm 和 RxTX 安装时有一些与众不同的地方.强烈建议按照安装说明一点点的安装.如果安装说明要求一个jar文件或一个共享库必须在某一特定的文件夹下,那这就意味着需要严肃对待.如果说明要求一个特定的文件或设备需要拥有一个特定的所有权或访问权,这也意味着需要严肃处理.很多安装问题都只是因为没有按照安装说明要求的去做而引起的. 特别要注意的是一些版本的JavaComm会带有两个安装说明.一个用于java 1.2及以后的版本,一个用于java 1.1版本.使用错误的安装说明会导致

理解 Memory barrier(内存屏障)【转】

转自:http://name5566.com/4535.html 参考文献列表:http://en.wikipedia.org/wiki/Memory_barrierhttp://en.wikipedia.org/wiki/Out-of-order_executionhttps://www.kernel.org/doc/Documentation/memory-barriers.txt 本文例子均在 Linux(g++)下验证通过,CPU 为 X86-64 处理器架构.所有罗列的 Linux 内

进程、线程知识点总结和同步(消费者生产者,读者写者三类问题)、互斥、异步、并发、并行、死锁、活锁的总结

进程和程序: 进程:是个动态的概念,指的是一个静态的程序对某个数据集的一次运行活动,而程序是静态的概念,是由代码和数据组成的程序块而已. 进程5大特点:动态性,并发性,独立运行性,异步性,和结构化的特性. 在多道程序环境下,程序不能独立运行,操作系统所有的特征都是基于进程而体现的,只有进程可以在系统中运行,程序运行必须有进程才行.进程是操作系统里资源分配的基本单位,也是独立运行的基本单位,具有动态的特点,暂时出现的特点,而且一个进程产生之后,可以再次生成多个进程出来.也可以多个进程并发执行,也可

线程及 进程间的通信问题! .

一个很好的编程随想的博客http://program-think.blogspot.com/2009/03/producer-consumer-pattern-0-overview.html 架构设计:生产者/消费者模式[0]:概述  1.如何确定数据单元2.队列缓冲区3.环形缓冲区4.双缓冲区 生产 消费 2010-06-01 10:13   #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp>

logcat命令使用方法和查看android系统日志缓冲区内容的方法_Android

*注:可以用 adb logcat > 路径/文件名 来保存,此命令执行之时起的全部日志信息到一个文件里,ctrl + C 结束日志输出:后面不加 > 路径/文件名 的话,则在 stdout (终端窗口)中输出!例如:$ adb logcat -v long Checkin *:S > ~/桌面/log.txt 一.在 Java 与 C 语言中输出日志:1) Java 代码在程序中输出日志, 使用 android.util.Log 类的以下 5 个方法:   Log.v().Log.d(