CompletableFuture 不能被中断

我之前写过一篇关于InterruptedException and interrupting threads的文章。总之,如果你调用Future.cancel(),那么Future不仅会终止正在等待的get(),还会试图去中断底层的线程。这是个很重要的特征,它能够使线程池变得更加利于使用。我在之前的文章中也说过,相对于标准的Future,尽量使用CompletableFuture。但事实证明,Future的更加强大的兄弟-CompletableFuture并不能优雅地处理cancel()。

请思考下面的任务代码,在接下来的测试中会用到:

class InterruptibleTask implements Runnable {

    private final CountDownLatch started = new CountDownLatch(1)
    private final CountDownLatch interrupted = new CountDownLatch(1)

    @Override
    void run() {
        started.countDown()
        try {
            Thread.sleep(10_000)
        } catch (InterruptedException ignored) {
            interrupted.countDown()
        }
    }

    void blockUntilStarted() {
        started.await()
    }

    void blockUntilInterrupted() {
        assert interrupted.await(1, TimeUnit.SECONDS)
    }

}

客户端线程可以检查InterruptibleTask是否已经开始运行或者是被中断了。首先,我们可以从外部查看InterruptibleTask到底会对cancel()作出怎么样的反应:

def "Future is cancelled without exception"() {
    given:
        def task = new InterruptibleTask()
        def future = myThreadPool.submit(task)
        task.blockUntilStarted()
    and:
        future.cancel(true)
    when:
        future.get()
    then:
        thrown(CancellationException)
}

def "CompletableFuture is cancelled via CancellationException"() {
    given:
        def task = new InterruptibleTask()
        def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
        task.blockUntilStarted()
    and:
        future.cancel(true)
    when:
        future.get()
    then:
        thrown(CancellationException)
}

到目前为止一切顺利,Future和CompletableFuture都以几乎相同的方式工作着-在cancel之后取回结果会抛出CancellationException(这里要解释一下,Future.cancel()是不会抛出异常的,而CompletableFuture.cancel()则会以抛出CancellationException强行结束,上面的代码作者都手动抛出了CancellationException)。但在myThreadPool中的线程会怎样呢?我猜会被中断然后被线程池重新回收,我大错特错!

def "should cancel Future"() {
    given:
        def task = new InterruptibleTask()
        def future = myThreadPool.submit(task)
        task.blockUntilStarted()
    when:
        future.cancel(true)
    then:
        task.blockUntilInterrupted()
}

@Ignore("Fails with CompletableFuture")
def "should cancel CompletableFuture"() {
    given:
        def task = new InterruptibleTask()
        def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
        task.blockUntilStarted()
    when:
        future.cancel(true)
    then:
        task.blockUntilInterrupted()
}

第一个测试提交普通的Runnable给ExecutorService然后等待直到它开始执行,接着我们取消Future等待直到抛出InterruptedException,当底层的线程被中断的时候blockUntilInterrupted()会返回。第二个测试失败了,CompletableFuture.cancel()不会中断线程,尽管Future看起来被取消了,但后台线程仍然在执行,sleep()不会抛出InterruptionException。这是一个bug还是这就是CompletableFuture的特点?你们可以查看此文档,不幸地是这就是它的特点:

Parameters: mayInterruptIfRunning – this value has no effect in this implementation because interrupts are not used to control processing.

RTFM(Read The Fucking Manual),但为什么CompletableFuture会以这样的方式工作?首先让我们检查一下“老的”Future的实现与CompletableFuture的有什么不同。FutureTask会在执行ExecutorService.submit()之后返回,而且它的cancel()有如下的实现(我移除了Unsafe以及相似的非线程安全的Java代码,所以仅仅把它当作伪代码看待):

public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
        return false;
    state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                state = INTERRUPTED;
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

FutureTask的state变量状态如下图:

万一执行cancel(),我们要么进入CANCELLED状态,要么通过INTERRUPTING进入INTERRUPTED。这里的核心部分是我们要获取runner线程(如果存在,例如如果task正在被执行)然后试着去中断它。这里要小心对于正在运行的线程的强制中断。最后在finishCompletion()中我们要通知所有阻塞在Future.get()的线程(这一步在这里无关痛痒可以忽略)。所以我们可以直观的看到老的Future是如何取消正在运行的tasks的。那CompletableFuture呢?它的cancel()伪代码如下:

public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = false;
    if (result == null) {
        result = new AltResult(new CancellationException());
        cancelled = true;
    }
    postComplete();
    return cancelled || isCancelled();
}

这相当令人失望,我们很少把result赋值为CancellationException而忽略mayInterruptIfRunning标志。postComplete()的作用和finishCompletion()的作用相似,通知所有注册在future下的正在等待的回调操作。这种实现相当让人不愉快(使用了非阻塞的Treiber stack),但它的确没有中断任何底层的线程。

Reasons and implications

CompletableFuture的这种cancel限制并不是bug,而是一种设计决定。CompletableFuture天生就没有和任何线程绑定在一起,但Future却几乎总是代表在后台运行的task。使用new关键字创造一个CompletableFuture(new CompletableFuture<>())就很好,这时没有任何底层的线程去取消。但是仍然有大部分的CompletableFuture和后台的task以及线程有联系,在这种情况下有问题的cancel()就是一个潜在的问题。我不建议盲目地用CompletableFuture替换Future,因为如果程序里面有cancel(),那么替换可能会改变程序的行为。这就意味着CompletableFuture有意地违背了里氏替换原则,我们要认真思考这样做的含义。

转载自 并发编程网 - ifeve.com

时间: 2024-09-06 02:17:01

CompletableFuture 不能被中断的相关文章

win7下载中断故障解决

  用电脑下载程序或者是软件.资料应该是咱们日常生活或者是工作中都经常用到的功能吧?但是大家有没有发现win7系统有这样的一个问题,若是下载的时间需要比较长的话,下载的任务便会出现自动停止或者是中断的情况,可能我们下载了很久之后才发现,任务已经不知道什么时候被中断了,那么这种问题如何解决呢? 其实经过win7之家的测试,发现这个问题原来是因为电源设置问题引起的,既然找到了病因,那么解决的方法也是呼之欲出了! 首先,我们打开win7系统的菜单,进入控制面板中,找到网络和共享中心点击进去. 接下来找

Bug:StampedLock的中断问题导致CPU爆满

StampedLock作为JAVA8中出现的新型锁,很可能在大多数场景都可以替代ReentrantReadWriteLock.它对于读/写都提供了四个接口(换成write为写锁): readLock() tryReadLock() tryReadLock(long time, TimeUnit unit) readLockInterruptibly() 这几个方法对应的语义为: 获取读锁(阻塞,不响应中断) 获取读锁(立即) 限时获取读锁(响应中断) 获取读锁(阻塞,响应中断) 然而在readL

中断中C函数调用C++

  之前,我们在单片机程序开发时都会面对中断函数.众所周知的,这个中断函数肯定是要用C函数来定义的.我在用C++进行程序开发的时候就发现了一个需要解决了问题:在断函数中怎么调用C++的成员函数?     我的中断函数定义在文件 IRQHander.c 文件中,我想在串口中断函数调用 gPrinter.Putchar(ch) 函数.用于向 gPrinter 发送字符打印消息.    尝试1:直接将 CDebug.h 文件包含进来. <strong>#include "CDebug.h&

java线程阻塞中断和LockSupport的常见问题

上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下Thread.interrupt和java 5之后的LockSupport的实现.   在介绍之前,先抛几个问题.   Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常? Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING? 一般Thread编程

中断子系统4_i8259a中断控制器

// 8259A 中的寄存器: // ICW: Initialization Command Word,初始化命令寄存器 // OCW: Operation Command Word,操作命令字,用于控制 8259A // IRR: Interrupt Request Register,中断请求寄存器,共 8bit,对应 IR0~IR7 八个中断管脚.当某个管脚的中断请求到来后, // 若该管脚没有被屏蔽,IRR 中对应的 bit 被置1.表示 PIC 已经收到设备的中断请求,但还未提交给 CP

Linux系统针对网卡中断的优化处理

中断: 当网卡接收到数据包后,会触发硬中断,通知CPU来收包.硬中断是一个CPU和网卡交互的过程.这其实会消耗CPU资源.特别是在使用速度极快的万兆网卡之后,大量的网络交互使得CPU很大一部分资源消耗在网卡中断处理上.此时,瓶颈并不在网卡,而是在CPU上.因此,现在的网卡都采用多队列的技术,用于充分利用多核心CPU. 中断的详细解释:<Linux的中断和异常扫盲笔记> SMP IRQ affinity 为了防止多个设置发送相同的中断, Linux设计了一套中断请求系统, 使得计算机系统中的每个

如何中断JAVA线程

如何中断JAVA线程 程序是很简易的.然而,在编程人员面前,多线程呈现出了一组新的难题,如果没有被恰当的解决,将导致意外的行为以及细微的.难以发现的错误.       在本篇文章中,我们针对这些难题之一:如何中断一个正在运行的线程.                                                                                      背景     中断(Interrupt)一个线程意味着在该线程完成任务之前停止其正在进行的一切,

中断处理程序不能使用printf的本质

vxworks 中断处理程序之所以不用printf,本质在于printf是将信息输出到标准输出设备(STDOUT)中, 整个标准输出设备是一个全局变量,由于有semTake操作,那么就会发生阻塞,vxworks属于硬实时操作系统,不能在规定的时间内完成操作即会死机或复位.所以vxworks不用printf的原因在于阻塞. 网上说printf 因为引用全局变量stdout,所以是不可重入的.这个稍微解释一下.如果用到了全局变量,但是用信号量保护,是线程安全的,但是不可重入的(会导致死锁,譬如一个任

中断子系统2_apic

// io apic.apic // 1.io apic, intel从pentiun III开始引入一种名为I/O高级可编程控制器(io apic)用于代替老式8259A可编程控制器. // 2.apic, cpu内部都包含一个本地apic,每个本地apic都有32位的寄存器,一个内部时钟,一个本地定时设备 // 及为本地apic中断保留的两条额外的IRQ线,LINT0和LINT1.所有本地apic都连接到一个外部 io apic.