线程池运用实例——一次错误的多线程程序设计以及修复过程

写在前面的话 

写下这篇文章只为了回顾之前在实际工作中犯的一个极其二逼的错误,用我的经历来提示后来者,诸位程序大神,大牛,小牛们看到此文笑笑即可,轻拍轻拍。。。

1 背景

有这么一个需求,我们的系统(后面简称:A系统)需要在后台执行一个报表导出任务,在这个任务的执行过程中需要通过CORBA调用其他系统(后面简称:B系统)的一个(也有可能是多个)接口去查询报表,待结果返回后,将这些结果写入Excel。这个需求是不是很简单?套用网上一些FutureTask或者线程池的例子一两小时就能搞定这个需求。当时我也是这样认为的,可谁想,这是一个巨大的坑….

2 初始设计

用过CORBA的同学会知道,如同数据库连接一样,CORBA的连接数也是是有限的,如果一个接口调用的时间过长,就会长时间占用CORBA有限的连接数,当这种长时间的同步调用过多时就会造成整个系统CORBA调用的阻塞,进而造成系统停止响应。由于查询操作很耗时,为了避免这种情况的发生,这个接口被设计成了一个异步接口。任务的执行流程就会是这样:任务开始执行,接着调用这个接口并且通过CORBA向B系统订阅一个事件,然后任务进入等待状态,当B系统执行完成后,会向A系统发送一个事件告知执行的结果,任务收到事件后重新开始执行直到结束,如图:

既然说到了事件,那么很自然而然的就想到了使用回调的方式去响应事件,并且为了避免事件超时(也就是长时间没有接收到事件)导致任务长时间等待,我还使用了一个定时的任务去检查任务的状态。所以我的程序看起来就像这样:

IEventFuture.java

1 public interface IEventFuture {
2     void onEventReceived(Event event);
3 }

ExportRptTask.java

01 public class ExportRptTask implements Callable<Void>, IEventFuture {
02     private static final int INITIALIZED = 0;
03     private static final int RUNNING = 1;
04     private static final int COMPLETED = 2;
05     private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
06     private Date lastUpdate = new Date();
07     private volatile int state = INITIALIZED;
08  
09     private Timer timer = new Timer();
10     private SystemBSer systemBSer = new SystemBSer();
11  
12     private int eventId = -1;
13  
14     @Override
15     public Void call() throws Exception {
16         this.state = RUNNING;
17         try {
18             systemBSer.doQuery();
19             subscribeEvent();
20             startTaskTimeoutMonitorTask();
21             Future future = createEventFuture();
22             future.get();
23         } catch (Throwable t) {
24             onTaskError(t);
25         } finally {
26             EventManager.unsubscribe(this.eventId);
27             timer.cancel();
28         }
29         return null;
30     }
31  
32     @Override
33     public void onEventReceived(Event event) {
34         this.lastUpdate = new Date();
35 // start to write excel
36 // .....
37 // end to write excel
38         this.state = COMPLETED;
39     }
40  
41     private void subscribeEvent() {
42         this.eventId = EventManager.subscribe(this);
43     }
44  
45     private Future createEventFuture() {
46         FutureTask<Void> listenFuture = new FutureTask<Void>(new Callable<Void>() {
47  
48             @Override
49             public Void call() throws Exception {
50                 while (state != COMPLETED) {
51  
52                 }
53                 return null;
54             }
55         });
56  
57         new Thread(listenFuture).start();
58         return listenFuture;
59     }
60  
61     private void startTaskTimeoutMonitorTask() {
62         timer.scheduleAtFixedRate(new TimerTask() {
63             @Override
64             public void run() {
65  
66                 if (state != COMPLETED || new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
67                     onTaskTimeout();
68                 }
69             }
70         }, 0, 15 * 60 * 1000);
71     }
72  
73     private void onTaskTimeout() {
74         // do something on task timeout.
75         //   ....
76         // end
77  
78         // set task to completed to end task.
79         this.state = COMPLETED;
80     }
81  
82     private void onTaskError(Throwable t) {
83 // do something to handle error.
84     }
85 }

3 升级改进

由于做这个需求的关系,我开始阅读一些关于JAVA多线程编程的一下教程,在阅读到关于闭锁的内容时,我突然灵光一现,这玩意不正好可以代替我那个丑陋的使用循环来让任务进入等待状态的实现么?然后我的程序就变成了这样:

ExportRptTask.java

01 public class ExportRptTask implements Callable<Void>, IEventFuture {
02     private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
03     private Date lastUpdate = new Date();
04  
05     private CountDownLatch endGate = new CountDownLatch(1);
06     private Timer timer = new Timer();
07     private SystemBSer systemBSer = new SystemBSer();
08  
09     private int eventId = -1;
10  
11     @Override
12     public Void call() throws Exception {
13         try {
14             systemBSer.doQuery();
15             subscribeEvent();
16             endGate.await();
17             startTaskTimeoutMonitorTask();
18         } catch (Throwable t) {
19             onTaskError(t);
20         } finally {
21             EventManager.unsubscribe(this.eventId);
22             timer.cancel();
23         }
24         return null;
25     }
26  
27     @Override
28     public void onEventReceived(Event event) {
29         this.lastUpdate = new Date();
30 // start to write excel
31 // .....
32 // end to write excel
33         this.endGate.countDown();
34     }
35  
36     private void subscribeEvent() {
37         this.eventId = EventManager.subscribe(this);
38     }
39  
40     private void startTaskTimeoutMonitorTask() {
41         timer.scheduleAtFixedRate(new TimerTask() {
42             @Override
43             public void run() {
44  
45                 if (new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
46                     onTaskTimeout();
47                 }
48             }
49         }, 0, 15 * 60 * 1000);
50     }
51  
52     private void onTaskTimeout() {
53 // do something on task timeout.
54 //   ....
55 // end
56  
57 // set task to completed to end task.
58         this.endGate.countDown();
59     }
60  
61     private void onTaskError(Throwable t) {
62 // do something to handle error.
63     }
64 }

4 问题浮现

正在我为我使用高大上的闭锁代替循环沾沾自喜的时候,测试大爷告诉我,任务经常莫名其妙的失败,并且日志中没有任何异常。开始,这让我觉得很不可思议,因为我已经在call()方法处处理了所有的异常,任务失败时至少也应该有个日志啥的吧。这个问题一直困扰着我,直到有一天分析日志我突然发现任务执行的工作线程(也就是call()方法所在的线程)和接收到事件后的回调并不是同一个线程。这就意味着在查询到报表结果后,所有写Excel,分发结果等等的操作都是在事件回调的线程中执行的,那么一旦这里发生异常原来call()中的catch块自然无法捕获,然后异常就被莫名其妙的吞掉了。好吧,我承认我之前对线程池也就了解点皮毛,对多线程也仅仅是有个概念,想当然的认为在线程池中可以Hold住任务的一切,包括响应这个任务在执行过程中创建的其他线程运行时发生的异常。而且更严重的是按照原来的实现,只有当整个任务执行完成(包括写完Excel)后,才会释放那个闭锁,所以一旦事件回调发生异常,那么整个任务都无法终止。在线程池中发生一个任务永远无法终止的后果,你懂的。

5 重新设计

痛定思痛,我决定重新梳理这个任务的流程。这个需求的难点就是在如何监听并响应B系统给我们发送的事件,实际上,这是一个很经典的生产者–消费者问题,而阻塞队列正好是解决这类问题的利器。重新设计的事件响应流程就变成:当B系统发送事件的时候,事件回调线程会往阻塞队列里面填充一个事件。在另一方面,任务调用完B系统的查询接口后,就开始从阻塞队列中取事件,当事件队列为空的时候,取事件的线程(也就是线程池执行任务的工作线程)会被阻塞。并且,阻塞队列的取操作可以设置超时时间,所以当取到的事件对象为空时,就意味着事件超时了,这样就省去了使用定时任务定时检查任务状态的工作。重新设计的程序是这样的:

EventProxy.java

01 public class EventProxy implements IEventFuture {
02     private static final BlockingQueue<Event> eventQueue = new ArrayBlockingQueue<Event>(10);
03     private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
04  
05     @Override
06     public void onEventReceived(Event event) {
07         eventQueue.offer(event);
08     }
09  
10     public Event getEvent() throws InterruptedException {
11         return eventQueue.poll(TASK_TIME_OUT_TIME, TimeUnit.MILLISECONDS);
12     }
13 }

ExportRptTask.java

01 public class ExportRptTask3 implements Callable<Void> {
02  
03     private SystemBSer systemBSer = new SystemBSer();
04     private EventProxy eventProxy = new EventProxy();
05  
06     private int eventId = -1;
07  
08     @Override
09     public Void call() throws Exception {
10         try {
11             systemBSer.doQuery();
12             subscribeEvent();
13  
14             Event event = eventProxy.getEvent();
15             if (event != null) {
16                 processEvent(event);
17             } else {
18                 onTaskTimeout();
19             }
20         } catch (Throwable t) {
21             onTaskError(t);
22         } finally {
23             EventManager.unsubscribe(this.eventId);
24         }
25         return null;
26     }
27  
28     private void subscribeEvent() {
29         this.eventId = EventManager.subscribe(eventProxy);
30     }
31  
32     private void processEvent(Event event) {
33 // do something on receive event.
34     }
35  
36     private void onTaskTimeout() {
37 // do something on task timeout.
38 //   ....
39 // end
40     }
41  
42     private void onTaskError(Throwable t) {
43 // do something to handle error.
44     }
45 }

6 总结

相信各位并发编程的大牛们能在一瞬间就可以把我的程序(包括改进后的)批得体无完肤,不过我还是想分享下我在这个过程中的收获。

  • 在动手写程序前,请先理解你的需求,特别是要注意用已有的模型去识别问题,在本例中,我就是没有识别响应事件的流程其实是个生产者–消费者问题导致了后面的错误
  • 请充分的了解你需要使用的技术和工具。比如,使用线程池你就要了解线程池的工作原理,这样你才能正确的使用这些技术。做技术切忌想当然。
  • 在使用线程池时,重要的操作尽量放在任务的主线程中执行(也就是call()/run()方法所在的线程),否则线程池本身难以对任务进行控制。
  • 如果一定要在任务中再创建新的线程,请确保任务主线程是任务最后退出的线程。切忌不要使用外部线程直接调用任务类的方法,在本例中我就犯了这样的错误。
时间: 2024-08-01 15:16:20

线程池运用实例——一次错误的多线程程序设计以及修复过程的相关文章

java线程池简单实例

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力.但如果对多线程应用不当,会增加对单个任务的处理时间.可以举一个简单的例子: 假设在一台服务器完成一项任务的时间为T       T1 创建线程的时间        T2 在线程中执行任务的时间,包括线程间同步所需时间        T3 线程销毁的时间   显然T = T1+T2+T3.注意这是一个极度简化的假设. 可以看出T1,T3是多线程本身的带来的开销,我们渴望减少T1,T3所用

以实例简介Java中线程池的工作特点_java

什么原因使我们不得不使用线程池? 个人认为主要原因是:短时间内需要处理的任务数量很多 使用线程池的好处: 1.减少在创建和销毁线程上所花的时间以及系统资源的开销 2.如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存 以下是Java自带的几种线程池: 1.newFixedThreadPool  创建一个指定工作线程数量的线程池. 每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中. 2.newCachedThreadPool 创建

简单介绍Java编程中的线程池_java

从 Java 5 开始,Java 提供了自己的线程池.线程池就是一个线程的容器,每次只执行额定数量的线程. java.util.concurrent.ThreadPoolExecutor 就是这样的线程池.它很灵活,但使用起来也比较复杂,本文就对其做一个介绍. 首先是构造函数.以最简单的构造函数为例: public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit uni

深入解析C++编程中线程池的使用_C 语言

为什么需要线程池目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短. 传 统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务.任务执行完毕后,线程退出,这就是是"即时创建,即 时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于 不停的创建线程,销

线程池的原理与实现详解_C 语言

一. 线程池的简介通常我们使用多线程的方式是,需要时创建一个新的线程,在这个线程里执行特定的任务,然后在任务完成后退出.这在一般的应用里已经能够满足我们应用的需求,毕竟我们并不是什么时候都需要创建大量的线程,并在它们执行一个简单的任务后销毁. 但是在一些web.email.database等应用里,比如彩铃,我们的应用在任何时候都要准备应对数目巨大的连接请求,同时,这些请求所要完成的任务却又可能非常的简单,即只占用很少的处理时间.这时,我们的应用有可能处于不停的创建线程并销毁线程的状态.虽说比起

Java中线程休眠编程实例_java

import java.awt.*; import java.util.*; import javax.swing.*; public class SleepMethodTest extends JFrame { /** * */ private static final long serialVersionUID = 1L; private Thread t; // 定义颜色数组 private static Color[] color = { Color.BLACK, Color.BLUE,

c# 线程池-C# 多线程 Ping 几千个 IP地址 测试 主机可达测试 线程池

问题描述 C# 多线程 Ping 几千个 IP地址 测试 主机可达测试 线程池 //有什么办法可以做到在多线程环境下测试数据准确,大能帮忙看看问题出在什么地方,有什么解决方案...以下程序可运行 解决方案 1.最好别使用线程来实现,开销实在是太大了. 2.可以这么考虑,先只管发,即向每个ip地址发送1个icmp请求报文,然后就开始接收所有的icmp应答报文. 解决方案二: 考虑使用raw socket,把所有的icmp报文都收上来,然后再过滤. 解决方案三: 是否是多线程情况下,icmp请求过多

C#实现控制线程池最大数并发线程_C#教程

1. 实验目的:       使用线程池的时候,有时候需要考虑服务器的最大线程数目和程序最快执行所有业务逻辑的取舍. 并非逻辑线程越多也好,而且新的逻辑线程必须会在线程池的等待队列中等待 ,直到线程池中工作的线程执行完毕, 才会有系统线程取出等待队列中的逻辑线程,进行CPU运算. 2.  解决问题:      <a>如果不考虑服务器实际可支持的最大并行线程个数,程序不停往线程池申请新的逻辑线程,这个时候我们可以发现CPU的使用率会不断飙升,并且内存.网络带宽占用也会随着逻辑线程在CPU队列中堆

.NET 中的多线程的概念与线程池

为什么使用多线程 1.使用户界面能够随时相应用户输入 当某个应用程序在进行大量运算时候,为了保证应用程序能够随时相应客户的输入,这个时候我们往往需要让大量运算和相应用户输入这两个行为在不同的线程中进行. 2.效率原因 应用程序经常需要等待一些资源,如等待网络资源,等待io资源,等待用户输入等等.这种情况下使用多线程可以避免CPU长时间处于闲置状态. 用户态,内核态 线程内的资源有两种运行态,即用户态和内核态.某些运算可以在堆栈上进行,这种情况线程是在用户态运行的,某些需要高权限运行的指令,或者某