线程任务的取消

当外部代码能够在活动自然完成之前,把它的状态更改为完成状态,那么这个活动被称为可取消(cancellable)。取消任务是一个很常见的需求,无论是由于用户请求还是系统错误引起的服务关闭等等原因。最简单的任务取消策略就是在线程中维持一个bool变量,在run方法中判断此变量的bool值来决定是否取消任务。显然,这个bool变量需要声明为volatile,以保持多线程环境下可见性(所谓可见性,就是当一个线程修改共享对象的某个状态变量后,另一个线程可以马上看到修改结果)。下面是一个来自《java并发编程实践》的例子:

package net.rubyeye.concurrency.chapter7;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PrimeGenerator implements Runnable {
    private final List<BigInteger> primes = new ArrayList<BigInteger>();

    private volatile boolean cancelled;
    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }
    public void cancel() {
        cancelled = true;
    }
    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }

    public static void main(String args[]) throws InterruptedException {
        PrimeGenerator generator = new PrimeGenerator();
        new Thread(generator).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } finally {
            generator.cancel();
        }
    }
}

    main中启动一个素数生成的任务,线程运行一秒就取消掉。通过线程中的cancelled变量来表征任务是否继续执行。既然是最简单的策略,那么什么是例外情况?显然,阻塞操作下(比如调用join,wait,sleep方法),这样的策略会出问题。任务因为调用这些阻塞方法而被阻塞,它将不会去检查volatile变量,导致取消操作失效。那么解决办法是什么?中断!考虑我们用BlockingQueue去保存生成的素数,BlockingQueue的put方法是阻塞的(当BlockingQueue满的时候,put操作会阻塞直到有元素被take),让我们看看不采用中断,仍然采用简单策略会出现什么情况:

package net.rubyeye.concurrency.chapter7;

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BrokenPrimeProducer extends Thread {
    static int i = 1000;

    private final BlockingQueue<BigInteger> queue;

    private volatile boolean cancelled = false;

    BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        BigInteger p = BigInteger.ONE;
        try {
            while (!cancelled) {
                p = p.nextProbablePrime();
                queue.put(p);
            }
        } catch (InterruptedException cusumed) {
        }
    }

    public void cancel() {
        this.cancelled = false;
    }

    public static void main(String args[]) throws InterruptedException {
        BlockingQueue<BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
                10);
        BrokenPrimeProducer producer = new BrokenPrimeProducer(queue);
        producer.start();
        try {
            while (needMorePrimes())
                queue.take();
        } finally {
            producer.cancel();
        }
    }

    public static boolean needMorePrimes() throws InterruptedException {
        boolean result = true;
        i--;
        if (i == 0)
            result = false;
        return result;
    }
}

    我们在main中通过queue.take来消费产生的素数(虽然仅仅是取出扔掉),我们只消费了1000个素数,然后尝试取消产生素数的任务,很遗憾,取消不了,因为产生素数的线程产生素数的速度大于我们消费的速度,我们在消费1000后就停止消费了,那么任务将被queue的put方法阻塞,永远也不会去判断cancelled状态变量,任务取消不了。正确的做法应当是使用中断(interrupt):

package net.rubyeye.concurrency.chapter7;

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PrimeProducer extends Thread {
    static int i = 1000;

    private final BlockingQueue<BigInteger> queue;

    private volatile boolean cancelled = false;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        BigInteger p = BigInteger.ONE;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                p = p.nextProbablePrime();
                queue.put(p);
            }
        } catch (InterruptedException cusumed) {
        }
    }

    public void cancel() {
        interrupt();
    }

    public static void main(String args[]) throws InterruptedException {
        BlockingQueue<BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
                10);
        PrimeProducer producer = new PrimeProducer(queue);
        producer.start();
        try {
            while (needMorePrimes())
                queue.take();
        } finally {
            producer.cancel();
        }
    }

    public static boolean needMorePrimes() throws InterruptedException {
        boolean result = true;
        i--;
        if (i == 0)
            result = false;
        return result;
    }
}

   在run方法中,通过Thread的isInterrupted来判断interrupt status是否已经被修改,从而正确实现了任务的取消。关于interrupt,有一点需要特别说明,调用interrupt并不意味着必然停止目标线程的正在进行的工作,它仅仅是传递一个请求中断的信号给目标线程,目标线程会在下一个方便的时刻中断。而对于阻塞方法产生的InterruptedException的处理,两种选择:要么重新抛出让上层代码来处理,要么在catch块中调用Thread的interrupt来保存中断状态。除非你确定要让工作线程终止(如上所示代码),否则不要仅仅是catch而不做任务处理工作(生吞了InterruptedException),更详细可以参考这里。如果不清楚外部线程的中断策略,生搬硬套地调用interrupt可能产生不可预料的后果,可参见书中7.1.4例子。

   另外一个取消任务的方法就是采用Future来管理任务,这是JDK5引入的,用于管理任务的生命周期,处理异常等。比如调用ExecutorService的sumit方法会返回一个Future来描述任务,而Future有一个cancel方法用于取消任务。
   那么,如果任务调用了不可中断的阻塞方法,比如Socket的read、write方法,java.nio中的同步I/O,那么该怎么处理呢?简单地,关闭它们!参考下面的例子:

package net.rubyeye.concurrency.chapter7;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;

/**
 * 展示对于不可中断阻塞的取消任务 通过关闭socket引发异常来中断
 * 
 * @author Admin
 * 
 */
public abstract class ReaderThread extends Thread {
    private final Socket socket;

    private final InputStream in;

    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.in = socket.getInputStream();
    }

    // 重写interrupt方法
    public void interrupt() {
        try {
            socket.close();
        } catch (IOException e) {
        } finally {
            super.interrupt();
        }
    }

    public void run() {
        try {
            byte[] buf = new byte[1024];
            while (true) {
                int count = in.read(buf);
                if (count < 0)
                    break;
                else if (count > 0)
                    processBuff(buf, count);
            }
        } catch (IOException e) {
        }
    }

    public abstract void processBuff(byte[] buf, int count);
}

    Reader线程重写了interrupt方法,其中调用了socket的close方法用于中断read方法,最后,又调用了super.interrupt(),防止当调用可中断的阻塞方法时不能正常中断。

文章转自庄周梦蝶  ,原文发布时间 2007-09-03

时间: 2024-12-30 12:59:42

线程任务的取消的相关文章

linux下pthread_cancel无法取消线程的原因【转】

转自:http://blog.csdn.net/huangshanchun/article/details/47420961 版权声明:欢迎转载,如有不足之处,恳请斧正. 一个线程可以调用pthread_cancel终止同一进程中的另一个线程,但是值得强调的是:同一进程的线程间,pthread_cancel向另一线程发终止信号.系统并不会马上关闭被取消线程,只有在被取消线程下次系统调用时,才会真正结束线程.或调用pthread_testcancel,让内核去检测是否需要取消当前线程.被取消的线程

关于线程中断的总结

在Core Java中有这样一句话:"没有任何语言方面的需求要求一个被中断的程序应该终止.中断一个线程只是为了引起该线程的注意,被中断线程可以决定如何应对中断 " 中断是一种协作机制.当一个线程中断另一个线程时,被中断的线程不一定要立即停止正在做的事情.相反,中断是礼貌地请求另一个线程在它愿意并且方便的时候停止它正在做的事情.有些方法,例如 Thread.sleep(),很认真地对待这样的请求,但每个方法不是一定要对中断作出响应.对于中断请求,不阻塞但是仍然要花较长时间执行的方法可以轮

《C#多线程编程实战(原书第2版)》——3.5 实现一个取消选项

3.5 实现一个取消选项 本节将通过一个示例来展示如何在线程池中取消异步操作. 3.5.1 准备工作 为了学习本节,你需要安装Visual Studio 2015.除此之外无需其他准备.本节的源代码放置在BookSamples\Chapter3\Recipe4目录中. 3.5.2 实现方式 请执行以下步骤来了解如何在线程中实现一个取消选项: 1.启动Visual Studio 2015.新建一个C#控制台应用程序项目. 2.在Program.cs文件中加入以下using指令: 3.在Main方法

基于线程池的匹配文件数量计算

除了常规的计算匹配文件数量外,这个程序打印出执行过程中池中的最大线程数量.但从ExecutorService接口不能得到这个信息.因此,我们必须将pool对象转型成一个ThreadPoolExecutor类对象. import java.io.*; import java.util.*; import java.util.concurrent.*; public class ThreadPoolTest {    public static void main(String[] args) th

详解Java编程中对线程的中断处理_java

1. 引言 当我们点击某个杀毒软件的取消按钮来停止查杀病毒时,当我们在控制台敲入quit命令以结束某个后台服务时--都需要通过一个线程去取消另一个线程正在执行的任务.Java没有提供一种安全直接的方法来停止某个线程,但是Java提供了中断机制. 如果对Java中断没有一个全面的了解,可能会误以为被中断的线程将立马退出运行,但事实并非如此.中断机制是如何工作的?捕获或检测到中断后,是抛出InterruptedException还是重设中断状态以及在方法中吞掉中断状态会有什么后果?Thread.st

iOS多线程编程之一——NSThread线程管理

iOS多线程编程之一--NSThread线程管理 NSTread是iOS中进行多线程开发的一个类,其结构逻辑清晰,使用十分方便,但其封装度和性能不高,线程周期,加锁等需要手动处理. 一.NSThread类方法总结 获取当前线程 ? 1 + (NSThread *)currentThread; 这个方法通过开启一个新的线程执行选择器方法 ? 1 + (void)detachNewThreadSelector:(SEL)selector toTarget:(id)target withObject:

Java多线程--让主线程等待所有子线程执行完毕在执行_java

朋友让我帮忙写个程序从文本文档中导入数据到oracle数据库中,技术上没有什么难度,文档的格式都是固定的只要对应数据库中的字段解析就行了,关键在于性能. 数据量很大百万条记录,因此考虑到要用多线程并发执行,在写的过程中又遇到问题,我想统计所有子进程执行完毕总共的耗时,在第一个子进程创建前记录当前时间用System.currentTimeMillis()在最后一个子进程结束后记录当前时间,两次一减得到的时间差即为总共的用时,代码如下  long tStart = System.currentTim

Linux中线程和进程的区别

Linux中线程和进程的区别 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,它是系统进行资源分配和调度的一个独立单位.例如,用户运行自己的程序,系统就创建一个进程,并为它分配资源,包括各种表格.内存空间.磁盘空间.I/O设备等,然后该进程被放入到进程的就绪队列,进程调度程序选中它,为它分配CPU及其他相关资源,该进程就被运行起来. 线程是进程的一个实体,是CPU调度和分配的基本单位,线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器.一组寄存器和栈),但

Java并发基础实践:退出任务II

在本系列的上一篇中所述的退出并发任务的方式都是基于JDK 5之前的API,本文将介绍使用由JDK 5引入的并发工具包中的API来退出任务.(2013.10.08最后更新) 在本系列的前一篇中讲述了三种退出并发任务的方式--停止线程:可取消的任务:中断,但都是基于JDK 5之前的API.本篇将介绍由JDK 5引入的java.concurrent包中的Future来取消任务的执行. 1. Future模式 Future是并发编程中的一种常见设计模式,它相当于是Proxy模式与Thread-Per-M