java线程阻塞中断和LockSupport的常见问题

上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现。

 

在介绍之前,先抛几个问题。

 

  1. Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常?
  2. Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?
  3. 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
  4. LockSupport.park()和unpark(),与object.wait()和notify()的区别?
  5. LockSupport.park(Object blocker)传递的blocker对象做什么用?
  6. LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?
  7. Thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?

如果你都都能很明确的答上来了,说明你已经完全懂Thread.interrupt,可以不用往下看那了。

那如果不清楚的,带着这几个问题,一起来梳理下。

Thread的interrupt处理的几个方法:

  • public void interrupt() :  执行线程interrupt事件
  • public boolean isInterrupted() : 检查当前线程是否处于interrupt
  • public static boolean interrupted() : check当前线程是否处于interrupt,并重置interrupt信息。类似于resetAndGet()

理解:

1. 每个线程都有一个interrupt status标志位,用于表明当前线程是否处于中断状态

2. 一般调用Thread.interrupt()会有两种处理方式

  • 遇到一个低优先级的block状态时,比如object.wait(),object.sleep(),object.join()。它会立马触发一个unblock解除阻塞,并throw一个InterruptedException。
  • 其他情况,Thread.interrupt()仅仅只是更新了status标志位。然后你的工作线程通过Thread.isInterrrupted()进行检查,可以做相应的处理,比如也throw InterruptedException或者是清理状态,取消task等。

在interrupt javadoc中描述:

最佳实践

IBM上有篇文章写的挺不错。Java theory and practice: Dealing with InterruptedException , 里面提到了Interrupt处理的几条最佳实践。

  1. Don't swallow interrupts (别吃掉Interrupt,一般是两种处理:  继续throw InterruptedException异常。  另一种就是继续设置Thread.interupt()异常标志位,让更上一层去进行相应处理。
    1.public class TaskRunner implements Runnable {
    2.    private BlockingQueue<Task> queue;
    3.
    4.    public TaskRunner(BlockingQueue<Task> queue) {
    5.        this.queue = queue;
    6.    }
    7.
    8.    public void run() {
    9.        try {
    10.             while (true) {
    11.                 Task task = queue.take(10, TimeUnit.SECONDS);
    12.                 task.execute();
    13.             }
    14.         }
    15.         catch (InterruptedException e) {
    16.             // Restore the interrupted status
    17.             Thread.currentThread().interrupt();
    18.         }
    19.    }
    20.}
    
  1. Implementing cancelable tasks with Interrupt (使用Thread.interrupt()来设计和支持可被cancel的task)
    1.public class PrimeProducer extends Thread {
    2.    private final BlockingQueue<BigInteger> queue;
    3.
    4.    PrimeProducer(BlockingQueue<BigInteger> queue) {
    5.        this.queue = queue;
    6.    }
    7.
    8.    public void run() {
    9.        try {
    10.            BigInteger p = BigInteger.ONE;
    11.            while (!Thread.currentThread().isInterrupted())
    12.                queue.put(p = p.nextProbablePrime());
    13.        } catch (InterruptedException consumed) {
    14.            /* Allow thread to exit */
    15.        }
    16.    }
    17.
    18.    public void cancel() { interrupt(); } // 发起中断
    19.}<span style="white-space: normal;"> </span>
    

注册Interrupt处理事件(非正常用法)

一般正常的task设计用来处理cancel,都是采用主动轮询的方式检查Thread.isInterrupt(),对业务本身存在一定的嵌入性,还有就是存在延迟,你得等到下一个检查点(谁知道下一个检查点是在什么时候,特别是进行一个socket.read时,遇到过一个HttpClient超时的问题)。

 

来看一下,主动抛出InterruptedException异常的实现,借鉴于InterruptibleChannel的设计,比较取巧。

1.interface InterruptAble { // 定义可中断的接口
2.
3.    public void interrupt() throws InterruptedException;
4.}
5.
6.abstract class InterruptSupport implements InterruptAble {
7.
8.    private volatile boolean interrupted = false;
9.    private Interruptible    interruptor = new Interruptible() {
10.
11.                                             public void interrupt() {
12.                                                 interrupted = true;
13.                                                 InterruptSupport.this.interrupt(); // 位置3
14.                                             }
15.                                         };
16.
17.    public final boolean execute() throws InterruptedException {
18.        try {
19.            blockedOn(interruptor); // 位置1
20.            if (Thread.currentThread().isInterrupted()) { // 立马被interrupted
21.                interruptor.interrupt();
22.            }
23.            // 执行业务代码
24.            bussiness();
25.        } finally {
26.            blockedOn(null);   // 位置2
27.        }
28.
29.        return interrupted;
30.    }
31.
32.    public abstract void bussiness() ;
33.
34.    public abstract void interrupt();
35.
36.    // -- sun.misc.SharedSecrets --
37.    static void blockedOn(Interruptible intr) { // package-private
38.        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
39.    }
40.}

代码说明,几个取巧的点:

位置1:利用sun提供的blockedOn方法,绑定对应的Interruptible事件处理钩子到指定的Thread上。

位置2:执行完代码后,清空钩子。避免使用连接池时,对下一个Thread处理事件的影响。

位置3:定义了Interruptible事件钩子的处理方法,回调InterruptSupport.this.interrupt()方法,子类可以集成实现自己的业务逻辑,比如sock流关闭等等。

 

使用: 

1.class InterruptRead extends InterruptSupport {
2.
3.    private FileInputStream in;
4.
5.    @Override
6.    public void bussiness() {
7.        File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完
8.        try {
9.            in = new FileInputStream(file);
10.            byte[] bytes = new byte[1024];
11.            while (in.read(bytes, 0, 1024) > 0) {
12.                // Thread.sleep(100);
13.                // if (Thread.interrupted()) {// 以前的Interrupt检查方式
14.                // throw new InterruptedException("");
15.                // }
16.            }
17.        } catch (Exception e) {
18.            throw new RuntimeException(e);
19.        }
20.    }
21.
22.    public FileInputStream getIn() {
23.        return in;
24.    }
25.
26.    @Override
27.    public void interrupt() {
28.        try {
29.            in.getChannel().close();
30.        } catch (IOException e) {
31.            e.printStackTrace();
32.        }
33.    }
34.
35.}
36.
37.public static void main(String args[]) throws Exception {
38.        final InterruptRead test = new InterruptRead();
39.        Thread t = new Thread() {
40.
41.            @Override
42.            public void run() {
43.                long start = System.currentTimeMillis();
44.                try {
45.                    System.out.println("InterruptRead start!");
46.                    test.execute();
47.                } catch (InterruptedException e) {
48.                    System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
49.                    e.printStackTrace();
50.                }
51.            }
52.        };
53.        t.start();
54.        // 先让Read执行3秒
55.        Thread.sleep(3000);
56.        // 发出interrupt中断
57.        t.interrupt();
58.    }

jdk源码介绍: 

1. sun提供的钩子可以查看System的相关代码, line : 1125

1.sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
2.            public sun.reflect.ConstantPool getConstantPool(Class klass) {
3.                return klass.getConstantPool();
4.            }
5.            public void setAnnotationType(Class klass, AnnotationType type) {
6.                klass.setAnnotationType(type);
7.            }
8.            public AnnotationType getAnnotationType(Class klass) {
9.                return klass.getAnnotationType();
10.            }
11.            public <E extends Enum<E>>
12.            E[] getEnumConstantsShared(Class<E> klass) {
13.                return klass.getEnumConstantsShared();
14.            }
15.            public void blockedOn(Thread t, Interruptible b) {
16.                t.blockedOn(b);
17.            }
18.        });

 2. Thread.interrupt()

1.public void interrupt() {
2.    if (this != Thread.currentThread())
3.        checkAccess();
4.
5.    synchronized (blockerLock) {
6.        Interruptible b = blocker;
7.        if (b != null) {
8.        interrupt0();       // Just to set the interrupt flag
9.        b.interrupt(); //回调钩子
10.        return;
11.        }
12.    }
13.    interrupt0();
14.    }

更多

更多关于Thread.stop,suspend,resume,interrupt的使用注意点,可以看一下sun的文档,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

 

 

最后来解答一下之前的几个问题:

问题1: Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常? 

答: Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()几个方法会主动抛出InterruptedException异常。而在其他的的block常见,只是通过设置了Thread的一个标志位信息,需要程序自我进行处理。

1.if (Thread.interrupted())  // Clears interrupted status!
2.    throw new InterruptedException();

问题2:Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING?

答:Thread.interrupt设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持RUNNING状态。比如Object.join()和一些支持interrupt的一些nio channel设计。

 

问题3: 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?

答: interrupt用途: unBlock操作,支持任务cancel, 数据清理等。

 

问题4: LockSupport.park()和unpark(),与object.wait()和notify()的区别?

答:

1.  面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。

2.  实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.

 

 

 

问题5: LockSupport.park(Object blocker)传递的blocker对象做什么用?

答: 对应的blcoker会记录在Thread的一个parkBlocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象.

1.public static void park(Object blocker) {
2.        Thread t = Thread.currentThread();
3.        setBlocker(t, blocker); // 设置Thread.parkBlocker属性的值
4.        unsafe.park(false, 0L);
5.        setBlocker(t, null);  // 清除Thread.parkBlocker属性的值
6.    }

 具体LockSupport的javadoc描述也比较清楚,可以看下:

 

 

问题6: LockSupport能响应Thread.interrupt()事件不?会抛出InterruptedException异常?

答:能响应interrupt事件,但不会抛出InterruptedException异常。针对LockSupport对Thread.interrupte支持,也先看一下javadoc中的描述:

 

相关测试代码

1.package com.agapple.cocurrent;
2.
3.import java.io.File;
4.import java.io.FileInputStream;
5.import java.lang.reflect.Field;
6.import java.util.concurrent.TimeUnit;
7.import java.util.concurrent.locks.LockSupport;
8.
9.public class LockSupportTest {
10.
11.    private static LockSupportTest blocker = new LockSupportTest();
12.
13.    public static void main(String args[]) throws Exception {
14.        lockSupportTest();
15.        parkTest();
16.        interruptParkTest();
17.        interruptSleepTest();
18.        interruptWaitTest();
19.    }
20.
21.    /**
22.     * LockSupport.park对象后,尝试获取Thread.blocker对象,调用其single唤醒
23.     *
24.     * @throws Exception
25.     */
26.    private static void lockSupportTest() throws Exception {
27.        Thread t = doTest(new TestCallBack() {
28.
29.            @Override
30.            public void callback() throws Exception {
31.                // 尝试sleep 5s
32.                System.out.println("blocker");
33.                LockSupport.park(blocker);
34.                System.out.println("wakeup now!");
35.            }
36.
37.            @Override
38.            public String getName() {
39.                return "lockSupportTest";
40.            }
41.
42.        });
43.        t.start(); // 启动读取线程
44.
45.        Thread.sleep(150);
46.        synchronized (blocker) {
47.            Field field = Thread.class.getDeclaredField("parkBlocker");
48.            field.setAccessible(true);
49.            Object fBlocker = field.get(t);
50.            System.out.println(blocker == fBlocker);
51.            Thread.sleep(100);
52.            System.out.println("notifyAll");
53.            blocker.notifyAll();
54.        }
55.    }
56.
57.    /**
58.     * 尝试去中断一个object.wait(),会抛出对应的InterruptedException异常
59.     *
60.     * @throws InterruptedException
61.     */
62.    private static void interruptWaitTest() throws InterruptedException {
63.        final Object obj = new Object();
64.        Thread t = doTest(new TestCallBack() {
65.
66.            @Override
67.            public void callback() throws Exception {
68.                // 尝试sleep 5s
69.                obj.wait();
70.                System.out.println("wakeup now!");
71.            }
72.
73.            @Override
74.            public String getName() {
75.                return "interruptWaitTest";
76.            }
77.
78.        });
79.        t.start(); // 启动读取线程
80.        Thread.sleep(2000);
81.        t.interrupt(); // 检查下在park时,是否响应中断
82.    }
83.
84.    /**
85.     * 尝试去中断一个Thread.sleep(),会抛出对应的InterruptedException异常
86.     *
87.     * @throws InterruptedException
88.     */
89.    private static void interruptSleepTest() throws InterruptedException {
90.        Thread t = doTest(new TestCallBack() {
91.
92.            @Override
93.            public void callback() throws Exception {
94.                // 尝试sleep 5s
95.                Thread.sleep(5000);
96.                System.out.println("wakeup now!");
97.            }
98.
99.            @Override
100.            public String getName() {
101.                return "interruptSleepTest";
102.            }
103.
104.        });
105.        t.start(); // 启动读取线程
106.        Thread.sleep(2000);
107.        t.interrupt(); // 检查下在park时,是否响应中断
108.    }
109.
110.    /**
111.     * 尝试去中断一个LockSupport.park(),会有响应但不会抛出InterruptedException异常
112.     *
113.     * @throws InterruptedException
114.     */
115.    private static void interruptParkTest() throws InterruptedException {
116.        Thread t = doTest(new TestCallBack() {
117.
118.            @Override
119.            public void callback() {
120.                // 尝试去park 自己线程
121.                LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
122.                System.out.println("wakeup now!");
123.            }
124.
125.            @Override
126.            public String getName() {
127.                return "interruptParkTest";
128.            }
129.
130.        });
131.        t.start(); // 启动读取线程
132.        Thread.sleep(2000);
133.        t.interrupt(); // 检查下在park时,是否响应中断
134.    }
135.
136.    /**
137.     * 尝试去中断一个LockSupport.unPark(),会有响应
138.     *
139.     * @throws InterruptedException
140.     */
141.    private static void parkTest() throws InterruptedException {
142.        Thread t = doTest(new TestCallBack() {
143.
144.            @Override
145.            public void callback() {
146.                // 尝试去park 自己线程
147.                LockSupport.park(blocker);
148.                System.out.println("wakeup now!");
149.            }
150.
151.            @Override
152.            public String getName() {
153.                return "parkTest";
154.            }
155.
156.        });
157.
158.        t.start(); // 启动读取线程
159.        Thread.sleep(2000);
160.        LockSupport.unpark(t);
161.        t.interrupt();
162.    }
163.
164.    public static Thread doTest(final TestCallBack call) {
165.        return new Thread() {
166.
167.            @Override
168.            public void run() {
169.                File file = new File("/dev/urandom"); // 读取linux黑洞
170.                try {
171.                    FileInputStream in = new FileInputStream(file);
172.                    byte[] bytes = new byte[1024];
173.                    while (in.read(bytes, 0, 1024) > 0) {
174.                        if (Thread.interrupted()) {
175.                            throw new InterruptedException("");
176.                        }
177.                        System.out.println(bytes[0]);
178.                        Thread.sleep(100);
179.                        long start = System.currentTimeMillis();
180.                        call.callback();
181.                        System.out.println(call.getName() + " callback finish cost : "
182.                                           + (System.currentTimeMillis() - start));
183.                    }
184.                } catch (Exception e) {
185.                    e.printStackTrace();
186.                }
187.            }
188.
189.        };
190.    }
191.}
192.
193.interface TestCallBack {
194.
195.    public void callback() throws Exception;
196.
197.    public String getName();
198.}

最后

发觉文章越写越长,那就索性发到了论坛,大家一起讨论下.毕竟文章中描述的都是一些使用层面的东东,并没有从操作系统或者sun native实现上去介绍Thread的一些机制,熟悉这块的大牛门也可以出来发表下高见.

 

本文仅当抛砖引玉,欢迎发言!

时间: 2024-11-16 15:26:03

java线程阻塞中断和LockSupport的常见问题的相关文章

java线程阻塞中断与LockSupport使用介绍_java

上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现. 在介绍之前,先抛几个问题. Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常? Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING? 一般Thread编程需要关注

多线程之Java线程阻塞与唤醒

线程的阻塞和唤醒在多线程并发过程中是一个关键点,当线程数量达到很大的数量级时,并发可能带来很多隐蔽的问题.如何正确暂停一个线程,暂停后又如何在一个要求的时间点恢复,这些都需要仔细考虑的细节.在Java发展史上曾经使用suspend().resume()方法对于线程进行阻塞唤醒,但随之出现很多问题,比较典型的还是死锁问题.如下代码,主要的逻辑代码是主线程启动线程mt一段时间后尝试使用suspend()让线程挂起,最后使用resume()恢复线程.但现实并不如愿,执行到suspend()时将一直卡住

Java线程中断的本质深入理解

    Java的中断是一种协作机制.也就是说调用线程对象的interrupt方法并不一定就中断了正在运行的线程,它只是要求线程自己在合适的时机中断自己. 一.Java中断的现象 首先,看看Thread类里的几个方法:  public static boolean interrupted 测试当前线程是否已经中断.线程的中断状态 由该方法清除.换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况

Java线程中断的本质深入理解(转)

  一.Java中断的现象 首先,看看Thread类里的几个方法: public static boolean interrupted 测试当前线程是否已经中断.线程的中断状态 由该方法清除.换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况除外). public boolean isInterrupted() 测试线程是否已经中断.线程的中断状态 不受该方法的影响. public void 

深入Java线程中断的本质与编程原则的概述_java

在历史上,Java试图提供过抢占式限制中断,但问题多多,例如前文介绍的已被废弃的Thread.stop.Thread.suspend和 Thread.resume等.另一方面,出于Java应用代码的健壮性的考虑,降低了编程门槛,减少不清楚底层机制的程序员无意破坏系统的概率.     如今,Java的线程调度不提供抢占式中断,而采用协作式的中断.其实,协作式的中断,原理很简单,就是轮询某个表示中断的标记,我们在任何普通代码的中都可以实现. 例如下面的代码:    volatile bool isI

Java线程中断的本质深入理解_java

一.Java中断的现象 首先,看看Thread类里的几个方法: public static boolean interrupted 测试当前线程是否已经中断.线程的中断状态 由该方法清除.换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况除外). public boolean isInterrupted() 测试线程是否已经中断.线程的中断状态 不受该方法的影响. public void in

多线程暂停和唤醒-java线程暂停和唤醒问题

问题描述 java线程暂停和唤醒问题 java线程如何暂停,暂停的时间未知.之后又可以在程序中唤醒呢? 解决方案 在数据不可用的时候,当前线程里wait:在数据可用的时候,当前线程notify 举个例子,如果一个自定义队列,有生产者线程和消费者线程: 那么,如果消费者发现队列里拿不到数据了,队列空了,那么就wait: 生产者一直在生产,如果生产了一条数据,插入队列以后,马上notifyAll,自然就把消费者唤醒了. 请举一反三. 解决方案二: java线程的暂停与唤醒测试java 线程阻塞的问题

详解Java多线程编程中LockSupport类的线程阻塞用法_java

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语. LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到"Thread.suspend 和 Thread.resume所可能引发的死锁"问题. 因为park() 和 unpark()有许可的存在:调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性. 基本用法LockSupport 很类似于二元信号量

如何中断JAVA线程

如何中断JAVA线程 程序是很简易的.然而,在编程人员面前,多线程呈现出了一组新的难题,如果没有被恰当的解决,将导致意外的行为以及细微的.难以发现的错误.       在本篇文章中,我们针对这些难题之一:如何中断一个正在运行的线程.                                                                                      背景     中断(Interrupt)一个线程意味着在该线程完成任务之前停止其正在进行的一切,