Qt之线程同步(生产者消费者模式 - QWaitCondition)

简述

生产者将数据写入缓冲区,直到它到达缓冲区的末尾,这时,它从开始位置重新启动,覆盖现有数据。消费者线程读取数据并将其写入标准错误。

Wait condition(等待条件)比单独使用 mutex(互斥量)有一个更高级的并发性,如果缓冲区的访问由一个 QMutex 把守,当生产者线程访问缓冲区时,消费者线程将无法访问。然而,两个线程同时访问不同的缓冲区是没有害处的。

示例包含两个类:Producer 和 Consumer,均继承自 QThread。循环缓冲区用于两个类之间的沟通,同步工具用于保护全局变量。

另一种“生产者 - 消费者”模式的方案是使用 QSemaphore - Qt之线程同步(生产者消费者模式 - QSemaphore)

  • 简述
  • 全局变量
  • Producer
  • Consumer
  • main 函数
  • 更多参考

全局变量

首先,一起来看循环缓冲区和相关的同步工具:

const int DataSize = 100000;

const int BufferSize = 8192;
char buffer[BufferSize];

QWaitCondition bufferNotEmpty;
QWaitCondition bufferNotFull;
QMutex mutex;
int numUsedBytes = 0;

DataSize 是生产者将生成的数据数量,为了让示例尽可能地简单,把它定义为一个常数。BufferSize 是循环缓冲区的大小,小于 DataSize,这意味着在某一时刻生产者将达到缓冲区的末尾,并从开始位置重新启动。

要同步生产者和消费者,需要两个 wait 条件和一个 mutex。当生产者生成一些数据时,bufferNotEmpty 条件被发射,告诉消费者可以读取它了;当消费者读取一些数据时,bufferNotFull 条件被发射,告诉生产者生成更多的数据。numUsedBytes 为缓冲区中所包含数据的字节数。

总之,wait 条件、mutex、和 numUsedBytes 计数器确保生产者不会先于消费者超过 BufferSize 的大小,而消费者永远不会读取生产者尚未生成的数据。

Producer

Producer 类的代码如下:

class Producer : public QThread
{
public:
    Producer(QObject *parent = NULL) : QThread(parent)
    {
    }

    void run() Q_DECL_OVERRIDE
    {
        qsrand(QTime(0,0,0).secsTo(QTime::currentTime()));

        for (int i = 0; i < DataSize; ++i) {
            mutex.lock();
            if (numUsedBytes == BufferSize)
                bufferNotFull.wait(&mutex);
            mutex.unlock();

            buffer[i % BufferSize] = "ACGT"[(int)qrand() % 4];

            mutex.lock();
            ++numUsedBytes;
            bufferNotEmpty.wakeAll();
            mutex.unlock();
        }
    }
};

生产者生成 DataSize 字节的数据。在往循环缓冲区写入一个字节之前,它必须先检测缓冲区是否已满(例如,numUsedBytes 等于 BufferSize),如果缓冲区满了,线程就会在 bufferNotFull 条件上等待。

最后,生产者增加 numUsedBytes,并且标志 bufferNotEmpty 条件为 true,从而 numUsedBytes 必然大于 0,

我们使用一个 mutex 保护 numUsedBytes 变量的所有访问。此外,QWaitCondition::wait() 函数接受一个 mutex 作为其参数。当线程被置于休眠状态之前,该 mutex 被解锁;当线程被唤醒,该 mutex 被锁定。此外,为了防止竞争条件发生,从锁定状态到等待状态的过渡具有原子性。

Consumer

现在转向 Consumer 类:

class Consumer : public QThread
{
    Q_OBJECT
public:
    Consumer(QObject *parent = NULL) : QThread(parent)
    {
    }

    void run() Q_DECL_OVERRIDE
    {
        for (int i = 0; i < DataSize; ++i) {
            mutex.lock();
            if (numUsedBytes == 0)
                bufferNotEmpty.wait(&mutex);
            mutex.unlock();

            fprintf(stderr, "%c", buffer[i % BufferSize]);

            mutex.lock();
            --numUsedBytes;
            bufferNotFull.wakeAll();
            mutex.unlock();
        }
        fprintf(stderr, "\n");
    }

signals:
    void stringConsumed(const QString &text);
};

代码非常类似于生产者,在读取字节之前,需要先检查缓冲区是否为空(numUsedBytes 为 0),而非它是否为已满。并且,当它为空时,等待 bufferNotEmpty 条件。在读取字节后,减小 numUsedBytes (而非增加),并标志 bufferNotFull 条件(而非 bufferNotEmpty 条件)。

main() 函数

在 main() 函数中,我们创建两个线程,并调用 QThread::wait(),以确保在退出之前,这两个线程有时间完成。

int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return 0;
}

当运行这个程序时,会发生什么呢?

最初,生产者是唯一一个可以做任何事情的线程,消费者阻塞并等待 bufferNotEmpty 条件被发射(numUsedBytes 是 0)。一旦生产者把一个字节放入缓冲区,numUsedBytes 就会变为 BufferSize - 1,并且 bufferNotEmpty 条件被发射。这时,可能发生两件事:要么消费者线程接管和读取字节,要么生产者开始生成第二个字节。

此示例中提出的“生产者 - 消费者”模式,适用于编写高并发多线程应用。在多处理器计算机中,程序可能比基于 mutex 的方案快达两倍之多,因为两个线程可以同一时间在缓冲区的不同部分处于激活状态。

要知道,这些好处并不总能实现,加锁和解锁一个 QMutex 是需要成本的。在实践中,可能需要把缓冲区分为块,并针对块操作而非单个字节。缓冲区的大小也是一个必须仔细选择的参数,需要基于实验。

更多参考

时间: 2024-11-09 00:18:55

Qt之线程同步(生产者消费者模式 - QWaitCondition)的相关文章

Qt之线程同步(生产者消费者模式 - QSemaphore)

简述 生产者将数据写入缓冲区,直到它到达缓冲区的末尾,此时,它将从开始位置重新启动,覆盖现有数据.消费者线程读取数据并将其写入标准错误. Semaphore(信号量) 比 mutex(互斥量)有一个更高级的并发性.如果缓冲区的访问由一个 QMutex 把守,当生产者线程访问缓冲区时,消费者线程将无法访问.然而,有两个线程同一时间访问不同的缓冲区是没有害处的. 示例包括两个类:Producer 和 Consumer,均继承自 QThread.循环缓冲区用于这两个类之间的沟通,信号量用于保护全局变量

Qt之线程同步

简述 使用线程的目的是允许代码并行运行,但是有时线程必须停止并等待其他线程.例如,如果两个线程试图同时写入相同的变量,结果是未知的. 迫使线程等待另一个的原则被称为互斥 . 这是一种保护共享资源等数据的常见的技术. 简述 低级同步原语 风险 便利类 高级事件队列 低级同步原语 QMutex 是强制执行互斥的基本类.一个线程锁定一个互斥量(mutex),以获得共享资源的访问权限.如果另一个线程试图锁定互斥量,而互斥量已被锁定,这时,它将进入睡眠状态,直到第一个线程完成其任务并解锁互斥量. QRea

用Python多线程实现生产者消费者模式

什么是生产者消费者模式 在软件开发的过程中,经常碰到这样的场景: 某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数.线程.进程等).产生数据的模块称为生产者,而处理数据的模块称为消费者.在生产者与消费者之间的缓冲区称之为仓库.生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式. 结构图如下: 为了大家容易理解,我们举一个寄信的例子.假设你要寄一封信,大致过程如下: 你把信写好--相当于生产者生产数据 你把信放入邮箱--相当于生产者把数据放

聊聊并发:生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题于是引入了生产者和消费者模式. 什么是生产者消费者模式 生产者

聊聊并发(十)生产者消费者模式

本文首发于InfoQ   作者:方腾飞  校对:张龙 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这种生产消费能力

leizi求指教-生产者消费者模式在什么情况下用到?

问题描述 生产者消费者模式在什么情况下用到? 生产者消费者通常都用来解决哪类问题?什么情况下需要想到用到生产者消费者模式呢? 解决方案 一般就是任务队列的时候,比如你有专门处理的任务的线程,同时有生成任务的线程,这样为了好控制,一般是把任务通过队列的方式来传递这样可以有多个线程做生产者,它们只需要把任务不停的丢入队列,同样很多线程做消费者,它们不停的从队列中取任务执行

生产者消费者模式浅析

        原文地址:http://blog.csdn.net/lenyusun/article/details/6609786         由于最近工作中,涉及到生产者消费者设计模式,对此有一些体会,所以总结一下,与大家分享.         什么是生产者消费者模式?         在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为

Lock、Condition实现简单的生产者消费者模式示例_java

复制代码 代码如下: package condition; import java.util.ArrayList;import java.util.List;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock; /** * 利用Lock.Condition实现生产者消费者模式 * @aut

python 多线程笔记(6)-- 生产者/消费者模式(续)

  用 threading.Event() 也可以实现生产者/消费者模式 (自己拍脑袋想出来的,无法知道其正确性,请大神告知为谢!)   import threading import time import random products = 20 class Producer(threading.Thread): '''生产者''' ix = [0] # 生产者实例个数 # 闭包,必须是数组,不能直接 ix = 0 def __init__(self): super().__init__()