java thread reuse(good)

 

I have always read that creating threads is expensive. I also know that you cannot rerun a thread.

I see in the doc of Executors class: Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.

Mind the word 'reuse'.

How do thread pools 'reuse' threads?

Answer:

The thread pool consists of a number of fixed worker threads that can take tasks from an internal task queue.
So if one task ends, the thread does not end but waits for the next task. If you abort a thread, it is automatically replaced.

Look at the documentation for more details.

From Thread.start() Javadoc:

 * Causes this thread to begin execution; the Java Virtual Machine
 * calls the <code>run</code> method of this thread.

BUT then inside each Thread's run() method Runnable shall be dequeued and the run() method of each Runnable is going to be called. So each thread can process several Runnable. That's what they refer to by "thread reuse".

One way to do your own thread pool is to use a blocking queue on to which you enqueue runnables and have each of your thread, once it's done processing the run() method of a Runnable, dequeue the next Runnable (or block) and run its run() method, then rinse and repeat.

I guess part of the confusion (and it is a bit confusing) comes from the fact that a Thread takes a Runnable and upon calling start() the Runnable 's run() method is called while the default thread pools also take Runnable.

http://stackoverflow.com/questions/2324030/java-thread-reuse

Worker所在的线程启动后,首先执行创建其时传入的Runnable任务,执行完成后,循环调用getTask来获取新的任务,在没有任务的情况下,退出此线程。

getTask方法实现:

Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
getTask就是通过WorkQueue的poll或task方法来获取下一个要执行的任务。
回到execute方法 ,execute 方法部分实现:

if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated

如果当前线程池数量大于corePoolSize或addIfUnderCorePoolSize方法执行失败,则执行后续操作;如果线程池处于运行状态并且workQueue中成功加入任务,再次判断如果线程池的状态不为运行状态或当前线程池数为0,则调用ensureQueuedTaskHandled方法

ensureQueuedTaskHandled方法实现:
private void ensureQueuedTaskHandled(Runnable command) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try {
int state = runState;
if (state != RUNNING && workQueue.remove(command))
reject = true;
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);
} finally {
mainLock.unlock();
}
if (reject)
reject(command);
else if (t != null)
t.start();
}
ensureQueuedTaskHandled方法判断线程池运行,如果状态不为运行状态,从workQueue中删除, 并调用reject做拒绝处理。
reject方法实现:
void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

再次回到execute方法,

if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
如线程池workQueue offer失败或不处于运行状态,调用addIfUnderMaximumPoolSize,addIfUnderMaximumPoolSize方法基本和addIfUnderCorePoolSize实现类似,不同点在于根据最大线程数(maximumPoolSize)进行比较,如果超过最大线程数,返回false,调用reject方法,下面是addIfUnderMaximumPoolSize方法实现:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}

3. 添加任务处理流程
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果当前线程池中的数量小于corePoolSize,并线程池处于Running状态,创建并添加的任务。
如果当前线程池中的数量等于corePoolSize,并线程池处于Running状态,缓冲队列 workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。
如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于maximumPoolSize,新提交任务会创建新线程执行任务。
如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。

当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。

 

根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是:
1、workQueue.offer(command)
workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。
2、ensureQueuedTaskHandled(command)
这个是线程执行的关键语句。看看它的源码:

Java代码

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. ..........
  3. private void ensureQueuedTaskHandled(Runnable command) {
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. boolean reject = false;
  7. Thread t = null;
  8. try {
  9. int state = runState;
  10. if (state != RUNNING && workQueue.remove(command))
  11. reject = true;
  12. else if (state < STOP &&
  13. poolSize < Math.max(corePoolSize, 1) &&
  14. !workQueue.isEmpty())
  15. t = addThread(null);
  16. } finally {
  17. mainLock.unlock();
  18. }
  19. if (reject)
  20. reject(command);
  21. else if (t != null)
  22. t.start();
  23. }
  24. ..........
  25. }

在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码:

Java代码

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. ..........
  3. private Thread addThread(Runnable firstTask) {
  4. Worker w = new Worker(firstTask);
  5. Thread t = threadFactory.newThread(w);
  6. if (t != null) {
  7. w.thread = t;
  8. workers.add(w);
  9. int nt = ++poolSize;
  10. if (nt > largestPoolSize)
  11. largestPoolSize = nt;
  12. }
  13. return t;
  14. }
  15. ..........
  16. }

这里两个重点,很明显:
1、Worker w = new Worker(firstTask)
2、Thread t = threadFactory.newThread(w)
先看Worker是个什么结构:

Java代码

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. ..........
  3. private final class Worker implements Runnable {
  4. ..........
  5. Worker(Runnable firstTask) {
  6. this.firstTask = firstTask;
  7. }
  8. private Runnable firstTask;
  9. ..........
  10. public void run() {
  11. try {
  12. Runnable task = firstTask;
  13. firstTask = null;
  14. while (task != null || (task = getTask()) != null) {
  15. runTask(task);
  16. task = null;
  17. }
  18. } finally {
  19. workerDone(this);
  20. }
  21. }
  22. }
  23. Runnable getTask() {
  24. for (;;) {
  25. try {
  26. int state = runState;
  27. if (state > SHUTDOWN)
  28. return null;
  29. Runnable r;
  30. if (state == SHUTDOWN) // Help drain queue
  31. r = workQueue.poll();
  32. else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
  33. r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
  34. else
  35. r = workQueue.take();
  36. if (r != null)
  37. return r;
  38. if (workerCanExit()) {
  39. if (runState >= SHUTDOWN) // Wake up others
  40. interruptIdleWorkers();
  41. return null;
  42. }
  43. // Else retry
  44. } catch (InterruptedException ie) {
  45. // On interruption, re-check runState
  46. }
  47. }
  48. }
  49. }
  50. ..........
  51. }

Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。
在看看newThread是一个什么方法:

Java代码

  1. public class Executors {
  2. ..........
  3. static class DefaultThreadFactory implements ThreadFactory {
  4. ..........
  5. public Thread newThread(Runnable r) {
  6. Thread t = new Thread(group, r,
  7. namePrefix + threadNumber.getAndIncrement(),
  8. 0);
  9. if (t.isDaemon())
  10. t.setDaemon(false);
  11. if (t.getPriority() != Thread.NORM_PRIORITY)
  12. t.setPriority(Thread.NORM_PRIORITY);
  13. return t;
  14. }
  15. ..........
  16. }
  17. ..........
  18. }

通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。

之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。
从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。

http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315645.html

 

 

Java代码  

  1. private boolean addIfUnderCorePoolSize(Runnable firstTask) {  
  2.        Thread t = null;  
  3.        final ReentrantLock mainLock = this.mainLock;  
  4.        mainLock.lock();  
  5.        try {  
  6.         //poolSize < corePoolSize 即当前工作线程的数量一定要小于你设置的线程最大数量  
  7.         //CachedThreadPool永远也不会进入该方法,因为它的corePoolSize初始为0  
  8.            if (poolSize < corePoolSize && runState == RUNNING)  
  9.                t = addThread(firstTask);  
  10.        } finally {  
  11.            mainLock.unlock();  
  12.        }  
  13.        if (t == null)  
  14.            return false;  
  15.        t.start();   //线程执行了  
  16.        return true;  
  17.    }  

    看’t.start()’,这表示工作线程启动了,工作线程t启动的前提条件是’t = addThread(firstTask); ‘返回值t必须不为null。好了,现在想看看java线程池中工作线程是怎么样的吗?请看addThread方法: 
   

Java代码  

  1. private Thread addThread(Runnable firstTask) {  
  2.     //Worker就是典型的工作线程,所以的核心线程都在工作线程中执行  
  3.        Worker w = new Worker(firstTask);  
  4.        //采用默认的线程工厂生产出一线程。注意就是设置一些线程的默认属性,如优先级、是否为后台线程等  
  5.        Thread t = threadFactory.newThread(w);   
  6.        if (t != null) {  
  7.            w.thread = t;  
  8.            workers.add(w);  
  9.          //没生成一个工作线程 poolSize加1,但poolSize等于最大线程数corePoolSize时,则不能再生成工作线程  
  10.            int nt = ++poolSize;    
  11.            if (nt > largestPoolSize)  
  12.                largestPoolSize = nt;  
  13.        }  
  14.        return t;  
  15.    }  

   看见没,Worker就是工作线程类,它是ThreadPoolExecutor中的一个内部类。下面,我们主要分析Worker类,如了解了Worker类,那基本就了解了java线程池的整个原理了。不用怕,Worker类的逻辑很简单,它其实就是一个线程,实现了Runnable接口的,所以,我们先从run方法入手,run方法源码如下: 

 

Java代码  

  1. public void run() {  
  2.             try {  
  3.                 Runnable task = firstTask;  
  4.                 firstTask = null;  
  5.                 /** 
  6.                  * 注意这段while循环的执行逻辑,没执行完一个核心线程后,就会去线程池 
  7.                  * 队列中取下一个核心线程,如取出的核心线程为null,则当前工作线程终止 
  8.                  */  
  9.                 while (task != null || (task = getTask()) != null) {  
  10.                     runTask(task);  //你所提交的核心线程(任务)的运行逻辑  
  11.                     task = null;  
  12.                 }  
  13.             } finally {  
  14.                 workerDone(this); // 当前工作线程退出  
  15.             }  
  16.         }  
  17.     }  

    从源码中可看出,我们所提交的核心线程(任务)的逻辑是在Worker中的runTask()方法中实现的。这个方法很简单,自己可以打开看看。这里要注意一点,在runTask()方法中执行核心线程时是调用核心线程的run()方法,这是一个寻常方法的调用,千万别与线程的启动(start())混合了。这里还有一个比较重要的方法,那就是上述代码中while循环中的getTask()方法,它是一个从池队列中取的核心线程(任务)的方法。具体代码如下: 

   

Java代码  

  1. Runnable getTask() {  
  2.         for (;;) {  
  3.             try {  
  4.                 int state = runState;  
  5.                 if (state > SHUTDOWN)    
  6.                     return null;  
  7.                 Runnable r;  
  8.                 if (state == SHUTDOWN)  //帮助清空队列  
  9.                     r = workQueue.poll();  
  10.                /* 
  11.                 * 对于条件1,如果可以超时,则在等待keepAliveTime时间后,则返回一null对象,这时就 
  12.                 *  销毁该工作线程,这就是CachedThreadPool为什么能回收空闲线程的原因了。 
  13.                 * 注意以下几点:1.这种功能情况一般不可能在fixedThreadPool中出现 
  14.                 *            2.在使用CachedThreadPool时,条件1一般总是成立,因为CachedThreadPool的corePoolSize 
  15.                 *              初始为0 
  16.                 */  
  17.                 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  //------------------条件1  
  18.                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);    
  19.                 else  
  20.                     r = workQueue.take();       //如果队列不存在任何元素 则一直等待。 FiexedThreadPool典型模式----------条件2  
  21.                 if (r != null)  
  22.                     return r;  
  23.                 if (workerCanExit()) {       //--------------------------条件3  
  24.                     if (runState >= SHUTDOWN) // Wake up others  
  25.                         interruptIdleWorkers();  
  26.                     return null;  
  27.                 }  
  28.                 // Else retry  
  29.             } catch (InterruptedException ie) {  
  30.                 // On interruption, re-check runState  
  31.             }  
  32.         }  
  33.     }  

    从这个方法中,我们需要了解一下几点: 
    1.CachedThreadPool获得任务逻辑是条件1,条件1的处理逻辑请看注释,CachedThreadPool执行条件1的原因是:CachedThreadPool的corePoolSize时刻为0。 

    2.FixedThreadPool执行的逻辑为条件2,从’workQueue.take()’中我们就明白了为什么FixedThreadPool不会释放工作线程的原因了(除非你关闭线程池)。 

    最后,我们了解下Worker(工作线程)终止时的处理吧,这个对理解CachedThreadPool有帮助,具体代码如下: 
   

Java代码  

  1. /** 
  2.     * 工作线程退出要处理的逻辑 
  3.     * @param w 
  4.     */  
  5.    void workerDone(Worker w) {  
  6.        final ReentrantLock mainLock = this.mainLock;  
  7.        mainLock.lock();  
  8.        try {  
  9.            completedTaskCount += w.completedTasks;   
  10.            workers.remove(w);  //从工作线程缓存中删除  
  11.            if (--poolSize == 0) //poolSize减一,这时其实又可以创建工作线程了  
  12.                tryTerminate(); //尝试终止  
  13.        } finally {  
  14.            mainLock.unlock();  
  15.        }  
  16.    }  

    注意workDone()方法中的tyrTerminate()方法,它是你以后理解线程池中shuDown()以及CachedThreadPool原理的关键,具体代码如下:   

   

Java代码  

    1. private void tryTerminate() {  
    2.     //终止的前提条件就是线程池里已经没有工作线程(Worker)了  
    3.        if (poolSize == 0) {  
    4.            int state = runState;  
    5.            /** 
    6.             * 如果当前已经没有了工作线程(Worker),但是线程队列里还有等待的线程任务,则创建一个 
    7.             * 工作线程来执行线程队列中等待的任务 
    8.             */  
    9.            if (state < STOP && !workQueue.isEmpty()) {      
    10.                state = RUNNING; // disable termination check below  
    11.                Thread t = addThread(null);  
    12.                if (t != null)  
    13.                    t.start();  
    14.            }  
    15.            //设置池状态为终止状态  
    16.            if (state == STOP || state == SHUTDOWN) {  
    17.                runState = TERMINATED;  
    18.                termination.signalAll();   
    19.                terminated();   
    20.            }  
    21.        }  
    22.    }  

http://xtu-xiaoxin.iteye.com/blog/647744

 

前面一篇文章从Executors中的工厂方法入手,已经对ThreadPoolExecutor的构造和使用做了一些整理。而这篇文章,我们将接着前面的介绍,从源码实现上对ThreadPoolExecutor在任务的提交、执行,线程重用和线程数维护等方面做下分析。

0.    ThreadPoolExecutor类的声明属性变量分析


1

public class ThreadPoolExecutor extends AbstractExecutorService

从这个类声明中我们可以看到java.util.ThreadPoolExecutor是继承于AbstractExecutorService的,而之前的文章我也提到过,AbstractExecutorService已经实现了一些任务提交处理的方法,如submit()方法都是在这个抽象类中实现的。但submit()方法,最后也是会调用ThreadPoolExecutor的execute()方法。

打开SunJDK中的ThreadPoolExecutor类源码,除了上篇文章提到的一些和构造方法中参数对应的属性之外,让我们看看还有什么:

  • mainLock 对整个ThreadPoolExecutor对象的锁
  • workers  存储工作线程对应Worker对象的HashSet
  • termination 线程池ThreadPoolExecutor对象的生命周期终止条件,和mainLock相关
  • largestPoolSize 线程池跑过的最大线程数
  • completedTaskCount 完成任务数
  • ctl 执行器ThreadPoolExecutor的生命周期状态和活动状态的worker数封装

稍微需要说一下最后一个, ctl是一个AtomicInteger对象,以位运算的方式打包封装了当前线程池ThreadPoolExecutor对象的状态和活动线程数两个数据

1.    执行器状态

ExecutorService中已经指定了这个接口对应的类要实现的方法,其中就包括shutdown()和shutdownNow()等方法。在ThreadPoolExecutor中指明了状态的含义,并包含其于ctl属性中。

ThreadPoolExecutor对象有五种状态,如下:

  • RUNNING 在ThreadPoolExecutor被实例化的时候就是这个状态
  • SHUTDOWN 通常是已经执行过shutdown()方法,不再接受新任务,等待线程池中和队列中任务完成
  • STOP 通常是已经执行过shutdownNow()方法,不接受新任务,队列中的任务也不再执行,并尝试终止线程池中的线程
  • TIDYING 线程池为空,就会到达这个状态,执行terminated()方法
  • TERMINATED terminated()执行完毕,就会到达这个状态,ThreadPoolExecutor终结

2.    Worker内部类

它既实现了Runnable,同时也是一个AQS ( AbstractQueuedSynchronizer )。


1

2

3

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

封装了3样东西,Runnable类的首个任务对象,执行的线程thread和完成的任务数(volatile)completedTasks。


1

2

3

final Thread thread;

Runnable firstTask;

volatile long completedTasks;

这个类还提供了interruptIfStarted()这样一个方法,里面做了(getState()>= 0)的判断。与此呼应,Worker的构造方法里对state设置了-1,避免在线程执行前被停掉。


1

2

3

4

5

Worker(Runnable firstTask) {

    setState(-1); // inhibit interrupts until runWorker

    this.firstTask = firstTask;

    this.thread = getThreadFactory().newThread(this);

}

3. 提交任务

上篇文章已经提到了,提交新任务的时候,如果没达到核心线程数corePoolSize,则开辟新线程执行。如果达到核心线程数corePoolSize, 而队列未满,则放入队列,否则开新线程处理任务,直到maximumPoolSize,超出则丢弃处理。

这段源码逻辑如下,不细说了。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

 

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

    else if (!addWorker(command, false))

        reject(command);

}

4. addWorker()的实现

在上面提交任务的时候,会出现开辟新的线程来执行,这会调用addWorker()方法。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);

 

        // Check if queue empty only if necessary.

        if (rs >= SHUTDOWN &&

            ! (rs == SHUTDOWN &&

               firstTask == null &&

               ! workQueue.isEmpty()))

            return false;

 

        for (;;) {

            int wc = workerCountOf(c);

            if (wc >= CAPACITY ||

                wc >= (core ? corePoolSize : maximumPoolSize))

                return false;

            if (compareAndIncrementWorkerCount(c))

                break retry;

            c = ctl.get();  // Re-read ctl

            if (runStateOf(c) != rs)

                continue retry;

            // else CAS failed due to workerCount change; retry inner loop

        }

    }

 

    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try {

        final ReentrantLock mainLock = this.mainLock;

        w = new Worker(firstTask);

        final Thread t = w.thread;

        if (t != null) {

            mainLock.lock();

            try {

                // Recheck while holding lock.

                // Back out on ThreadFactory failure or if

                // shut down before lock acquired.

                int c = ctl.get();

                int rs = runStateOf(c);

 

                if (rs < SHUTDOWN ||

                    (rs == SHUTDOWN && firstTask == null)) {

                    if (t.isAlive()) // precheck that t is startable

                        throw new IllegalThreadStateException();

                    workers.add(w);

                    int s = workers.size();

                    if (s > largestPoolSize)

                        largestPoolSize = s;

                    workerAdded = true;

                }

            } finally {

                mainLock.unlock();

            }

            if (workerAdded) {

                t.start();

                workerStarted = true;

            }

        }

    } finally {

        if (! workerStarted)

            addWorkerFailed(w);

    }

    return workerStarted;

}

代码较长,我们可以分两大部分看:

第一段从第3行到第26行,是双层无限循环,尝试增加线程数到ctl变量,并且做一些比较判断,如果超出线程数限定或者ThreadPoolExecutor的状态不符合要求,则直接返回false,增加worker失败。

第二段从第28行开始到结尾,把firstTask这个Runnable对象传给Worker构造方法,赋值给Worker对象的task属性。Worker对象把自身(也是一个Runnable)封装成一个Thread对象赋予Worker对象的thread属性。锁住整个线程池并实际增加worker到workers的HashSet对象当中。成功增加后开始执行t.start(),就是worker的thread属性开始运行,实际上就是运行Worker对象的run方法。Worker的run()方法实际上调用了ThreadPoolExecutor的runWorker()方法。

5. 任务的执行runWorker()


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

final void runWorker(Worker w) {

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

        w.firstTask = null;

        w.unlock(); // allow interrupts

        boolean completedAbruptly = true;

        try {

            while (task != null || (task = getTask()) != null) {

                w.lock();

                // If pool is stopping, ensure thread is interrupted;

                // if not, ensure thread is not interrupted.  This

                // requires a recheck in second case to deal with

                // shutdownNow race while clearing interrupt

                if ((runStateAtLeast(ctl.get(), STOP) ||

                     (Thread.interrupted() &&

                      runStateAtLeast(ctl.get(), STOP))) &&

                    !wt.isInterrupted())

                    wt.interrupt();

                try {

                    beforeExecute(wt, task);

                    Throwable thrown = null;

                    try {

                        task.run();

                    } catch (RuntimeException x) {

                        thrown = x; throw x;

                    } catch (Error x) {

                        thrown = x; throw x;

                    } catch (Throwable x) {

                        thrown = x; throw new Error(x);

                    } finally {

                        afterExecute(task, thrown);

                    }

                } finally {

                    task = null;

                    w.completedTasks++;

                    w.unlock();

                }

            }

            completedAbruptly = false;

        } finally {

            processWorkerExit(w, completedAbruptly);

        }

    }

这段代码实际上就是执行提交给线程池执行的Runnable任务的实际内容。其中,值得注意的有以下几点:

  • 线程开始执行前,需要对worker加锁,完成一个任务后执行unlock()
  • 在任务执行前后,执行beforeExecute()和afterExecute()方法
  • 记录任务执行中的异常后,继续抛出
  • 每个任务完成后,会记录当前线程完成的任务数
  • 当worker执行完一个任务的时候,包括初始任务firstTask,会调用getTask()继续获取任务,这个方法调用是可以阻塞的
  • 线程退出,执行processWorkerExit(w, completedAbruptly)处理

5. Worker线程的复用和任务的获取getTask()

在上一段代码中,也就是runWorker()方法,任务的执行过程是嵌套在while循环语句块中的。每当一个任务执行完毕,会从头开始做下一次循环执行,实现了空闲线程的复用。而要执行的任务则是来自于getTask()方法:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

private Runnable getTask() {

        boolean timedOut = false; // Did the last poll() time out?

 

        retry:

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

 

            // Check if queue empty only if necessary.

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

                decrementWorkerCount();

                return null;

            }

 

            boolean timed;      // Are workers subject to culling?

 

            for (;;) {

                int wc = workerCountOf(c);

                timed = allowCoreThreadTimeOut || wc > corePoolSize;

 

                if (wc <= maximumPoolSize && ! (timedOut && timed))

                     break;

                if (compareAndDecrementWorkerCount(c))

                     return null;

                c = ctl.get();

                // Re-read ctl

                if (runStateOf(c) != rs)

                     continue retry;

                // else CAS failed due to workerCount change; retry inner loop

             }

             try {

                 Runnable r = timed ?

                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                     workQueue.take();

                 if (r != null)

                     return r;

                 timedOut = true;

             } catch (InterruptedException retry) {

                 timedOut = false;

             }

         }

     }

getTask()实际上是从工作队列(workQueue)中取提交进来的任务。这个workQueue是一个BlockingQueue,通常当队列中没有新任务的时候,则getTask()会阻塞。另外,还有定时阻塞这样一段逻辑:如果从队列中取任务是计时的,则用poll()方法,并设置等待时间为keepAlive,否则调用阻塞方法take()。当poll()超时,则获取到的任务为null,timeOut设置为 true。这段代码也是放在一个for(;;)循环中,前面有判断超时的语句,如果超时,则return null。这意味着runWorker()方法的while循环结束,线程将退出,执行processWorkerExit()方法。

回头看看是否计时是如何确定的。


1

2

int wc = workerCountOf(c);

timed = allowCoreThreadTimeOut || wc &gt; corePoolSize;

即判断当前线程池的线程数是否超出corePoolSize,如果超出这个值并且空闲时间多于keepAlive则当前线程退出。

另外一种情况就是allowCoreThreadTimeOut为true,就是允许核心在空闲超时的情况下停掉。

6. 线程池线程数的维护和线程的退出处理

刚刚也提到了,我们再看下processWorkerExit()方法。这个方法最主要就是从workers的Set中remove掉一个多余的线程。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

private void processWorkerExit(Worker w, boolean completedAbruptly) {

         if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

             decrementWorkerCount();

         final ReentrantLock mainLock = this.mainLock;

         mainLock.lock();

         try {

             completedTaskCount += w.completedTasks;

             workers.remove(w);

         } finally {

             mainLock.unlock();

         }

         tryTerminate();

         int c = ctl.get();

         if (runStateLessThan(c, STOP)) {

             if (!completedAbruptly) {

                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

                 if (min == 0 && ! workQueue.isEmpty())

                    min = 1;

                 if (workerCountOf(c) >= min)

                    return; // replacement not needed

            }

            addWorker(null, false);

        }

    }

这个方法的第二个参数是判断是否在runWorker()中正常退出了循环向下执行,如果不是,说明在执行任务的过程中出现了异常,completedAbruptly为true,线程直接退出,需要直接对活动线程数减1 。

之后,加锁统计完成的任务数,并从workers这个集合中移除当前worker。

执行tryTerminate(),这个方法后面会详细说,主要就是尝试将线程池推向TERMINATED状态。

最后比较当前线程数是不是已经低于应有的线程数,如果这个情况发生,则添加无任务的空Worker到线程池中待命。

以上,增加新的线程和剔除多余的线程的过程大概就是如此,这样线程池能保持额定的线程数,并弹性伸缩,保证系统的资源不至于过度消耗。

 

http://www.molotang.com/articles/522.html

 

时间: 2024-08-31 07:19:41

java thread reuse(good)的相关文章

Java数据对象(JDO)的应用

对象|数据 在本篇文章中,我们将详细地讨论Sun公司的Java数据对象(JDO)标准.JDO允许我们使用Java对象,支持事务和多用户.与ODBC不同的是,它使我们无需考虑SQL和与数据库有关的其他东西.它与串行化也有所区别,因为它支持多个用户和事务.JDO允许Java开发人员将他们的数据模型用作数据模型,无需在"数据端"."对象端"之间的转移方面花费大量的时间. 包括CocoBase.WebGain TOPLink和Castor JDO在内的多种产品都可以实现了J

java线程编程(一):线程基础

在学习java中,我发现有关于对线程的讲解比较少,我打算为一些java初学者提一些关于线程方面的参考, 为深入学习java奠定基础.我本着共同进步的原则特写下了关于java线程编程的一系列文章 java线程编程(一):线程基础 ◆线程(thread)其实是控制线程(thread of control)的缩写. 每一个线程都是独立的,因此线程中的每个方法的局部变量都是和其他线程隔离开的,这些变量完全是私有的,因此对于 线程而言,是没有办法访问其他线程的局部变量的.如果两个线程同时访问同一个方法,则

Java集合细节(三):subList的缺陷

我们经常使用subString方法来对String对象进行分割处理,同时我们也可以使用subList.subMap.subSet来对List.Map.Set进行分割处理,但是这个分割存在某些瑕疵. 一.subList返回仅仅只是一个视图 首先我们先看如下实例: public static void main(String[] args) { List<Integer> list1 = new ArrayList<Integer>(); list1.add(1); list1.add

JAVA之旅(十三)——线程的安全性,synchronized关键字,多线程同步代码块,同步函数,同步函数的锁是this

JAVA之旅(十三)--线程的安全性,synchronized关键字,多线程同步代码块,同步函数,同步函数的锁是this 我们继续上个篇幅接着讲线程的知识点 一.线程的安全性 当我们开启四个窗口(线程)把票陆陆续续的卖完了之后,我们要反思一下,这里面有没有安全隐患呢?在实际情况中,这种事情我们是必须要去考虑安全问题的,那我们模拟一下错误 package com.lgl.hellojava; import javax.security.auth.callback.TextInputCallback

【面试虐菜】—— JAVA面试题(1)

今天参加笔试,里面有设计模式,和一些基础题! 印象最深的是:什么不是Object的函数,我蒙的finalize,哎,无知! 还问了,接口与抽象类的不同,还有多线程的实现方式!下面基本都有. 另外还问了,观察者模式,设计模式,我一点不会,看来要下功夫了! 1.  面向对象编程的三大特性是什么,请简要阐述 (1).继承: 继承是一种联结类的层次模型,并且允许和鼓励类的重用,它提供了一种明确表述共性的方法.对象的一个新类可以从现有的类中派生,这个过程称为类继承.新类继 承了原始类的特性,新类称为原始类

【面试虐菜】—— JAVA面试题(3)

1 throws与throw的区别 解析:throws和throw是异常处理时两个常见的关键字,初级程序员常常容易正确理解throw和throws的作用和区别,说明已经能比较深入理解异常处理.Throw用来抛出异常,如果执行了throw语句,程序将发生异常,进入到异常处理机制.Throws用来声明异常,说明这个方法可能会发生某些类型的异常,那么编译器将强制在调用这个方法的时候处理异常.API中的很多方法都使用了throws声明了异常,所以使用这些方法时编译器会提示需要处理异常.   参考答案:t

JAVA之旅(四)——面向对象思想,成员/局部变量,匿名对象,封装 , private,构造方法,构造代码块

JAVA之旅(四)--面向对象思想,成员/局部变量,匿名对象,封装 , private,构造方法,构造代码块 加油吧,节奏得快点了 1.概述 上篇幅也是讲了这点,这篇幅就着重的讲一下思想和案例 就拿买电脑来说吧,首先,你不懂电脑,你去电脑城买电脑,和大象装冰箱里一样,是什么步骤?咨询 砍价 ,谈妥了就那电脑走人,对吧,这就是面向过程的思想,而面向对象是:你有一个哥们,他懂电脑,什么都会,你只要带他去,就行,你这个哥们就是对象,在JAVA中,我们就是操作一个对象去完成各种各样的操作的,这就是面向对

值的关注的Java开源项目(原创)

项目|原创 值的关注的Java开源项目   名称 资料 概况 OFBiz http://ofbizchina.com:8080/ http://www.ofbiz.org/ https://ofbiz.dev.java.net/ OFBiz是一个非常著名的开源项目,提供了创建基于最新J2EE/XML规范和技术标准,构建大中型企业级.跨平台.跨数据库.跨应用服务器的多层.分布式电子商务类WEB应用系统的框架.     OFBiz最主要的特点是OFBiz提供了一整套的开发基于Java的web应用程序

Java与XML(二)用java编写xml的读写程序

xml|程序 Java与XML(二)用java编写xml的读写程序 这是读取xml文件的java程序,我调试好的.采用的是dom方式读取xml文件到Vector中.package src;import java.io.*;import java.util.Vector;import javax.xml.parsers.*;import org.w3c.dom.*;public class readxml { static Document document; private boolean va