多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的例子:
假设在一台服务器完成一项任务的时间为T
T1 创建线程的时间 T2 在线程中执行任务的时间,包括线程间同步所需时间 T3 线程销毁的时间
显然T = T1+T2+T3。注意这是一个极度简化的假设。
可以看出T1,T3是多线程本身的带来的开销,我们渴望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到 这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点(T1,T3),而不是优点(并发性)。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目。在看一个例子:
假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。我们比较利用线程池技术和不利于线程池技术的服务器 处理这些请求时所产生的线程总数。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目或者上限(以下简称线程池尺寸),而如果 服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池尺寸是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处 理请求时浪费时间,从而提高效率。
这些都是假设,不能充分说明问题,下面我将讨论线程池的简单实现并对该程序进行对比测试,以说明线程技术优点及应用领域。
一般一个简单线程池至少包含下列组成部分
线程池管理器(ThreadPoolManager):用于创建并管理线程池
工作线程(WorkThread): 线程池中线程
任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
任务队列:用于存放没有处理的任务。提供一种缓冲机制。
线程池管理器至少有下列功能:创建线程池,销毁线程池,添加新任务。下面就是小弟的实现,还是欢迎拍砖哈:
代码如下 | 复制代码 |
public class ThreadPoolManager { private static ThreadPoolManager instance = null; private List<Upload> taskQueue = Collections.synchronizedList(new LinkedList<Upload>());//任务队列 private WorkThread[] workQueue ; //工作线程(真正执行任务的线程) private static int worker_num = 5; //工作线程数量(默认工作线程数量是5) private static int worker_count = 0; private ThreadPoolManager(){ this(5); } private ThreadPoolManager(int num){ worker_num = num; workQueue = new WorkThread[worker_num]; for(int i=0;i<worker_num;i++){ workQueue[i] = new WorkThread(i); } } public static synchronized ThreadPoolManager getInstance(){ if(instance==null) instance = new ThreadPoolManager(); return instance; } public void addTask(Upload task){ //对任务队列的操作要上锁 synchronized (taskQueue) { if(task!=null){ taskQueue.add(task); taskQueue.notifyAll(); System.out.println("task id "+task.getInfo() + " submit!"); } } } public void BatchAddTask(Upload[] tasks){ //对任务队列的修改操作要上锁 synchronized (taskQueue) { for(Upload e:tasks){ if(e!=null){ taskQueue.add(e); taskQueue.notifyAll(); System.out.println("task id "+e.getInfo() + " submit!"); } } } } public void destory(){ System.out.println("pool begins to destory ..."); for(int i = 0;i<worker_num;i++){ workQueue[i].stopThread(); workQueue[i] = null; } //对任务队列的操作要上锁 synchronized (taskQueue) { taskQueue.clear(); } System.out.println("pool ends to destory ..."); } private class WorkThread extends Thread{ private int taksId ; private boolean isRuning = true; private boolean isWaiting = false; public WorkThread(int taskId){ this.taksId= taskId; this.start(); } public boolean isWaiting(){ return isWaiting; } // 如果任务进行中时,不能立刻终止线程,需要等待任务完成之后检测到isRuning为false的时候,退出run()方法 public void stopThread(){ isRuning = false; } @Override public void run() { while(isRuning){ Upload temp = null; //对任务队列的操作要上锁 synchronized (taskQueue) { //任务队列为空,等待新的任务加入 while(isRuning&&taskQueue.isEmpty()){ try { taskQueue.wait(20); } catch (InterruptedException e) { System.out.println("InterruptedException occre..."); e.printStackTrace(); } } if(isRuning) temp = taskQueue.remove(0); } //当等待新任务加入时候,终止线程(调用stopThread函数)造成 temp = null if(temp!=null){ System.out.println("task info: "+temp.getInfo()+ " is begining"); isWaiting = false; temp.uploadPic(); isWaiting = true; System.out.println("task info: "+temp.getInfo()+ " is finished"); } } } } } |
然后定义任务接口(Task):这里我定义的是上传图片的功能接口(这里用抽象类或者接口随你自己).
代码如下 | 复制代码 |
Upload public abstract class Upload { protected String info; abstract boolean uploadPic(); public String getInfo(){ return info; } } |
然后定义具体任务类:我这里简单,让它睡眠2s。当然你也可以定义很多实现Upload的任务类。
代码如下 | 复制代码 |
TaskUpload public class TaskUpload extends Upload { public TaskUpload(String info){ this.info = info; } public String getInfo(){ return info; } @Override public boolean uploadPic() { // TODO Auto-generated method stub System.out.println(info+"sleep begin ...."); try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(info+"sleep end ...."); return false; } } |
最后,测试这个简单的线程池:
代码如下 | 复制代码 |
public class ThreadPoolManagerTest { public static void main(String[] args) { |