聊聊并发:生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

生产者消费者模式实战

我和同事一起利用业余时间开发的Yuna工具中使用了生产者和消费者模式。首先我先介绍下Yuna工具,在阿里巴巴很多同事都喜欢通过邮件分享技术文章,因为通过邮件分享很方便,同学们在网上看到好的技术文章,复制粘贴发送就完成了分享,但是我们发现技术文章不能沉淀下来,对于新来的同学看不到以前分享的技术文章,大家也很难找到以前分享过的技术文章。为了解决这问题,我们开发了Yuna工具。Yuna取名自我喜欢的一款游戏最终幻想里的女主角。

首先我们申请了一个专门用来收集分享邮件的邮箱,比如share@alibaba.com,同学将分享的文章发送到这个邮箱,让同学们每次都抄送到这个邮箱肯定很麻烦,所以我们的做法是将这个邮箱地址放在部门邮件列表里,所以分享的同学只需要象以前一样向整个部门分享文章就行,Yuna工具通过读取邮件服务器里该邮箱的邮件,把所有分享的邮件下载下来,包括邮件的附件,图片,和邮件回复,我们可能会从这个邮箱里下载到一些非分享的文章,所以我们要求分享的邮件标题必须带有一个关键字,比如[内贸技术分享],下载完邮件之后,通过confluence的web service接口,把文章插入到confluence里,这样新同事就可以在confluence里看以前分享过的文章,并且Yuna工具还可以自动把文章进行分类和归档。

为了快速上线该功能,当时我们花了三天业余时间快速开发了Yuna1.0版本。在1.0版本中我并没有使用生产者消费模式,而是使用单线程来处理,因为当时只需要处理我们一个部门的邮件,所以单线程明显够用,整个过程是串行执行的。在一个线程里,程序先抽取全部的邮件,转化为文章对象,然后添加全部的文章,最后删除抽取过的邮件。代码如下:

public void extract() {
        logger.debug("开始" + getExtractorName() + "。。");
        //抽取邮件
        List<Article> articles = extractEmail();
        //添加文章
        for (Article article : articles) {
            addArticleOrComment(article);
        }
        //清空邮件
        cleanEmail();
        logger.debug("完成" + getExtractorName() + "。。");
    }

Yuna工具在推广后,越来越多的部门使用这个工具,处理的时间越来越慢,Yuna是每隔5分钟进行一次抽取的,而当邮件多的时候一次处理可能就花了几分钟,于是我在Yuna2.0版本里使用了生产者消费者模式来处理邮件,首先生产者线程按一定的规则去邮件系统里抽取邮件,然后存放在阻塞队列里,消费者从阻塞队列里取出文章后插入到conflunce里。代码如下:

public class QuickEmailToWikiExtractor extends AbstractExtractor {

private ThreadPoolExecutor      threadsPool;

private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;

public QuickEmailToWikiExtractor() {
        emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000));

    }

public void extract() {
        logger.debug("开始" + getExtractorName() + "。。");
        long start = System.currentTimeMillis();

        //抽取所有邮件放到队列里
        new ExtractEmailTask().start();

        // 把队列里的文章插入到Wiki
        insertToWiki();

        long end = System.currentTimeMillis();
        double cost = (end - start) / 1000;
        logger.debug("完成" + getExtractorName() + ",花费时间:" + cost + "秒");
    }

    /**
     * 把队列里的文章插入到Wiki
     */
    private void insertToWiki() {
        //登录wiki,每间隔一段时间需要登录一次
        confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);

        while (true) {
            //2秒内取不到就退出
            ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);
            if (email == null) {
                break;
            }
            threadsPool.submit(new insertToWikiTask(email));
        }
    }

     protected List<Article> extractEmail() {
        List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();
        if (allEmails == null) {
            return null;
        }
        for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {
            emailQueue.offer(exchangeEmailShallowDTO);
        }
        return null;
    }

    /**
     * 抽取邮件任务
     *
     * @author tengfei.fangtf
     */
    public class ExtractEmailTask extends Thread {
        public void run() {
            extractEmail();
        }
    }
}

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索邮件
, confluence
, 队列
, 生产者消费者问题
, 消费者
, 生产者
, 模式
, 处理
, 生产者 消费者
, 生产
, 生产者  消费者
, 删除邮件队列
, 并发队列类
并发阻塞队列
,以便于您获取更多的相关知识。

时间: 2024-10-15 19:04:56

聊聊并发:生产者消费者模式的相关文章

聊聊并发(十)生产者消费者模式

本文首发于InfoQ   作者:方腾飞  校对:张龙 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这种生产消费能力

用Python多线程实现生产者消费者模式

什么是生产者消费者模式 在软件开发的过程中,经常碰到这样的场景: 某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数.线程.进程等).产生数据的模块称为生产者,而处理数据的模块称为消费者.在生产者与消费者之间的缓冲区称之为仓库.生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式. 结构图如下: 为了大家容易理解,我们举一个寄信的例子.假设你要寄一封信,大致过程如下: 你把信写好--相当于生产者生产数据 你把信放入邮箱--相当于生产者把数据放

生产者消费者模式浅析

        原文地址:http://blog.csdn.net/lenyusun/article/details/6609786         由于最近工作中,涉及到生产者消费者设计模式,对此有一些体会,所以总结一下,与大家分享.         什么是生产者消费者模式?         在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为

Qt之线程同步(生产者消费者模式 - QWaitCondition)

简述 生产者将数据写入缓冲区,直到它到达缓冲区的末尾,这时,它从开始位置重新启动,覆盖现有数据.消费者线程读取数据并将其写入标准错误. Wait condition(等待条件)比单独使用 mutex(互斥量)有一个更高级的并发性,如果缓冲区的访问由一个 QMutex 把守,当生产者线程访问缓冲区时,消费者线程将无法访问.然而,两个线程同时访问不同的缓冲区是没有害处的. 示例包含两个类:Producer 和 Consumer,均继承自 QThread.循环缓冲区用于两个类之间的沟通,同步工具用于保

Qt之线程同步(生产者消费者模式 - QSemaphore)

简述 生产者将数据写入缓冲区,直到它到达缓冲区的末尾,此时,它将从开始位置重新启动,覆盖现有数据.消费者线程读取数据并将其写入标准错误. Semaphore(信号量) 比 mutex(互斥量)有一个更高级的并发性.如果缓冲区的访问由一个 QMutex 把守,当生产者线程访问缓冲区时,消费者线程将无法访问.然而,有两个线程同一时间访问不同的缓冲区是没有害处的. 示例包括两个类:Producer 和 Consumer,均继承自 QThread.循环缓冲区用于这两个类之间的沟通,信号量用于保护全局变量

leizi求指教-生产者消费者模式在什么情况下用到?

问题描述 生产者消费者模式在什么情况下用到? 生产者消费者通常都用来解决哪类问题?什么情况下需要想到用到生产者消费者模式呢? 解决方案 一般就是任务队列的时候,比如你有专门处理的任务的线程,同时有生成任务的线程,这样为了好控制,一般是把任务通过队列的方式来传递这样可以有多个线程做生产者,它们只需要把任务不停的丢入队列,同样很多线程做消费者,它们不停的从队列中取任务执行

Lock、Condition实现简单的生产者消费者模式示例_java

复制代码 代码如下: package condition; import java.util.ArrayList;import java.util.List;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock; /** * 利用Lock.Condition实现生产者消费者模式 * @aut

python 多线程笔记(6)-- 生产者/消费者模式(续)

  用 threading.Event() 也可以实现生产者/消费者模式 (自己拍脑袋想出来的,无法知道其正确性,请大神告知为谢!)   import threading import time import random products = 20 class Producer(threading.Thread): '''生产者''' ix = [0] # 生产者实例个数 # 闭包,必须是数组,不能直接 ix = 0 def __init__(self): super().__init__()

python 多线程笔记(5)-- 生产者/消费者模式

  我们已经知道,对公共资源进行互斥访问,可以使用Lock上锁,或者使用RLock去重入锁.   但是这些都只是方便于处理简单的同步现象,我们甚至还不能很合理的去解决使用Lock锁带来的死锁问题. 要解决更复杂的同步问题,就必须考虑别的办法了.   threading提供的Condition对象提供了对复杂线程同步问题的支持. Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法.   使用Condition的主要方式