在上一章中我们概述了一下线程池,这一章我们看一下创建newFixedThreadPool的源码。例子还是我们在上一章中写的那个例子。
创建newFixedThreadPool的方法:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
上面这两个方法是创建固定数量的线程池的两种方法,两者的区别是:第二种创建方法多了一个线程工厂的方法。我们继续看ThreadPoolExecutor这个类中的构造函数:
ThreadPoolExecutor的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
ThreadPollExecutor中的所有的构造函数最终都会调用上面这个构造函数,接下来我们来分析一下这些参数的含义:
corePoolSize:
线程池启动后,在池中保持的线程的最小数量。需要说明的是线程数量是逐步到达corePoolSize值的。例如corePoolSize被设置为10,而任务数量只有5,则线程池中最多会启动5个线程,而不是一次性地启动10个线程。
maxinumPoolSize:
线程池中能容纳的最大线程数量,如果超出,则使用RejectedExecutionHandler拒绝策略处理。
keepAliveTime:
线程的最大生命周期。这里的生命周期有两个约束条件:一:该参数针对的是超过corePoolSize数量的线程;二:处于非运行状态的线程。举个例子:如果corePoolSize(最小线程数)为10,maxinumPoolSize(最大线程数)为20,而此时线程池中有15个线程在运行,过了一段时间后,其中有3个线程处于等待状态的时间超过keepAliveTime指定的时间,则结束这3个线程,此时线程池中则还有12个线程正在运行。
unit:
这是keepAliveTime的时间单位,可以是纳秒,毫秒,秒,分钟等。
workQueue:
任务队列。当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要一个容器来容纳这些任务,这就是任务队列。这个任务队列是一个阻塞式的单端队列。
threadFactory:
定义如何启动一个线程,可以设置线程的名称,并且可以确定是否是后台线程等。
handler:
拒绝任务处理器。由于超出线程数量和队列容量而对继续增加的任务进行处理的程序。
OK,ThreadPoolExecutor中的主要参数介绍完了。我们再说一下线程的管理过程:首先创建一个线程池,然后根据任务的数量逐步将线程增大到corePoolSize,如果此时仍有任务增加,则放置到workQueue中,直到workQueue爆满为止,然后继续增加池中的线程数量(增强处理能力),最终达到maxinumPoolSize。那如果此时还有任务要增加进来呢?这就需要handler来处理了,或者丢弃新任务,或者拒绝新任务,或者挤占已有的任务。在任务队列和线程池都饱和的情况下,一旦有线程处于等待(任务处理完毕,没有新任务)状态的时间超过keepAliveTime,则该线程终止,也就是说池中的线程数量会逐渐降低,直至为corePoolSize数量为止。在《编写高质量代码
改善Java程序的151个建议》这本书里举的这个例子很形象:
OK,接下来我们来看一下怎么往任务队里中放入线程任务:在java.util.concurrent.AbstractExecutorService这个类的submit方法
submit方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask);//执行任务 return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask);//执行任务 return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask);//执行任务 return ftask; }
这是三个重载方法,分别对应Runnable、带结果的Runnable接口和Callable回调函数。其中的newTaskFor也是一个重载的方法,它通过层层的包装,把Runnable接口包装成了适配RunnableFuture的实现类,底层实现如下:
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
在submit中最重要的是execute这个方法,这个方法也是我们分析的重点
execute方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {// if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
在这个方法中分为三部分
1、如果少于corePoolSize数量的线程在运行,则启动一个新的线程并把传进来的Runnable做为第一个任务。然后会检查线程的运行状态和worker的数量,阻止不符合要求的任务添加到线程中
2、如果一个任务成功的放入到了队列中,我们仍然需要二次检查我们是否应该添加线程或者停止。因此我们重新检查线程状态,是否需要回滚队列,或者是停止或者是启动一个新的线程
3、如果我们不能添加队列任务了,但是仍然在往队列中添加任务,如果添加失败的话,用拒绝策略来处理。
这里最主要的是addWorker这个方法:
try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); }
我们在这个方法里创建一个线程,注意这个线程不是我们的任务线程,而是经过包装的Worker线程。所以这里的run方法是Worker这个类中的run方法。execute方法是通过Worker类启动的一个工作线程,执行的是我们的第一个任务,然后该线程通过getTask方法从任务队列总获取任务,之后再继续执行。这个任务队列是一个BlockingQueue,是一个阻塞式的,也就是说如果该队列元素为0,则保持等待状态。直到有任务进入为止。