在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
生产者消费者模式实战
我和同事一起利用业余时间开发的Yuna工具中使用了生产者和消费者模式。首先我先介绍下Yuna工具,在阿里巴巴很多同事都喜欢通过邮件分享技术文章,因为通过邮件分享很方便,同学们在网上看到好的技术文章,复制粘贴发送就完成了分享,但是我们发现技术文章不能沉淀下来,对于新来的同学看不到以前分享的技术文章,大家也很难找到以前分享过的技术文章。为了解决这问题,我们开发了Yuna工具。Yuna取名自我喜欢的一款游戏最终幻想里的女主角。
首先我们申请了一个专门用来收集分享邮件的邮箱,比如share@alibaba.com,同学将分享的文章发送到这个邮箱,让同学们每次都抄送到这个邮箱肯定很麻烦,所以我们的做法是将这个邮箱地址放在部门邮件列表里,所以分享的同学只需要象以前一样向整个部门分享文章就行,Yuna工具通过读取邮件服务器里该邮箱的邮件,把所有分享的邮件下载下来,包括邮件的附件,图片,和邮件回复,我们可能会从这个邮箱里下载到一些非分享的文章,所以我们要求分享的邮件标题必须带有一个关键字,比如[内贸技术分享],下载完邮件之后,通过confluence的web service接口,把文章插入到confluence里,这样新同事就可以在confluence里看以前分享过的文章,并且Yuna工具还可以自动把文章进行分类和归档。
为了快速上线该功能,当时我们花了三天业余时间快速开发了Yuna1.0版本。在1.0版本中我并没有使用生产者消费模式,而是使用单线程来处理,因为当时只需要处理我们一个部门的邮件,所以单线程明显够用,整个过程是串行执行的。在一个线程里,程序先抽取全部的邮件,转化为文章对象,然后添加全部的文章,最后删除抽取过的邮件。代码如下:
public void extract() { logger.debug("开始" + getExtractorName() + "。。"); //抽取邮件 List<Article> articles = extractEmail(); //添加文章 for (Article article : articles) { addArticleOrComment(article); } //清空邮件 cleanEmail(); logger.debug("完成" + getExtractorName() + "。。"); }
Yuna工具在推广后,越来越多的部门使用这个工具,处理的时间越来越慢,Yuna是每隔5分钟进行一次抽取的,而当邮件多的时候一次处理可能就花了几分钟,于是我在Yuna2.0版本里使用了生产者消费者模式来处理邮件,首先生产者线程按一定的规则去邮件系统里抽取邮件,然后存放在阻塞队列里,消费者从阻塞队列里取出文章后插入到conflunce里。代码如下:
public class QuickEmailToWikiExtractor extends AbstractExtractor { private ThreadPoolExecutor threadsPool; private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue; public QuickEmailToWikiExtractor() { emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>(); int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000)); } public void extract() { logger.debug("开始" + getExtractorName() + "。。"); long start = System.currentTimeMillis(); //抽取所有邮件放到队列里 new ExtractEmailTask().start(); // 把队列里的文章插入到Wiki insertToWiki(); long end = System.currentTimeMillis(); double cost = (end - start) / 1000; logger.debug("完成" + getExtractorName() + ",花费时间:" + cost + "秒"); } /** * 把队列里的文章插入到Wiki */ private void insertToWiki() { //登录wiki,每间隔一段时间需要登录一次 confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD); while (true) { //2秒内取不到就退出 ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS); if (email == null) { break; } threadsPool.submit(new insertToWikiTask(email)); } } protected List<Article> extractEmail() { List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails(); if (allEmails == null) { return null; } for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) { emailQueue.offer(exchangeEmailShallowDTO); } return null; } /** * 抽取邮件任务 * * @author tengfei.fangtf */ public class ExtractEmailTask extends Thread { public void run() { extractEmail(); } } }
以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索邮件
, confluence
, 队列
, 生产者消费者问题
, 消费者
, 生产者
, 模式
, 处理
, 生产者 消费者
, 生产
, 生产者 消费者
, 删除邮件队列
, 并发队列类
并发阻塞队列
,以便于您获取更多的相关知识。