支持生产阻塞的线程池

在各种并发编程模型中,生产者-消费者模式大概是最常用的了。在实际工作中,对于生产消费的速度,通常需要做一下权衡。通常来说,生产任务的速度要大于消费的速度。一个细节问题是,队列长度,以及如何匹配生产和消费的速度。

一个典型的生产者-消费者模型如下:

在并发环境下利用J.U.C提供的Queue实现可以很方便地保证生产和消费过程中的线程安全。这里需要注意的是,Queue必须设置初始容量,防止生产者生产过快导致队列长度暴涨,最终触发OutOfMemory。

对于一般的生产快于消费的情况。当队列已满时,我们并不希望有任何任务被忽略或得不到执行,此时生产者可以等待片刻再提交任务,更好的做法是,把生产者阻塞在提交任务的方法上,待队列未满时继续提交任务,这样就没有浪费的空转时间了。阻塞这一点也很容易,BlockingQueue就是为此打造的,ArrayBlockingQueue和LinkedBlockingQueue在构造时都可以提供容量做限制,其中LinkedBlockingQueue是在实际操作队列时在每次拿到锁以后判断容量。

更进一步,当队列为空时,消费者拿不到任务,可以等一会儿再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,当有任务时便可以立即获得执行,建议调用take的带超时参数的重载方法,超时后线程退出。这样当生产者事实上已经停止生产时,不至于让消费者无限等待。

于是一个高效的支持阻塞的生产消费模型就实现了。

等一下,既然J.U.C已经帮我们实现了线程池,为什么还要采用这一套东西?直接用ExecutorService不是更方便?

我们来看一下ThreadPoolExecutor的基本结构:

可以看到,在ThreadPoolExecutor中,BlockingQueue和Consumer部分已经帮我们实现好了,并且直接采用线程池的实现还有很多优势,例如线程数的动态调整等。

但问题在于,即便你在构造ThreadPoolExecutor时手动指定了一个BlockingQueue作为队列实现,事实上当队列满时,execute方法并不会阻塞,原因在于ThreadPoolExecutor调用的是BlockingQueue非阻塞的offer方法:

01 public void execute(Runnable command) {
02     if (command == null)
03         throw new NullPointerException();
04     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
05         if (runState == RUNNING && workQueue.offer(command)) {
06             if (runState != RUNNING || poolSize == 0)
07                 ensureQueuedTaskHandled(command);
08         }
09         else if (!addIfUnderMaximumPoolSize(command))
10             reject(command); // is shutdown or saturated
11     }
12 }

这时候就需要做一些事情来达成一个结果:当生产者提交任务,而队列已满时,能够让生产者阻塞住,等待任务被消费。

关键在于,在并发环境下,队列满不能由生产者去判断,不能调用ThreadPoolExecutor.getQueue().size()来判断队列是否满。

线程池的实现中,当队列满时会调用构造时传入的RejectedExecutionHandler去拒绝任务的处理。默认的实现是AbortPolicy,直接抛出一个RejectedExecutionException。

几种拒绝策略在这里就不赘述了,这里和我们的需求比较接近的是CallerRunsPolicy,这种策略会在队列满时,让提交任务的线程去执行任务,相当于让生产者临时去干了消费者干的活儿,这样生产者虽然没有被阻塞,但提交任务也会被暂停。

01 public static class CallerRunsPolicy implements RejectedExecutionHandler {
02     /**
03      * Creates a <tt>CallerRunsPolicy</tt>.
04      */
05     public CallerRunsPolicy() { }
06  
07     /**
08      * Executes task r in the caller's thread, unless the executor
09      * has been shut down, in which case the task is discarded.
10      * @param r the runnable task requested to be executed
11      * @param e the executor attempting to execute this task
12      */
13     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
14         if (!e.isShutdown()) {
15             r.run();
16         }
17     }
18 }

但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。

参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:

01 new RejectedExecutionHandler() {
02     @Override
03     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
04         if (!executor.isShutdown()) {
05             try {
06                 executor.getQueue().put(r);
07             } catch (InterruptedException e) {
08                 // should not be interrupted
09             }
10         }
11     }
12 };

这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。

相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。 

时间: 2024-10-31 14:34:48

支持生产阻塞的线程池的相关文章

nginx-1.7.11 开发版发布,支持体验版的线程池和代理请求缓冲等

无名码农 发布于: 2015年03月25日 (9评) nginx-1.7.11 开发版已经发布了,支持体验版的线程池和代理请求缓冲等. 详情请看: *) Change: the "sendfile" parameter of the "aio" directive is        deprecated; now nginx automatically uses AIO to pre-load data for        sendfile if both &q

支持生产阻塞的Java线程池_java

通常来说,生产任务的速度要大于消费的速度.一个细节问题是,队列长度,以及如何匹配生产和消费的速度. 一个典型的生产者-消费者模型如下:   在并发环境下利用J.U.C提供的Queue实现可以很方便地保证生产和消费过程中的线程安全.这里需要注意的是,Queue必须设置初始容量,防止生产者生产过快导致队列长度暴涨,最终触发OutOfMemory. 对于一般的生产快于消费的情况.当队列已满时,我们并不希望有任何任务被忽略或得不到执行,此时生产者可以等待片刻再提交任务,更好的做法是,把生产者阻塞在提交任

Marble原理之线程池

本章节依赖于[Marble使用],阅读本章节前请保证已经充分了解Marble 线程池概述 由于Marble属于框架性项目,用户接入Marble不关心Marble的实现机制.因此Marble在做相关处理时对资源的消耗要可控,不能因为Marble的原因导致接入的应用不可用(比如资源耗尽). 此外,Marble-Agent每次收到RPC调度为了不阻塞都会新开线程进行JOB执行,对线程的使用非常频繁,因此必须使用同一的线程池进行Marble的资源使用收口. 对于线程池 Java已经做了很好的封装,大部分

戏(细)说Executor框架线程池任务执行全过程(上)

原文链接   归档下发表于infoq.com 2015年6月的两篇文章. 内容综述 基于Executor接口中将任务提交和任务执行解耦的设计,ExecutorService和其各种功能强大的实现类提供了非常简便方式来提交任务并获取任务执行结果,封装了任务执行的全部过程.本文尝试通过对j.u.c.下该部分源码的解析以ThreadPoolExecutor为例来追踪任务提交.执行.获取执行结果的整个过程.为了避免陷入枯燥的源码解释,将该过程和过程中涉及的角色与我们工作中的场景和场景中涉及的角色进行映射

Python并发编程之线程池/进程池

引言 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间.但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对

线程池规模调优浅析

线程池应该配置成多大? 先前一位朋友通过Skype问我关于运行在64位机器JVM集群一些问题,该集群每天会运行几次30万+个线程的任务.30万+个线程运行时,核心模块花了太多时间管理它们,导致应用程序极其不稳定.很明显,该应用程序需要一个线程池,从而保证可以杀死客户端,而不是放任客户端把整个应用程序搞崩溃. 上面的示例是比较极端的情况,但它强调了我们使用线程池的原因.尽管我们合理使用了线程池,仍可能由于数据丢失或交易失败惹恼用户.若我们的线程池定义得过大或过小,都有可能让应用程序完全瘫痪.大小合

Java线程池和阻塞队列

Java提供了自己的线程池.每次只执行指定数量的线程,java.util.concurrent.ThreadPoolExecutor 就是这样的线程池. ThreadPoolExecutor 参数介绍: corePoolSize 核心线程数,指保留的线程池大小(不超过maximumPoolSize值时,线程池中最多有corePoolSize 个线程工作). maximumPoolSize 指的是线程池的最大大小(线程池中最大有corePoolSize 个线程可运行). keepAliveTime

C语言实现支持动态拓展和销毁的线程池_C 语言

本文实例介绍了C 语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下 实现功能 1.初始化指定个数的线程 2.使用链表来管理任务队列 3.支持拓展动态线程 4.如果闲置线程过多,动态销毁部分线程 #include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <signal.h> /*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/ typedef str

Linux下套接字详解(七)----线程池accept处理高并发connect

前言 服务器在调用listen和accept后,就会阻塞在accept函数上,accpet函数返回后循环调用accept函数等待客户的TCP连接. 我们知道服务器段listen套接字能处理的连接数与监听队列的大小有关,如果这时候又大量的用户并发发起connec连接,那么在listen有队列上限(最大可接受TCP的连接数)的情况下,有多少个connect会成功了. 试验证明,当连接数远远高于listen的可连接数上限时,客户端的大部分TCP请求会被抛弃,只有当listen监听队列空闲或者放弃某个连