天天看點

線程任務的取消

  當外部代碼能夠在活動自然完成之前,把它的狀态更改為完成狀态,那麼這個活動被稱為可取消(cancellable)。取消任務是一個很常見的需求,無論是由于使用者請求還是系統錯誤引起的服務關閉等等原因。最簡單的任務取消政策就是線上程中維持一個bool變量,在run方法中判斷此變量的bool值來決定是否取消任務。顯然,這個bool變量需要聲明為volatile,以保持多線程環境下可見性(所謂可見性,就是當一個線程修改共享對象的某個狀态變量後,另一個線程可以馬上看到修改結果)。下面是一個來自《java并發程式設計實踐》的例子:

package net.rubyeye.concurrency.chapter7;

import java.math.biginteger;

import java.util.arraylist;

import java.util.list;

import java.util.concurrent.timeunit;

public class primegenerator implements runnable {

    private final list<biginteger> primes = new arraylist<biginteger>();

    private volatile boolean cancelled;

    public void run() {

        biginteger p = biginteger.one;

        while (!cancelled) {

            p = p.nextprobableprime();

            synchronized (this) {

                primes.add(p);

            }

        }

    }

    public void cancel() {

        cancelled = true;

    public synchronized list<biginteger> get() {

        return new arraylist<biginteger>(primes);

    public static void main(string args[]) throws interruptedexception {

        primegenerator generator = new primegenerator();

        new thread(generator).start();

        try {

            timeunit.seconds.sleep(1);

        } finally {

            generator.cancel();

}

    main中啟動一個素數生成的任務,線程運作一秒就取消掉。通過線程中的cancelled變量來表征任務是否繼續執行。既然是最簡單的政策,那麼什麼是例外情況?顯然,阻塞操作下(比如調用join,wait,sleep方法),這樣的政策會出問題。任務因為調用這些阻塞方法而被阻塞,它将不會去檢查volatile變量,導緻取消操作失效。那麼解決辦法是什麼?中斷!考慮我們用blockingqueue去儲存生成的素數,blockingqueue的put方法是阻塞的(當blockingqueue滿的時候,put操作會阻塞直到有元素被take),讓我們看看不采用中斷,仍然采用簡單政策會出現什麼情況:

import java.util.concurrent.blockingqueue;

import java.util.concurrent.countdownlatch;

import java.util.concurrent.linkedblockingqueue;

public class brokenprimeproducer extends thread {

    static int i = 1000;

    private final blockingqueue<biginteger> queue;

    private volatile boolean cancelled = false;

    brokenprimeproducer(blockingqueue<biginteger> queue) {

        this.queue = queue;

    public void run() {

            while (!cancelled) {

                p = p.nextprobableprime();

                queue.put(p);

        } catch (interruptedexception cusumed) {

    public void cancel() {

        this.cancelled = false;

    public static void main(string args[]) throws interruptedexception {

        blockingqueue<biginteger> queue = new linkedblockingqueue<biginteger>(

                10);

        brokenprimeproducer producer = new brokenprimeproducer(queue);

        producer.start();

            while (needmoreprimes())

                queue.take();

            producer.cancel();

    public static boolean needmoreprimes() throws interruptedexception {

        boolean result = true;

        i--;

        if (i == 0)

            result = false;

        return result;

    我們在main中通過queue.take來消費産生的素數(雖然僅僅是取出扔掉),我們隻消費了1000個素數,然後嘗試取消産生素數的任務,很遺憾,取消不了,因為産生素數的線程産生素數的速度大于我們消費的速度,我們在消費1000後就停止消費了,那麼任務将被queue的put方法阻塞,永遠也不會去判斷cancelled狀态變量,任務取消不了。正确的做法應當是使用中斷(interrupt):

public class primeproducer extends thread {

    primeproducer(blockingqueue<biginteger> queue) {

            while (!thread.currentthread().isinterrupted()) {

        interrupt();

        primeproducer producer = new primeproducer(queue);

   另外一個取消任務的方法就是采用future來管理任務,這是jdk5引入的,用于管理任務的生命周期,處理異常等。比如調用executorservice的sumit方法會傳回一個future來描述任務,而future有一個cancel方法用于取消任務。

   那麼,如果任務調用了不可中斷的阻塞方法,比如socket的read、write方法,java.nio中的同步i/o,那麼該怎麼處理呢?簡單地,關閉它們!參考下面的例子:

import java.io.ioexception;

import java.io.inputstream;

import java.net.socket;

/**

 * 展示對于不可中斷阻塞的取消任務 通過關閉socket引發異常來中斷

 * 

 * @author admin

 */

public abstract class readerthread extends thread {

    private final socket socket;

    private final inputstream in;

    public readerthread(socket socket) throws ioexception {

        this.socket = socket;

        this.in = socket.getinputstream();

    // 重寫interrupt方法

    public void interrupt() {

            socket.close();

        } catch (ioexception e) {

            super.interrupt();

            byte[] buf = new byte[1024];

            while (true) {

                int count = in.read(buf);

                if (count < 0)

                    break;

                else if (count > 0)

                    processbuff(buf, count);

    public abstract void processbuff(byte[] buf, int count);

    reader線程重寫了interrupt方法,其中調用了socket的close方法用于中斷read方法,最後,又調用了super.interrupt(),防止當調用可中斷的阻塞方法時不能正常中斷。文章轉自莊周夢蝶  ,原文釋出時間 2007-09-03

文章轉自莊周夢蝶  ,原文釋出時間