ThreadPoolExecutor的应用和实现分析(中)—— 任务处理相关源码分析 线程利用(转)

前面一篇文章从Executors中的工厂方法入手,已经对ThreadPoolExecutor的构造和使用做了一些整理。而这篇文章,我们将接着前面的介绍,从源码实现上对ThreadPoolExecutor在任务的提交、执行,线程重用和线程数维护等方面做下分析。

0.    ThreadPoolExecutor类的声明属性变量分析


1

public class ThreadPoolExecutor extends AbstractExecutorService

从这个类声明中我们可以看到java.util.ThreadPoolExecutor是继承于AbstractExecutorService的,而之前的文章我也提到过,AbstractExecutorService已经实现了一些任务提交处理的方法,如submit()方法都是在这个抽象类中实现的。但submit()方法,最后也是会调用ThreadPoolExecutor的execute()方法。

打开SunJDK中的ThreadPoolExecutor类源码,除了上篇文章提到的一些和构造方法中参数对应的属性之外,让我们看看还有什么:

  • mainLock 对整个ThreadPoolExecutor对象的锁
  • workers  存储工作线程对应Worker对象的HashSet
  • termination 线程池ThreadPoolExecutor对象的生命周期终止条件,和mainLock相关
  • largestPoolSize 线程池跑过的最大线程数
  • completedTaskCount 完成任务数
  • ctl 执行器ThreadPoolExecutor的生命周期状态和活动状态的worker数封装

稍微需要说一下最后一个, ctl是一个AtomicInteger对象,以位运算的方式打包封装了当前线程池ThreadPoolExecutor对象的状态和活动线程数两个数据

1.    执行器状态

ExecutorService中已经指定了这个接口对应的类要实现的方法,其中就包括shutdown()和shutdownNow()等方法。在ThreadPoolExecutor中指明了状态的含义,并包含其于ctl属性中。

ThreadPoolExecutor对象有五种状态,如下:

  • RUNNING 在ThreadPoolExecutor被实例化的时候就是这个状态
  • SHUTDOWN 通常是已经执行过shutdown()方法,不再接受新任务,等待线程池中和队列中任务完成
  • STOP 通常是已经执行过shutdownNow()方法,不接受新任务,队列中的任务也不再执行,并尝试终止线程池中的线程
  • TIDYING 线程池为空,就会到达这个状态,执行terminated()方法
  • TERMINATED terminated()执行完毕,就会到达这个状态,ThreadPoolExecutor终结

2.    Worker内部类

它既实现了Runnable,同时也是一个AQS ( AbstractQueuedSynchronizer )。


1

2

3

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

封装了3样东西,Runnable类的首个任务对象,执行的线程thread和完成的任务数(volatile)completedTasks。


1

2

3

final Thread thread;

Runnable firstTask;

volatile long completedTasks;

这个类还提供了interruptIfStarted()这样一个方法,里面做了(getState()>= 0)的判断。与此呼应,Worker的构造方法里对state设置了-1,避免在线程执行前被停掉。


1

2

3

4

5

Worker(Runnable firstTask) {

    setState(-1); // inhibit interrupts until runWorker

    this.firstTask = firstTask;

    this.thread = getThreadFactory().newThread(this);

}

3. 提交任务

上篇文章已经提到了,提交新任务的时候,如果没达到核心线程数corePoolSize,则开辟新线程执行。如果达到核心线程数corePoolSize, 而队列未满,则放入队列,否则开新线程处理任务,直到maximumPoolSize,超出则丢弃处理。

这段源码逻辑如下,不细说了。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

 

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

    else if (!addWorker(command, false))

        reject(command);

}

4. addWorker()的实现

在上面提交任务的时候,会出现开辟新的线程来执行,这会调用addWorker()方法。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);

 

        // Check if queue empty only if necessary.

        if (rs >= SHUTDOWN &&

            ! (rs == SHUTDOWN &&

               firstTask == null &&

               ! workQueue.isEmpty()))

            return false;

 

        for (;;) {

            int wc = workerCountOf(c);

            if (wc >= CAPACITY ||

                wc >= (core ? corePoolSize : maximumPoolSize))

                return false;

            if (compareAndIncrementWorkerCount(c))

                break retry;

            c = ctl.get();  // Re-read ctl

            if (runStateOf(c) != rs)

                continue retry;

            // else CAS failed due to workerCount change; retry inner loop

        }

    }

 

    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try {

        final ReentrantLock mainLock = this.mainLock;

        w = new Worker(firstTask);

        final Thread t = w.thread;

        if (t != null) {

            mainLock.lock();

            try {

                // Recheck while holding lock.

                // Back out on ThreadFactory failure or if

                // shut down before lock acquired.

                int c = ctl.get();

                int rs = runStateOf(c);

 

                if (rs < SHUTDOWN ||

                    (rs == SHUTDOWN && firstTask == null)) {

                    if (t.isAlive()) // precheck that t is startable

                        throw new IllegalThreadStateException();

                    workers.add(w);

                    int s = workers.size();

                    if (s > largestPoolSize)

                        largestPoolSize = s;

                    workerAdded = true;

                }

            } finally {

                mainLock.unlock();

            }

            if (workerAdded) {

                t.start();

                workerStarted = true;

            }

        }

    } finally {

        if (! workerStarted)

            addWorkerFailed(w);

    }

    return workerStarted;

}

代码较长,我们可以分两大部分看:

第一段从第3行到第26行,是双层无限循环,尝试增加线程数到ctl变量,并且做一些比较判断,如果超出线程数限定或者ThreadPoolExecutor的状态不符合要求,则直接返回false,增加worker失败。

第二段从第28行开始到结尾,把firstTask这个Runnable对象传给Worker构造方法,赋值给Worker对象的task属性。Worker对象把自身(也是一个Runnable)封装成一个Thread对象赋予Worker对象的thread属性。锁住整个线程池并实际增加worker到workers的HashSet对象当中。成功增加后开始执行t.start(),就是worker的thread属性开始运行,实际上就是运行Worker对象的run方法。Worker的run()方法实际上调用了ThreadPoolExecutor的runWorker()方法。

5. 任务的执行runWorker()


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

final void runWorker(Worker w) {

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

        w.firstTask = null;

        w.unlock(); // allow interrupts

        boolean completedAbruptly = true;

        try {

            while (task != null || (task = getTask()) != null) {

                w.lock();

                // If pool is stopping, ensure thread is interrupted;

                // if not, ensure thread is not interrupted.  This

                // requires a recheck in second case to deal with

                // shutdownNow race while clearing interrupt

                if ((runStateAtLeast(ctl.get(), STOP) ||

                     (Thread.interrupted() &&

                      runStateAtLeast(ctl.get(), STOP))) &&

                    !wt.isInterrupted())

                    wt.interrupt();

                try {

                    beforeExecute(wt, task);

                    Throwable thrown = null;

                    try {

                        task.run();

                    } catch (RuntimeException x) {

                        thrown = x; throw x;

                    } catch (Error x) {

                        thrown = x; throw x;

                    } catch (Throwable x) {

                        thrown = x; throw new Error(x);

                    } finally {

                        afterExecute(task, thrown);

                    }

                } finally {

                    task = null;

                    w.completedTasks++;

                    w.unlock();

                }

            }

            completedAbruptly = false;

        } finally {

            processWorkerExit(w, completedAbruptly);

        }

    }

这段代码实际上就是执行提交给线程池执行的Runnable任务的实际内容。其中,值得注意的有以下几点:

  • 线程开始执行前,需要对worker加锁,完成一个任务后执行unlock()
  • 在任务执行前后,执行beforeExecute()和afterExecute()方法
  • 记录任务执行中的异常后,继续抛出
  • 每个任务完成后,会记录当前线程完成的任务数
  • 当worker执行完一个任务的时候,包括初始任务firstTask,会调用getTask()继续获取任务,这个方法调用是可以阻塞的
  • 线程退出,执行processWorkerExit(w, completedAbruptly)处理

5. Worker线程的复用和任务的获取getTask()

在上一段代码中,也就是runWorker()方法,任务的执行过程是嵌套在while循环语句块中的。每当一个任务执行完毕,会从头开始做下一次循环执行,实现了空闲线程的复用。而要执行的任务则是来自于getTask()方法:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

private Runnable getTask() {

        boolean timedOut = false; // Did the last poll() time out?

 

        retry:

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

 

            // Check if queue empty only if necessary.

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

                decrementWorkerCount();

                return null;

            }

 

            boolean timed;      // Are workers subject to culling?

 

            for (;;) {

                int wc = workerCountOf(c);

                timed = allowCoreThreadTimeOut || wc > corePoolSize;

 

                if (wc <= maximumPoolSize && ! (timedOut && timed))

                     break;

                if (compareAndDecrementWorkerCount(c))

                     return null;

                c = ctl.get();

                // Re-read ctl

                if (runStateOf(c) != rs)

                     continue retry;

                // else CAS failed due to workerCount change; retry inner loop

             }

             try {

                 Runnable r = timed ?

                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                     workQueue.take();

                 if (r != null)

                     return r;

                 timedOut = true;

             } catch (InterruptedException retry) {

                 timedOut = false;

             }

         }

     }

getTask()实际上是从工作队列(workQueue)中取提交进来的任务。这个workQueue是一个BlockingQueue,通常当队列中没有新任务的时候,则getTask()会阻塞。另外,还有定时阻塞这样一段逻辑:如果从队列中取任务是计时的,则用poll()方法,并设置等待时间为keepAlive,否则调用阻塞方法take()。当poll()超时,则获取到的任务为null,timeOut设置为 true。这段代码也是放在一个for(;;)循环中,前面有判断超时的语句,如果超时,则return null。这意味着runWorker()方法的while循环结束,线程将退出,执行processWorkerExit()方法。

回头看看是否计时是如何确定的。


1

2

int wc = workerCountOf(c);

timed = allowCoreThreadTimeOut || wc &gt; corePoolSize;

即判断当前线程池的线程数是否超出corePoolSize,如果超出这个值并且空闲时间多于keepAlive则当前线程退出。

另外一种情况就是allowCoreThreadTimeOut为true,就是允许核心在空闲超时的情况下停掉。

6. 线程池线程数的维护和线程的退出处理

刚刚也提到了,我们再看下processWorkerExit()方法。这个方法最主要就是从workers的Set中remove掉一个多余的线程。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

private void processWorkerExit(Worker w, boolean completedAbruptly) {

         if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

             decrementWorkerCount();

         final ReentrantLock mainLock = this.mainLock;

         mainLock.lock();

         try {

             completedTaskCount += w.completedTasks;

             workers.remove(w);

         } finally {

             mainLock.unlock();

         }

         tryTerminate();

         int c = ctl.get();

         if (runStateLessThan(c, STOP)) {

             if (!completedAbruptly) {

                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

                 if (min == 0 && ! workQueue.isEmpty())

                    min = 1;

                 if (workerCountOf(c) >= min)

                    return; // replacement not needed

            }

            addWorker(null, false);

        }

    }

这个方法的第二个参数是判断是否在runWorker()中正常退出了循环向下执行,如果不是,说明在执行任务的过程中出现了异常,completedAbruptly为true,线程直接退出,需要直接对活动线程数减1 。

之后,加锁统计完成的任务数,并从workers这个集合中移除当前worker。

执行tryTerminate(),这个方法后面会详细说,主要就是尝试将线程池推向TERMINATED状态。

最后比较当前线程数是不是已经低于应有的线程数,如果这个情况发生,则添加无任务的空Worker到线程池中待命。

以上,增加新的线程和剔除多余的线程的过程大概就是如此,这样线程池能保持额定的线程数,并弹性伸缩,保证系统的资源不至于过度消耗。

 

http://www.molotang.com/articles/522.html

 

时间: 2024-10-23 09:44:50

ThreadPoolExecutor的应用和实现分析(中)—— 任务处理相关源码分析 线程利用(转)的相关文章

jquery中extend, $.extend源码分析

好久没写jquery源码的内容了.. jquery的发展有很大因素是因为它非常易于扩展,究其原因就得益于 extend函数 该函数是一个扩展函数-说是一个扩展函数,其实就是一个浅拷贝和深拷贝的函数而已. 该函数 确实很强大,而且写的很优雅.. 先来看看用法,有三种用法.  代码如下 复制代码 1.$.extend(dest,src1,src2,src3-); 2.$.extend(src) 3.$.extend(boolean,dest,src1,src2,src3-) 意思就是将 src1,s

OkHttp 3.7源码分析(三)——任务队列

OkHttp3.7源码分析文章列表如下: OkHttp源码分析--整体架构 OkHttp源码分析--拦截器 OkHttp源码分析--任务队列 OkHttp源码分析--缓存策略 OkHttp源码分析--多路复用 前面的博客已经提到过,OkHttp的一个高效之处在于在内部维护了一个线程池,方便高效地执行异步请求.本篇博客将详细介绍OkHttp的任务队列机制. 1. 线程池的优点 OkHttp的任务队列在内部维护了一个线程池用于执行具体的网络请求.而线程池最大的好处在于通过线程复用减少非核心任务的损耗

OkHttp 3.7源码分析(五)——连接池

OkHttp3.7源码分析文章列表如下: OkHttp源码分析--整体架构 OkHttp源码分析--拦截器 OkHttp源码分析--任务队列 OkHttp源码分析--缓存策略 OkHttp源码分析--多路复用 接下来讲下OkHttp的连接池管理,这也是OkHttp的核心部分.通过维护连接池,最大限度重用现有连接,减少网络连接的创建开销,以此提升网络请求效率. 1. 背景 1.1 keep-alive机制 在HTTP1.0中HTTP的请求流程如下: 这种方法的好处是简单,各个请求互不干扰.但在复杂

Android AsyncTask源码分析_Android

Android中只能在主线程中进行UI操作,如果是其它子线程,需要借助异步消息处理机制Handler.除此之外,还有个非常方便的AsyncTask类,这个类内部封装了Handler和线程池.本文先简要介绍AsyncTask的用法,然后分析具体实现. 基本用法AsyncTask是一个抽象类,我们需要创建子类去继承它,并且重写一些方法.AsyncTask接受三个泛型参数: Params: 指定传给任务执行时的参数的类型 Progress: 指定后台任务执行时将任务进度返回给UI线程的参数类型 Res

OkHttp 3.7源码分析(一)——整体架构

OkHttp3.7源码分析文章列表如下: OkHttp源码分析--整体架构 OkHttp源码分析--拦截器 OkHttp源码分析--任务队列 OkHttp源码分析--缓存策略 OkHttp源码分析--多路复用 OkHttp是一个处理网络请求的开源项目,是Android端最火热的轻量级框架,由移动支付Square公司贡献用于替代HttpUrlConnection和Apache HttpClient.随着OkHttp的不断成熟,越来越多的Android开发者使用OkHttp作为网络框架. 之所以可以

Tomcat源码分析——server.xml文件的解析

前言 在<Tomcat源码分析--server.xml文件的加载>一文中我们介绍了server.xml的加载,本文基于Tomcat7.0的Java源码,接着对server.xml文件是如何解析的进行分析. 概要 规则 Tomcat将server.xml文件中的所有元素上的属性都抽象为Rule,以Server元素为例,在内存中对应Server实例,Server实例的属性值就来自于Server元素的属性值.通过对规则(Rule)的应用,最终改变Server实例的属性值. Rule是一个抽象类,其中

Java BufferedWriter BufferedReader 源码分析_java

一:BufferedWriter  1.类功能简介:         BufferedWriter.缓存字符输出流.他的功能是为传入的底层字符输出流提供缓存功能.同样当使用底层字符输出流向目的地中写入字符或者字符数组时.每写入一次就要打开一次到目的地的连接.这样频繁的访问不断效率底下.也有可能会对存储介质造成一定的破坏.比如当我们向磁盘中不断的写入字节时.夸张一点.将一个非常大单位是G的字节数据写入到磁盘的指定文件中的.没写入一个字节就要打开一次到这个磁盘的通道.这个结果无疑是恐怖的.而当我们使

OkHttp 3.7源码分析(四)——缓存策略

OkHttp3.7源码分析文章列表如下: OkHttp源码分析--整体架构 OkHttp源码分析--拦截器 OkHttp源码分析--任务队列 OkHttp源码分析--缓存策略 OkHttp源码分析--多路复用 合理地利用本地缓存可以有效地减少网络开销,减少响应延迟.HTTP报头也定义了很多与缓存有关的域来控制缓存.今天就来讲讲OkHttp中关于缓存部分的实现细节. 1. HTTP缓存策略 首先来了解下HTTP协议中缓存部分的相关域. 1.1 Expires 超时时间,一般用在服务器的respon

jQuery 1.9.1源码分析系列(十四)之常用jQuery工具_jquery

为了给下一章分析动画处理做准备,先来看一下一些工具.其中队列工具在动画处理中被经常使用. jQuery.fn. queue(([ queueName ] [, newQueue ]) || ([ queueName ,] callback ))(获取或设置当前匹配元素上待执行的函数队列. 如果当前jQuery对象匹配多个元素:获取队列时,只获取第一个匹配元素上的队列:设置队列(替换队列.追加函数)时,则为每个匹配元素都分别进行设置.如果需要移除并执行队列中的第一个函数,请使用dequeue()函