當外部代碼能夠在活動自然完成之前,把它的狀态更改為完成狀态,那麼這個活動被稱為可取消(cancellable)。取消任務是一個很常見的需求,無論是由于使用者請求還是系統錯誤引起的服務關閉等等原因。最簡單的任務取消政策就是線上程中維持一個bool變量,在run方法中判斷此變量的bool值來決定是否取消任務。顯然,這個bool變量需要聲明為volatile,以保持多線程環境下可見性(所謂可見性,就是當一個線程修改共享對象的某個狀态變量後,另一個線程可以馬上看到修改結果)。下面是一個來自《java并發程式設計實踐》的例子:
package net.rubyeye.concurrency.chapter7;
import java.math.biginteger;
import java.util.arraylist;
import java.util.list;
import java.util.concurrent.timeunit;
public class primegenerator implements runnable {
private final list<biginteger> primes = new arraylist<biginteger>();
private volatile boolean cancelled;
public void run() {
biginteger p = biginteger.one;
while (!cancelled) {
p = p.nextprobableprime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
public synchronized list<biginteger> get() {
return new arraylist<biginteger>(primes);
public static void main(string args[]) throws interruptedexception {
primegenerator generator = new primegenerator();
new thread(generator).start();
try {
timeunit.seconds.sleep(1);
} finally {
generator.cancel();
}
main中啟動一個素數生成的任務,線程運作一秒就取消掉。通過線程中的cancelled變量來表征任務是否繼續執行。既然是最簡單的政策,那麼什麼是例外情況?顯然,阻塞操作下(比如調用join,wait,sleep方法),這樣的政策會出問題。任務因為調用這些阻塞方法而被阻塞,它将不會去檢查volatile變量,導緻取消操作失效。那麼解決辦法是什麼?中斷!考慮我們用blockingqueue去儲存生成的素數,blockingqueue的put方法是阻塞的(當blockingqueue滿的時候,put操作會阻塞直到有元素被take),讓我們看看不采用中斷,仍然采用簡單政策會出現什麼情況:
import java.util.concurrent.blockingqueue;
import java.util.concurrent.countdownlatch;
import java.util.concurrent.linkedblockingqueue;
public class brokenprimeproducer extends thread {
static int i = 1000;
private final blockingqueue<biginteger> queue;
private volatile boolean cancelled = false;
brokenprimeproducer(blockingqueue<biginteger> queue) {
this.queue = queue;
public void run() {
while (!cancelled) {
p = p.nextprobableprime();
queue.put(p);
} catch (interruptedexception cusumed) {
public void cancel() {
this.cancelled = false;
public static void main(string args[]) throws interruptedexception {
blockingqueue<biginteger> queue = new linkedblockingqueue<biginteger>(
10);
brokenprimeproducer producer = new brokenprimeproducer(queue);
producer.start();
while (needmoreprimes())
queue.take();
producer.cancel();
public static boolean needmoreprimes() throws interruptedexception {
boolean result = true;
i--;
if (i == 0)
result = false;
return result;
我們在main中通過queue.take來消費産生的素數(雖然僅僅是取出扔掉),我們隻消費了1000個素數,然後嘗試取消産生素數的任務,很遺憾,取消不了,因為産生素數的線程産生素數的速度大于我們消費的速度,我們在消費1000後就停止消費了,那麼任務将被queue的put方法阻塞,永遠也不會去判斷cancelled狀态變量,任務取消不了。正确的做法應當是使用中斷(interrupt):
public class primeproducer extends thread {
primeproducer(blockingqueue<biginteger> queue) {
while (!thread.currentthread().isinterrupted()) {
interrupt();
primeproducer producer = new primeproducer(queue);
另外一個取消任務的方法就是采用future來管理任務,這是jdk5引入的,用于管理任務的生命周期,處理異常等。比如調用executorservice的sumit方法會傳回一個future來描述任務,而future有一個cancel方法用于取消任務。
那麼,如果任務調用了不可中斷的阻塞方法,比如socket的read、write方法,java.nio中的同步i/o,那麼該怎麼處理呢?簡單地,關閉它們!參考下面的例子:
import java.io.ioexception;
import java.io.inputstream;
import java.net.socket;
/**
* 展示對于不可中斷阻塞的取消任務 通過關閉socket引發異常來中斷
*
* @author admin
*/
public abstract class readerthread extends thread {
private final socket socket;
private final inputstream in;
public readerthread(socket socket) throws ioexception {
this.socket = socket;
this.in = socket.getinputstream();
// 重寫interrupt方法
public void interrupt() {
socket.close();
} catch (ioexception e) {
super.interrupt();
byte[] buf = new byte[1024];
while (true) {
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processbuff(buf, count);
public abstract void processbuff(byte[] buf, int count);
reader線程重寫了interrupt方法,其中調用了socket的close方法用于中斷read方法,最後,又調用了super.interrupt(),防止當調用可中斷的阻塞方法時不能正常中斷。
文章轉自莊周夢蝶 ,原文釋出時間 2007-09-03