浅谈JAVA ThreadPoolExecutor(转)

这篇文章分为两部分,前面是ThreadPoolExecutor的一些基本知识,后一部分则是Mina中一个特殊的ThreadPoolExecutor代码解析。算是我的Java学习笔记吧。

 

基础

在我看来,java比C++的一个大好处就是提供了对多线程的支持(C++只有多线程的库,语言本身不包含线程的概念)。而其中我最爱用的就是ThreadPoolExecutor这个类,它实现了一个非常棒的thread pool。
thread pool一般被用来解决两个问题:当处理大量的同步task的时候,它能够避免thread不断创建销毁的开销;而另外一个也许更重要的含义是,它其实表示了一个boundary,通过使用thread pool可以限制这些任务所消耗的资源,比如最大线程数,比如最大的消息缓冲池。
需要指出的是,ThreadPoolExecutor不仅仅是简单的多个thread的集合,它还带有一个消息队列。

在Java中,如果只是需要一个简单的thread pool,ExecuteService可能更为合适,这是一个Interface。可以通过调用Executor的静态方法来获得一些简单的threadpool,如:

[java] view plain copy

 

  1. ExecuteService pool = Executors.newFixedThreadPool(poolSize);  

 

但如果要用定制的thread pool,则要使用ThreadPoolExecutor类,这是一个高度可定制的线程池类,下面是一些重要的参数和方法:

 

corePoolSize 和 maxPoolSize

这两个参数其实和threadpool的调度策略密切相关:
如果poolsize小于coresize,那么只要来了一个request,就新创建一个thread来执行;
如果poolsize已经大于或等于coresize,那么来了一个request后,就放进queue中,等来线程执行;
一旦且只有queue满了,才会又创建新的thread来执行;
当然,coresize和maxpoolsize可以在运行时通过set方法来动态的调节;
(queue如果是一个确定size的队列,那么很有可能发生reject request的事情(因为队列满了)。很多人会认为这样的系统不好。但其实,reject request很多时候是个好事,因为当负载大于系统的capacity的时候,如果不reject request,系统会出问题的。)

 

ThreadFactory 
可以通过设置默认的ThreadFactory来改变threadpool如何创建thread

keep-alive time 
如果实际的线程数大于coresize,那么这些超额的thread过了keep-alive的时间之后,就会被kill掉。这个时间是可以动态设定的;

queue 
任何一个BlockingQueue都可以做为threadpool中的队列,又可以分为三种:
AsynchronousQueue,采用这种queue,任何的task会被直接交到thread手中,queue本身不缓存任何的task,所以如果所有的线程在忙的话,新进入的task是会被拒绝的;
LinkedBlockingQueue,queue的size是无限的,根据前面的调度策略可知,thread的size永远也不会大于coresize;
ArrayBlockingQueue,这其实是需要仔细调整参数的一种方式。因为通过设定maxsize和queuesize,其实就是设定这个threadpool所能使用的resource,然后试图达到一种性能的最优;(Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput. )

 

此外,还有诸如beforeExecute,afterExecute等方法可以被重写。以上的这些内容其实都可以在ThreadPoolExecutor的javadoc中找到。应该说,ThreadPoolExecutor是可以非常灵活的被设置的,只除了一点,你没办法改变它的调度策略。

 

一个实例

通过分析一个特殊的ThreadPoolExeuctor的源代码,能够更好的理解它的内部机制和灵活性。

Mina中有一个特殊的ThreadPoolExecutor--org.apache.mina.filter.executor.OrderedThreadPoolExecutor。
这个executor是用来处理从网络中来的请求。它的不同之处在于,对于同一个session来的请求,它能够按照请求到达的时间顺序的执行。举个例子,在一个session中,如果先接收到request A,然后再接收到request B,那么,OrderedThreadPoolExecutor能够保证一定处理完A之后再处理B。而一般的thread pool,会将A和B传递给不同的thread处理,很有可能request B会先于request A完成。

 

先看看它的构造函数:

 

[java] view plain copy

 

  1. public OrderedThreadPoolExecutor(  
  2.             int corePoolSize, int maximumPoolSize,  
  3.             long keepAliveTime, TimeUnit unit,  
  4.             ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler) {  
  5.         // We have to initialize the pool with default values (0 and 1) in order to  
  6.         // handle the exception in a better way. We can't add a try {} catch() {}  
  7.         // around the super() call.  
  8.         super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit,   
  9.             new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());  
  10.         if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {  
  11.             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);  
  12.         }  
  13.         if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {  
  14.             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);  
  15.         }  
  16.         // Now, we can setup the pool sizes  
  17.         super.setCorePoolSize( corePoolSize );  
  18.         super.setMaximumPoolSize( maximumPoolSize );  
  19.           
  20.         // The queueHandler might be null.  
  21.         if (eventQueueHandler == null) {  
  22.             this.eventQueueHandler = IoEventQueueHandler.NOOP;  
  23.         } else {  
  24.             this.eventQueueHandler = eventQueueHandler;  
  25.         }  
  26. }  

 

这里比较意外的是,它竟然用的是SynchronousQueue?! 也就是说,来了一个task,不会被放入Queue中,而是直接送给某个thread。这和一般的threadpoolExecutor是非常不一样的,因为一旦thread全用满了,task就不能再被接受了。后面我们会看到为什么使用SynchronousQueue。

 

再看看它的execute函数:

 

[java] view plain copy

 

  1. public void execute(Runnable task) {  
  2.     if (shutdown) {  
  3.         rejectTask(task);  
  4.     }  
  5.     // Check that it's a IoEvent task  
  6.     checkTaskType(task);  
  7.     IoEvent event = (IoEvent) task;  
  8.       
  9.     // Get the associated session  
  10.     IoSession session = event.getSession();  
  11.       
  12.     // Get the session's queue of events  
  13.     SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);  
  14.     Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;  
  15.       
  16.     boolean offerSession;  
  17.     boolean offerEvent = eventQueueHandler.accept(this, event);  
  18.       
  19.     if (offerEvent) {  
  20.         // Ok, the message has been accepted  
  21.         synchronized (tasksQueue) {  
  22.             // Inject the event into the executor taskQueue  
  23.             tasksQueue.offer(event);  
  24.               
  25.             if (sessionTasksQueue.processingCompleted) {  
  26.                 sessionTasksQueue.processingCompleted = false;  
  27.                 offerSession = true;  
  28.             } else {  
  29.                 offerSession = false;  
  30.             }  
  31.             //.......  
  32.         }  
  33.     } else {  
  34.         offerSession = false;  
  35.     }  
  36.     if (offerSession) {  
  37.         waitingSessions.offer(session);  
  38.     }  
  39.     addWorkerIfNecessary();  
  40.     //..............  

 

这里有几点需要解释的:
首先是getSessionTaskQueue函数。从这个函数可以看出,对于每一个session,都创建了一个queue来存储它的task。也就是说,同一个session的task被放在了同一个queue中。这是非常关键的地方,后面会看到,正是这个queue保证了同一个session的task能够按照顺序来执行;
其次是waitingSessions.offer(session)这条语句。waitingSessions是OrderedThreadPoolExecutor的一个私有成员,它也是一个queue: BlockingQueue<IoSession> waitingSessions ...;
这个queue里面放的是该threadpool所接收到的每个task所对应的Session,并且,如果两个task对应的是同一个session,那么这个session只会被放进waitingSessions中一次。waitingSession.offer(session)这条语句就是要将session放进queue。而offerSession这个变量和前面的十几行代码就是在判断task所对应的session是否要放入到queue中;
最后一行代码addWorkerIfNecessary();字面上很容易理解,就是判断是否添加worker。可是,worker又是什么呢?

 

看看Worker这个类:

 

[java] view plain copy

 

  1. private class Worker implements Runnable {  
  2.         private volatile long completedTaskCount;  
  3.         private Thread thread;  
  4.           
  5.         public void run() {  
  6.             thread = Thread.currentThread();  
  7.             try {  
  8.                 for (;;) {  
  9.                     IoSession session = fetchSession();  
  10.                     //..........  
  11.                     try {  
  12.                         if (session != null) {  
  13.                             runTasks(getSessionTasksQueue(session));  
  14.                         }  
  15.                     } finally {  
  16.                         idleWorkers.incrementAndGet();  
  17.                     }  
  18.                 }  
  19.             } finally {  
  20.                 //.......  
  21.             }  
  22.         }  
  23.         private IoSession fetchSession() {  
  24.             //........  
  25.             for (;;) {  
  26.                 try {  
  27.                     try {  
  28.                         session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);  
  29.                         break;  
  30.                     } finally {  
  31.                         //..............  
  32.                     }  
  33.                 } catch (InterruptedException e) {  
  34.                     //........  
  35.                 }  
  36.             }  
  37.             return session;  
  38.         }  
  39.         private void runTasks(SessionTasksQueue sessionTasksQueue) {  
  40.             for (;;) {  
  41.                 //......  
  42.                 runTask(task);  
  43.             }  
  44.         }  
  45.         private void runTask(Runnable task) {  
  46.             beforeExecute(thread, task);  
  47.             boolean ran = false;  
  48.             try {  
  49.                 task.run();  
  50.                 ran = true;  
  51.                 afterExecute(task, null);  
  52.                 completedTaskCount ++;  
  53.             } catch (RuntimeException e) {  
  54.                 if (!ran) {  
  55.                     afterExecute(task, e);  
  56.                 }  
  57.                 throw e;  
  58.             }  
  59.         }  
  60.     }  

 

在Worker.run()中,一开始就调用fetchSession(),这个函数从WaitingSessions这个queue中拿出一个Session。然后又调用了runTasks,这个函数会将Session中的那个TaskQueue中的每个Task挨个执行一遍。

 

OK,现在OrderedThreadPoolExecutor的整体设计就清晰了:

从外面看上去,OrderedThreadPoolExecutor只是一个thread pool,但本质上,它是有由两个thread pool拼接而成, 只不过后一个thread pool被隐藏在了类的内部实现中。第一个thread pool中的thread只需要完成很简单的一个任务,即将接收到的task对应的session添加到waitingSessions中(如果需要的话)。正因为如此,所以第一个threadpool的queue被设置成了SynchronousQueue。而后一个thread pool中的那些worker(也是一些thread)才真正的执行task。并且,后一个thread pool所能创建的thread的数量也受到了coreSize和MaxSize的限制。所以,整个OrderedThreadPoolExecutor实际上创建了2 * coreSize的thread。

前面的解释可能有些乱,再重新梳理整个OrderedThreadPoolExecutor的执行流程:
1. 当一个task被接收,前一个thread pool中的某个thread被指定负责处理这个task;
2. thread会找到task所对应的session,将这个task放入该session的TaskQueue中;
3. 如果该session已经被放入了waitingSessions,那么什么都不做,否则,将该session放入waitingSessions中;
4. 后一个threadpool中的某一个worker从waitingSessions中将该Session取出;
5. 找到该Session中的TaskQueue,依次执行queue中的task;

 

总结

总的来说,Java的TheadPoolExecutor整体架构设计的很具有扩展性,可以通过继承改写来实现不同的各具功能的threadpool,唯一的缺点就是它的调度策略是不能够改变的,但很多时候一个threadpool的调度策略会对系统性能产生很大的影响。所以,如果ThreadPoolExecutor的调度策略不适合你的话,就只能手工再造个“轮子”了。
另外,如果读过SOSP01年的“SEDA: An Architecture for Well-Conditioned, Scalable Internet Services”,那么会发现Java中的ThreadPoolExecutor非常类似于SEDA中的Stage概念。虽然我没有找到总够的证据,但是从时间的顺序看,java1.5版才加入的ThreadPoolExecutor很可能受到了01年这篇论文的启发。

 

http://blog.csdn.net/historyasamirror/article/details/5961368

时间: 2024-11-05 16:39:15

浅谈JAVA ThreadPoolExecutor(转)的相关文章

浅谈java 执行jar包中的main方法_java

浅谈java 执行jar包中的main方法 通过 OneJar 或 Maven 打包后 jar 文件,用命令: java -jar ****.jar 执行后总是运行指定的主方法,如果 jar 中有多个 main 方法,那么如何运行指定的 main 方法呢? 用下面的命令试试看: java -classpath ****.jar ****.****.className [args] "****.****"表示"包名": "className"表示&

浅谈java异常链与异常丢失_java

1.在java的构造方法中提供了 异常链.. 也就是我们可以通过构造方法不断的将 异常串联成一个异常链...   之所以需要异常连,是因为处于代码的可理解性,以及阅读和程序的可维护性...  我们知道我们每抛出一个异常都需要进行try catch ...那么岂不是代码很臃肿... 我们如果可以将异常串联成一个异常连,然后我们只捕获我们的包装 异常,我们知道 RuntimeException 以及其派生类可以不进行try catch 而被jvm自动捕获并处理.. 当然了我们可以自己定义自己的异常类

浅谈java中异步多线程超时导致的服务异常_java

在项目中为了提高大并发量时的性能稳定性,经常会使用到线程池来做多线程异步操作,多线程有2种,一种是实现runnable接口,这种没有返回值,一种是实现Callable接口,这种有返回值. 当其中一个线程超时的时候,理论上应该不 影响其他线程的执行结果,但是在项目中出现的问题表明一个线程阻塞,其他线程返回的接口都为空.其实是个很简单的问题,但是由于第一次碰到,还是想了一些时间的.很简单,就是因为阻塞的那个线 程没有释放,并发量一大,线程池数量就满了,所以其他线程都处于等待状态. 附上一段自己写的调

浅谈Java 对于继承的初级理解_java

概念:继承,是指一个类的定义可以基于另外一个已存在的类,即子类继承父类,从而实现父类的代码的重用.两个类的关系:父类一般具有各个子类共性的特征,而子类可以增加一些更具个性的方法.类的继承具有传递性,即子类还可以继续派生子类,位于上层的类概念更加抽象,位于下层的类的概念更加具体. 1.定义子类: 语法格式 [修饰符] class 子类名 extends 父类名{ 子类体 } 修饰符:public private protected default 子类体是子类在继承父类的内容基础上添加的新的特有内

浅谈java异常处理(父子异常的处理)_java

我当初学java异常处理的时候,对于父子异常的处理,我记得几句话"子类方法只能抛出父类方法所抛出的异常或者是其子异常,子类构造器必须要抛出父类构造器的异常或者其父异常".那个时候还不知道子类方法为什么要这样子抛出异常,后来通过学习<Thinking in Java>,我才明白其中的道理,现在我再来温习一下. 一.子类方法只能抛出父类方法的异常或者是其子异常 对于这种限制,主要是因为子类在做向上转型的时候,不能正确地捕获异常 package thinkinginjava; p

浅谈java异常处理之空指针异常_java

听老师说,在以后的学习中大部分的异常都是空指针异常.所以抽点打游戏的时间来查询一下什么是空指针异常 一:空指针异常产生的主要原因如下: (1)当一个对象不存在时又调用其方法会产生异常obj.method() // obj对象不存在 (2)当访问或修改一个对象不存在的字段时会产生异常obj.method() // method方法不存在 (3)字符串变量未初始化: (4)接口类型的对象没有用具体的类初始化,比如: List lt:会报错 List lt = new ArrayList():则不会报

浅谈Java反射与代理_java

Java反射机制与动态代理,使得Java更加强大,Spring核心概念IoC.AOP就是通过反射机制与动态代理实现的. 1 Java反射 示例: User user = new User(); user.setTime5Flag("test"); Class<?> cls = Class.forName("com.test.User"); //接口必须public,无论是否在本类内部使用!或者使用cls.getDeclaredMethod(),或者遍历修

浅谈java中BigDecimal的equals与compareTo的区别_java

这两天在处理支付金额校验的时候出现了点问题,有个金额比较我用了BigDecimal的equals方法来比较两个金额是否相等,结果导致金额比较出现错误(比如3.0与3.00的比较等). [注:以下所讲都是以sun jdk 1.4.2版本为例,其他版本实现未必一致,请忽略] 首先看一下BigDecimal的equals方法: public boolean equals(Object x){ if (!(x instanceof BigDecimal)) return false; BigDecima

浅谈java+内存分配及变量存储位置的区别_java

Java内存分配与管理是Java的核心技术之一,之前我们曾介绍过Java的内存管理与内存泄露以及Java垃圾回收方面的知识,今天我们再次深入Java核心,详细介绍一下Java在内存分配方面的知识.一般Java在内存分配时会涉及到以下区域: ◆寄存器:我们在程序中无法控制 ◆栈:存放基本类型的数据和对象的引用,但对象本身不存放在栈中,而是存放在堆中(new 出来的对象) ◆堆:存放用new产生的数据 ◆静态域:存放在对象中用static定义的静态成员 ◆常量池:存放常量 ◆非RAM存储:硬盘等永久