天天看点

线程任务的取消

当外部代码能够在活动自然完成之前,把它的状态更改为完成状态,那么这个活动被称为可取消(cancellable)。取消任务是一个很常见的需求,无论是由于用户请求还是系统错误引起的服务关闭等等原因。最简单的任务取消策略就是在线程中维持一个bool变量,在run方法中判断此变量的bool值来决定是否取消任务。显然,这个bool变量需要声明为volatile,以保持多线程环境下可见性(所谓可见性,就是当一个线程修改共享对象的某个状态变量后,另一个线程可以马上看到修改结果)。下面是一个来自《java并发编程实践》的例子:

package net.rubyeye.concurrency.chapter7;

import java.math.biginteger;

import java.util.arraylist;

import java.util.list;

import java.util.concurrent.timeunit;

public class primegenerator implements runnable {

    private final list<biginteger> primes = new arraylist<biginteger>();

    private volatile boolean cancelled;

    public void run() {

        biginteger p = biginteger.one;

        while (!cancelled) {

            p = p.nextprobableprime();

            synchronized (this) {

                primes.add(p);

            }

        }

    }

    public void cancel() {

        cancelled = true;

    public synchronized list<biginteger> get() {

        return new arraylist<biginteger>(primes);

    public static void main(string args[]) throws interruptedexception {

        primegenerator generator = new primegenerator();

        new thread(generator).start();

        try {

            timeunit.seconds.sleep(1);

        } finally {

            generator.cancel();

}

    main中启动一个素数生成的任务,线程运行一秒就取消掉。通过线程中的cancelled变量来表征任务是否继续执行。既然是最简单的策略,那么什么是例外情况?显然,阻塞操作下(比如调用join,wait,sleep方法),这样的策略会出问题。任务因为调用这些阻塞方法而被阻塞,它将不会去检查volatile变量,导致取消操作失效。那么解决办法是什么?中断!考虑我们用blockingqueue去保存生成的素数,blockingqueue的put方法是阻塞的(当blockingqueue满的时候,put操作会阻塞直到有元素被take),让我们看看不采用中断,仍然采用简单策略会出现什么情况:

import java.util.concurrent.blockingqueue;

import java.util.concurrent.countdownlatch;

import java.util.concurrent.linkedblockingqueue;

public class brokenprimeproducer extends thread {

    static int i = 1000;

    private final blockingqueue<biginteger> queue;

    private volatile boolean cancelled = false;

    brokenprimeproducer(blockingqueue<biginteger> queue) {

        this.queue = queue;

    public void run() {

            while (!cancelled) {

                p = p.nextprobableprime();

                queue.put(p);

        } catch (interruptedexception cusumed) {

    public void cancel() {

        this.cancelled = false;

    public static void main(string args[]) throws interruptedexception {

        blockingqueue<biginteger> queue = new linkedblockingqueue<biginteger>(

                10);

        brokenprimeproducer producer = new brokenprimeproducer(queue);

        producer.start();

            while (needmoreprimes())

                queue.take();

            producer.cancel();

    public static boolean needmoreprimes() throws interruptedexception {

        boolean result = true;

        i--;

        if (i == 0)

            result = false;

        return result;

    我们在main中通过queue.take来消费产生的素数(虽然仅仅是取出扔掉),我们只消费了1000个素数,然后尝试取消产生素数的任务,很遗憾,取消不了,因为产生素数的线程产生素数的速度大于我们消费的速度,我们在消费1000后就停止消费了,那么任务将被queue的put方法阻塞,永远也不会去判断cancelled状态变量,任务取消不了。正确的做法应当是使用中断(interrupt):

public class primeproducer extends thread {

    primeproducer(blockingqueue<biginteger> queue) {

            while (!thread.currentthread().isinterrupted()) {

        interrupt();

        primeproducer producer = new primeproducer(queue);

   另外一个取消任务的方法就是采用future来管理任务,这是jdk5引入的,用于管理任务的生命周期,处理异常等。比如调用executorservice的sumit方法会返回一个future来描述任务,而future有一个cancel方法用于取消任务。

   那么,如果任务调用了不可中断的阻塞方法,比如socket的read、write方法,java.nio中的同步i/o,那么该怎么处理呢?简单地,关闭它们!参考下面的例子:

import java.io.ioexception;

import java.io.inputstream;

import java.net.socket;

/**

 * 展示对于不可中断阻塞的取消任务 通过关闭socket引发异常来中断

 * 

 * @author admin

 */

public abstract class readerthread extends thread {

    private final socket socket;

    private final inputstream in;

    public readerthread(socket socket) throws ioexception {

        this.socket = socket;

        this.in = socket.getinputstream();

    // 重写interrupt方法

    public void interrupt() {

            socket.close();

        } catch (ioexception e) {

            super.interrupt();

            byte[] buf = new byte[1024];

            while (true) {

                int count = in.read(buf);

                if (count < 0)

                    break;

                else if (count > 0)

                    processbuff(buf, count);

    public abstract void processbuff(byte[] buf, int count);

    reader线程重写了interrupt方法,其中调用了socket的close方法用于中断read方法,最后,又调用了super.interrupt(),防止当调用可中断的阻塞方法时不能正常中断。

文章转自庄周梦蝶  ,原文发布时间 2007-09-03