C++11 生产者消费者

下面是一个生产者消费者问题,来介绍condition_variable的用法。当线程间的共享数据发生变化的时候,可以通过condition_variable来通知其他的线程。消费者wait 直到生产者通知其状态发生改变,Condition_variable是使用方法如下:

·当持有锁之后,线程调用wait

·wait解开持有的互斥锁(mutex),阻塞本线程,并将自己加入到唤醒队列中

·当收到通知(notification),该线程从阻塞中恢复,并加入互斥锁队列(mutex queue)

 线程被唤醒之后继续持有锁运行。

 

Condition variable有两种类型:condition_variable 和 condition_variable_any,前一种效率更高,但是使用不够灵活,只支持std::unique_lock<std::mutex>类型的互斥锁;后一种比较灵活,支持所有类型的锁,但是效率稍微低一些。

有一点需要注意的是使用condition variable进行通信的线程,condition variable 需要使用相同的互斥信号量(mutex)。

下面来看例子:(当按下回车键之后停止)

#include <thread>

#include <iostream>

#include <mutex>

#include <queue>

#include <condition_variable>

#include <atomic>

using namespace std;

int main()
{

    mutex lockBuffer; //申明互斥信号量

    volatile bool ArretDemande = false; //使生产、消费过程的结束

    queue<long> buffer;       

    condition_variable_any cndNotifierConsommateurs;//condition variable

    condition_variable_any cndNotifierProducteur;   

    thread ThreadProducteur([&]()//生产者线程
    {

        std::atomic<long> interlock;//对interlock的操作将是原子的

        interlock=1;   

        while(true)
        {               

                std::this_thread::sleep_for (chrono::milliseconds (15));               

                long element=interlock.fetch_add (1);//【1】

                lockBuffer.lock ();

                while(buffer.size()==10 && ArretDemande ==false)
                {

                    cndNotifierProducteur.wait (lockBuffer);//【2】

                }

                if (ArretDemande==true)

                {                   

                    lockBuffer.unlock ();

                    cndNotifierConsommateurs.notify_one ();//【3】

                    break;

                }

                buffer.push(element);

                cout << "Production unlement :" << element << " size :" << buffer.size() << endl;

                lockBuffer.unlock ();

                cndNotifierConsommateurs.notify_one ();

        }

    } );
    thread ThreadConsommateur([&]()
    {

        while(true)
            {

                lockBuffer.lock ();

                while(buffer.empty () && ArretDemande==false)

                {                   

                    cndNotifierConsommateurs.wait(lockBuffer);

                }

                if (ArretDemande==true && buffer.empty ())

                {

                    lockBuffer.unlock();

                    cndNotifierProducteur.notify_one ();

                    break;

                }

                long element=buffer.front();

                buffer.pop ();

                cout << "Consommation element :" << element << " size :" << buffer.size() << endl;

                lockBuffer.unlock ();

                cndNotifierProducteur.notify_one ();

            }           

    } );

    std::cout << "Pour arreter pressez [ENTREZ]" << std::endl;

    getchar();

    std::cout << "Arret demande" << endl
    ArretDemande=true;

    ThreadProducteur.join();
    ThreadConsommateur.join();

    cout<<"Main Thread"<<endl;

    return 0;

}

 

运行结果:

对程序进行一下说明,程序中有三个线程,主线程、生产者线程、消费者线程,三个线程之间乱序执行,通过一些全局变量来控制他们的执行顺序。主线程的作用是控制生产消费过程是否结束,当程序运行之后,主线程通过getchar()接收一个输入,接收到输入后会将ArretDemande设置为true,另外两个线程会终止。生产者线程将生产出来的数据放在一个queue类型的buffer中,并解锁,通知消费之线程,buffer中最多“能”存10个数据,如果buffer中已经有10个数据还没有被取走,则会通知消费者线程“消费”,如果ArretDmande被置位,则打开锁,并通知消费之线程。消费者线程主要是将buffer中的数据取出来,当buffer为空的时候阻塞自己,并通知生产者线程,当ArretDemande被置位,且已经消费完产品则解锁,并通知生产者线程。需要注意的是需要通信的生产者和消费者这两个线程通过condition variable来实现通信,必须操作同一个mutex,这里是lockbuffer,并且每次Notify都会打开当前锁。

程序中对interlock进行的操作是原子的,interlock.fet_add(N),效果是将interlock加N,然后返回interlock在加N之前的值,atomic类型是通过一定的内存顺序规则来实现这个过程的。

虽然conditon_variable 只能支持std::unique_lock<std::mutex>类型的互斥锁,但是在大部分情况下已经够用,而且使用std::unique_lock<std::mutex>会比较简单,因为std::unique_lock<std::mutex>在声明的时候就会初始化,在生命周期结束之后就会自动解锁,因此我们不用太花精力来考虑什么时候解锁。我们来看看下面这段程序:

#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>
#include <queue>
#include <chrono>

int main()
{
    std::queue<int> produced_nums;
    std::mutex m;;
    std::condition_variable cond_var;
    bool done = false;
    bool notified = false;

    std::thread producer([&]() {
        for ( int i = 0; i < 5; ++i) {
            std::this_thread::sleep_for(std::chrono:: seconds(1));
            std:: unique_lock<std::mutex > lock(m);  //May lock mutex after construction, unlock before destruction.
            std::cout << "producing " << i << '\n' ;
            produced_nums.push(i);
            notified = true;        cond_var.notify_one();
        }  

        done = true;
        cond_var.notify_one();
    });
    //cond_var.notify_one();
    std::thread consumer([&]() {
        while (!done) {
            std:: unique_lock<std::mutex > lock(m);
            while (!notified) {  // loop to avoid spurious wakeups
                cond_var.wait(lock);
            }
            while (!produced_nums.empty()) {
                std::cout << "consuming " << produced_nums.front() << '\n';
                produced_nums.pop();
            }
            notified = false;
        }
    });

    producer.join();
    consumer.join();

        return 0;
}

 运行结果:

C:\Windows\system32\cmd.exe /c producer_consumer.exe
producing 0
consuming 0
producing 1
consuming 1
producing 2
consuming 2
producing 3
consuming 3
producing 4
consuming 4
Hit any key to close this window...

更新:2012年8月4日16:53:25

make it simple, make it happen

时间: 2024-12-30 09:32:55

C++11 生产者消费者的相关文章

使用阻塞队列实现生产者-消费者模型

1.生产者-消费者问题 生产者消费者问题也称作有界缓冲区(bounded-buffer)问题, 是操作系统中一个经典的线程同步问题,问题描述如下: 生产者在生产产品提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区,生产者将它生产的产品放入缓冲区中,消费者可以从缓冲区中取走产品进行消费,两个进程共享一个公共的固定大小的缓冲区. 显然生产者和消费者之间必须保持同步,即不允许消费者到一个空的缓冲区中取产品,也不允许生产者向一个已经放入产品的缓冲区中再次投放产品

【Python之旅】第六篇(五):生产者消费者模型实现多线程异步交互

 虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应该还包括Python的消息队列,因为这里多线程异步交互是通过Python的消息队列来实现的,因此主要内容如下: 1 2 3 4 1.生产者消费者模型:厨师做包子与顾客吃包子 2.Python的消息队列 3.利用消息队列实现Python多线程异步交互 4.再谈耦合度的问题 1.生产者消费者模型     通过厨师做包子与顾客吃包子来引出生产者消费者模型,如下图:     这里,厨师相当于生产者,顾客相当于消费者,顾客吃包子,

聊聊并发(十)生产者消费者模式

本文首发于InfoQ   作者:方腾飞  校对:张龙 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这种生产消费能力

Linux线程编程之生产者消费者问题

前言 本文基于顺序循环队列,给出Linux生产者/消费者问题的多线程示例,并讨论编程时需要注意的事项.文中涉及的代码运行环境如下: 本文假定读者已具备线程同步的基础知识. 一  顺序表循环队列 1.1 顺序循环队列定义 队列是一种运算受限的先进先出线性表,仅允许在队尾插入(入队),在队首删除(出队).新元素入队后成为新的队尾元素,元素出队后其后继元素就成为队首元素. 队列的顺序存储结构使用一个数组和两个整型变量实现,其结构如下: 1 struct Queue{ 2     ElemType el

并发编程(二):分析Boost对 互斥量和条件变量的封装及实现生产者消费者问题

请阅读上篇文章<并发编程实战: POSIX 使用互斥量和条件变量实现生产者/消费者问题>.当然不阅读亦不影响本篇文章的阅读. Boost的互斥量,条件变量做了很好的封装,因此比"原生的"POSIX mutex,condition variables好用.然后我们会通过分析boost相关源码看一下boost linux是如何对pthread_mutex_t和pthread_cond_t进行的封装. 首先看一下condition_variable_any的具体实现,代码路径:/

Linux C实现生产者消费者问题

//信号量---线程间通信 //"生产者消费者" 问题 #include<stdio.h> #include<stdlib.h> #include<unistd.h> #include<semaphore.h> #include<pthread.h> #define msleep(x) usleep(x*1000) #define PRODUCT_SPEED 3 //生产速度 #define CONSUM_SPEED 1 //

聊聊并发:生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题于是引入了生产者和消费者模式. 什么是生产者消费者模式 生产者

Java线程:并发协作-生产者消费者模型

实际上,准确说应该是"生产者-消费者-仓储"模型,离开了仓储,生产者消费者模型就显得没有说服力了. 对于此模型,应该明确一下几点: 1.生产者仅仅在仓储未满时候生产,仓满则停止生产. 2.消费者仅仅在仓储有产品时候才能消费,仓空则等待. 3.当消费者发现仓储没产品可消费时候会通知生产者生产. 4.生产者在生产出可消费产品时候,应该通知等待的消费者去消费. 此模型将要结合java.lang.Object的wait与notify.notifyAll方法来实现以上的需求.这是非常重要的. /

并发编程(一): POSIX 使用互斥量和条件变量实现生产者/消费者问题

    boost的mutex,condition_variable非常好用.但是在Linux上,boost实际上做的是对pthread_mutex_t和pthread_cond_t的一系列的封装.因此通过对原生态的POSIX 的mutex,cond的生成者,消费者的实现,我们可以再次体会boost带给我们的便利. 1. 什么是互斥量        互斥量从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁.对互斥量进行加锁以后,任何其他试图再次对互斥量加锁的线程将