上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现。
在介绍之前,先抛几个问题。
- Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
- Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
- 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
- LockSupport.park()和unpark(),与object.wait()和notify()的区别?
- LockSupport.park(Object blocker)传递的blocker对象做什么用?
- LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
- Thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?
如果你都都能很明确的答上来了,说明你已经完全懂Thread.interrupt,可以不用往下看那了。
那如果不清楚的,带着这几个问题,一起来梳理下。
Thread的interrupt处理的几个方法:
- public void interrupt() : 执行线程interrupt事件
- public boolean isInterrupted() : 检查当前线程是否处于interrupt
- public static boolean interrupted() : check当前线程是否处于interrupt,并重置interrupt信息。类似于resetAndGet()
理解:
1. 每个线程都有一个interrupt status标志位,用于表明当前线程是否处于中断状态
2. 一般调用Thread.interrupt()会有两种处理方式
- 遇到一个低优先级的block状态时,比如object.wait(),object.sleep(),object.join()。它会立马触发一个unblock解除阻塞,并throw一个InterruptedException。
- 其他情况,Thread.interrupt()仅仅只是更新了status标志位。然后你的工作线程通过Thread.isInterrrupted()进行检查,可以做相应的处理,比如也throw InterruptedException或者是清理状态,取消task等。
在interrupt javadoc中描述:
最佳实践
IBM上有篇文章写的挺不错。Java theory and practice: Dealing with InterruptedException , 里面提到了Interrupt处理的几条最佳实践。
- Don't swallow interrupts (别吃掉Interrupt,一般是两种处理: 继续throw InterruptedException异常。 另一种就是继续设置Thread.interupt()异常标志位,让更上一层去进行相应处理。
1.public class TaskRunner implements Runnable { 2. private BlockingQueue<Task> queue; 3. 4. public TaskRunner(BlockingQueue<Task> queue) { 5. this.queue = queue; 6. } 7. 8. public void run() { 9. try { 10. while (true) { 11. Task task = queue.take(10, TimeUnit.SECONDS); 12. task.execute(); 13. } 14. } 15. catch (InterruptedException e) { 16. // Restore the interrupted status 17. Thread.currentThread().interrupt(); 18. } 19. } 20.}
- Implementing cancelable tasks with Interrupt (使用Thread.interrupt()来设计和支持可被cancel的task)
1.public class PrimeProducer extends Thread { 2. private final BlockingQueue<BigInteger> queue; 3. 4. PrimeProducer(BlockingQueue<BigInteger> queue) { 5. this.queue = queue; 6. } 7. 8. public void run() { 9. try { 10. BigInteger p = BigInteger.ONE; 11. while (!Thread.currentThread().isInterrupted()) 12. queue.put(p = p.nextProbablePrime()); 13. } catch (InterruptedException consumed) { 14. /* Allow thread to exit */ 15. } 16. } 17. 18. public void cancel() { interrupt(); } // 发起中断 19.}<span style="white-space: normal;"> </span>
注册Interrupt处理事件(非正常用法)
一般正常的task设计用来处理cancel,都是采用主动轮询的方式检查Thread.isInterrupt(),对业务本身存在一定的嵌入性,还有就是存在延迟,你得等到下一个检查点(谁知道下一个检查点是在什么时候,特别是进行一个socket.read时,遇到过一个HttpClient超时的问题)。
来看一下,主动抛出InterruptedException异常的实现,借鉴于InterruptibleChannel的设计,比较取巧。
1.interface InterruptAble { // 定义可中断的接口
2.
3. public void interrupt() throws InterruptedException;
4.}
5.
6.abstract class InterruptSupport implements InterruptAble {
7.
8. private volatile boolean interrupted = false;
9. private Interruptible interruptor = new Interruptible() {
10.
11. public void interrupt() {
12. interrupted = true;
13. InterruptSupport.this.interrupt(); // 位置3
14. }
15. };
16.
17. public final boolean execute() throws InterruptedException {
18. try {
19. blockedOn(interruptor); // 位置1
20. if (Thread.currentThread().isInterrupted()) { // 立马被interrupted
21. interruptor.interrupt();
22. }
23. // 执行业务代码
24. bussiness();
25. } finally {
26. blockedOn(null); // 位置2
27. }
28.
29. return interrupted;
30. }
31.
32. public abstract void bussiness() ;
33.
34. public abstract void interrupt();
35.
36. // -- sun.misc.SharedSecrets --
37. static void blockedOn(Interruptible intr) { // package-private
38. sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
39. }
40.}
代码说明,几个取巧的点:
位置1:利用sun提供的blockedOn方法,绑定对应的Interruptible事件处理钩子到指定的Thread上。
位置2:执行完代码后,清空钩子。避免使用连接池时,对下一个Thread处理事件的影响。
位置3:定义了Interruptible事件钩子的处理方法,回调InterruptSupport.this.interrupt()方法,子类可以集成实现自己的业务逻辑,比如sock流关闭等等。
使用:
1.class InterruptRead extends InterruptSupport {
2.
3. private FileInputStream in;
4.
5. @Override
6. public void bussiness() {
7. File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完
8. try {
9. in = new FileInputStream(file);
10. byte[] bytes = new byte[1024];
11. while (in.read(bytes, 0, 1024) > 0) {
12. // Thread.sleep(100);
13. // if (Thread.interrupted()) {// 以前的Interrupt检查方式
14. // throw new InterruptedException("");
15. // }
16. }
17. } catch (Exception e) {
18. throw new RuntimeException(e);
19. }
20. }
21.
22. public FileInputStream getIn() {
23. return in;
24. }
25.
26. @Override
27. public void interrupt() {
28. try {
29. in.getChannel().close();
30. } catch (IOException e) {
31. e.printStackTrace();
32. }
33. }
34.
35.}
36.
37.public static void main(String args[]) throws Exception {
38. final InterruptRead test = new InterruptRead();
39. Thread t = new Thread() {
40.
41. @Override
42. public void run() {
43. long start = System.currentTimeMillis();
44. try {
45. System.out.println("InterruptRead start!");
46. test.execute();
47. } catch (InterruptedException e) {
48. System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
49. e.printStackTrace();
50. }
51. }
52. };
53. t.start();
54. // 先让Read执行3秒
55. Thread.sleep(3000);
56. // 发出interrupt中断
57. t.interrupt();
58. }
jdk源码介绍:
1. sun提供的钩子可以查看System的相关代码, line : 1125
1.sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
2. public sun.reflect.ConstantPool getConstantPool(Class klass) {
3. return klass.getConstantPool();
4. }
5. public void setAnnotationType(Class klass, AnnotationType type) {
6. klass.setAnnotationType(type);
7. }
8. public AnnotationType getAnnotationType(Class klass) {
9. return klass.getAnnotationType();
10. }
11. public <E extends Enum<E>>
12. E[] getEnumConstantsShared(Class<E> klass) {
13. return klass.getEnumConstantsShared();
14. }
15. public void blockedOn(Thread t, Interruptible b) {
16. t.blockedOn(b);
17. }
18. });
2. Thread.interrupt()
1.public void interrupt() {
2. if (this != Thread.currentThread())
3. checkAccess();
4.
5. synchronized (blockerLock) {
6. Interruptible b = blocker;
7. if (b != null) {
8. interrupt0(); // Just to set the interrupt flag
9. b.interrupt(); //回调钩子
10. return;
11. }
12. }
13. interrupt0();
14. }
更多
更多关于Thread.stop,suspend,resume,interrupt的使用注意点,可以看一下sun的文档,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html
最后来解答一下之前的几个问题:
问题1: Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
答: Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()几个方法会主动抛出InterruptedException异常。而在其他的的block常见,只是通过设置了Thread的一个标志位信息,需要程序自我进行处理。
1.if (Thread.interrupted()) // Clears interrupted status!
2. throw new InterruptedException();
问题2:Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
答:Thread.interrupt设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持RUNNING状态。比如Object.join()和一些支持interrupt的一些nio channel设计。
问题3: 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
答: interrupt用途: unBlock操作,支持任务cancel, 数据清理等。
问题4: LockSupport.park()和unpark(),与object.wait()和notify()的区别?
答:
1. 面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。
2. 实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.
问题5: LockSupport.park(Object blocker)传递的blocker对象做什么用?
答: 对应的blcoker会记录在Thread的一个parkBlocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象.
1.public static void park(Object blocker) {
2. Thread t = Thread.currentThread();
3. setBlocker(t, blocker); // 设置Thread.parkBlocker属性的值
4. unsafe.park(false, 0L);
5. setBlocker(t, null); // 清除Thread.parkBlocker属性的值
6. }
具体LockSupport的javadoc描述也比较清楚,可以看下:
问题6: LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
答:能响应interrupt事件,但不会抛出InterruptedException异常。针对LockSupport对Thread.interrupte支持,也先看一下javadoc中的描述:
相关测试代码
1.package com.agapple.cocurrent;
2.
3.import java.io.File;
4.import java.io.FileInputStream;
5.import java.lang.reflect.Field;
6.import java.util.concurrent.TimeUnit;
7.import java.util.concurrent.locks.LockSupport;
8.
9.public class LockSupportTest {
10.
11. private static LockSupportTest blocker = new LockSupportTest();
12.
13. public static void main(String args[]) throws Exception {
14. lockSupportTest();
15. parkTest();
16. interruptParkTest();
17. interruptSleepTest();
18. interruptWaitTest();
19. }
20.
21. /**
22. * LockSupport.park对象后,尝试获取Thread.blocker对象,调用其single唤醒
23. *
24. * @throws Exception
25. */
26. private static void lockSupportTest() throws Exception {
27. Thread t = doTest(new TestCallBack() {
28.
29. @Override
30. public void callback() throws Exception {
31. // 尝试sleep 5s
32. System.out.println("blocker");
33. LockSupport.park(blocker);
34. System.out.println("wakeup now!");
35. }
36.
37. @Override
38. public String getName() {
39. return "lockSupportTest";
40. }
41.
42. });
43. t.start(); // 启动读取线程
44.
45. Thread.sleep(150);
46. synchronized (blocker) {
47. Field field = Thread.class.getDeclaredField("parkBlocker");
48. field.setAccessible(true);
49. Object fBlocker = field.get(t);
50. System.out.println(blocker == fBlocker);
51. Thread.sleep(100);
52. System.out.println("notifyAll");
53. blocker.notifyAll();
54. }
55. }
56.
57. /**
58. * 尝试去中断一个object.wait(),会抛出对应的InterruptedException异常
59. *
60. * @throws InterruptedException
61. */
62. private static void interruptWaitTest() throws InterruptedException {
63. final Object obj = new Object();
64. Thread t = doTest(new TestCallBack() {
65.
66. @Override
67. public void callback() throws Exception {
68. // 尝试sleep 5s
69. obj.wait();
70. System.out.println("wakeup now!");
71. }
72.
73. @Override
74. public String getName() {
75. return "interruptWaitTest";
76. }
77.
78. });
79. t.start(); // 启动读取线程
80. Thread.sleep(2000);
81. t.interrupt(); // 检查下在park时,是否响应中断
82. }
83.
84. /**
85. * 尝试去中断一个Thread.sleep(),会抛出对应的InterruptedException异常
86. *
87. * @throws InterruptedException
88. */
89. private static void interruptSleepTest() throws InterruptedException {
90. Thread t = doTest(new TestCallBack() {
91.
92. @Override
93. public void callback() throws Exception {
94. // 尝试sleep 5s
95. Thread.sleep(5000);
96. System.out.println("wakeup now!");
97. }
98.
99. @Override
100. public String getName() {
101. return "interruptSleepTest";
102. }
103.
104. });
105. t.start(); // 启动读取线程
106. Thread.sleep(2000);
107. t.interrupt(); // 检查下在park时,是否响应中断
108. }
109.
110. /**
111. * 尝试去中断一个LockSupport.park(),会有响应但不会抛出InterruptedException异常
112. *
113. * @throws InterruptedException
114. */
115. private static void interruptParkTest() throws InterruptedException {
116. Thread t = doTest(new TestCallBack() {
117.
118. @Override
119. public void callback() {
120. // 尝试去park 自己线程
121. LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
122. System.out.println("wakeup now!");
123. }
124.
125. @Override
126. public String getName() {
127. return "interruptParkTest";
128. }
129.
130. });
131. t.start(); // 启动读取线程
132. Thread.sleep(2000);
133. t.interrupt(); // 检查下在park时,是否响应中断
134. }
135.
136. /**
137. * 尝试去中断一个LockSupport.unPark(),会有响应
138. *
139. * @throws InterruptedException
140. */
141. private static void parkTest() throws InterruptedException {
142. Thread t = doTest(new TestCallBack() {
143.
144. @Override
145. public void callback() {
146. // 尝试去park 自己线程
147. LockSupport.park(blocker);
148. System.out.println("wakeup now!");
149. }
150.
151. @Override
152. public String getName() {
153. return "parkTest";
154. }
155.
156. });
157.
158. t.start(); // 启动读取线程
159. Thread.sleep(2000);
160. LockSupport.unpark(t);
161. t.interrupt();
162. }
163.
164. public static Thread doTest(final TestCallBack call) {
165. return new Thread() {
166.
167. @Override
168. public void run() {
169. File file = new File("/dev/urandom"); // 读取linux黑洞
170. try {
171. FileInputStream in = new FileInputStream(file);
172. byte[] bytes = new byte[1024];
173. while (in.read(bytes, 0, 1024) > 0) {
174. if (Thread.interrupted()) {
175. throw new InterruptedException("");
176. }
177. System.out.println(bytes[0]);
178. Thread.sleep(100);
179. long start = System.currentTimeMillis();
180. call.callback();
181. System.out.println(call.getName() + " callback finish cost : "
182. + (System.currentTimeMillis() - start));
183. }
184. } catch (Exception e) {
185. e.printStackTrace();
186. }
187. }
188.
189. };
190. }
191.}
192.
193.interface TestCallBack {
194.
195. public void callback() throws Exception;
196.
197. public String getName();
198.}
最后
发觉文章越写越长,那就索性发到了论坛,大家一起讨论下.毕竟文章中描述的都是一些使用层面的东东,并没有从操作系统或者sun native实现上去介绍Thread的一些机制,熟悉这块的大牛门也可以出来发表下高见.
本文仅当抛砖引玉,欢迎发言!