概述
FutureTask 是一個可取消的、異步執行任務的類,它的繼承結構如下:

它實作了 RunnableFuture 接口,而該接口又繼承了 Runnable 接口和 Future 接口,是以 FutureTask 也具有這兩個接口所定義的特征。FutureTask 的主要功能:
1. 異步執行任務,并且任務隻執行一次;
2. 監控任務是否完成、取消任務;
3. 擷取任務執行結果。
下面分析其代碼實作。
代碼分析
分析 FutureTask 的代碼之前,先看下它實作的接口。RunnableFuture 接口定義如下:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture 接口繼承了 Runnable 接口和 Future 接口,而 Runnable 接口隻有一個 run 方法,這裡不再贅述。下面分析 Future 接口。
Future 接口
Future 接口方法定義如下:
主要方法分析:
/*
* 嘗試取消執行任務。若任務已完成、已取消,或者由于其他某些原因無法取消,則嘗試失敗。
* 若成功,且調用該方法時任務未啟動,則此任務不會再運作;
* 若任務已啟動,則根據參數 mayInterruptIfRunning 決定是否中斷該任務。
*/
boolean cancel(boolean mayInterruptIfRunning);
// 若該任務正常結束之前被取消,則傳回 true
boolean isCancelled();
/*
* 若該任務已完成,則傳回 true
* 這裡的“完成”,可能是由于正常終止、異常,或者取消,這些情況都傳回 true
*/
boolean isDone();
// 等待計算完成(如果需要),然後擷取結果
V get() throws InterruptedException, ExecutionException;
// 如果需要,最多等待計算完成的給定時間,然後檢索其結果(如果可用)
// PS: 該方法與前者的差別在于加了逾時等待
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
FutureTask 代碼分析
任務的狀态變量:
// 任務的狀态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
其中 state 表示任務的狀态,總共有 7 種,它們之間的狀态轉換可能有以下 4 種情況:
1. 任務執行正常:NEW -> COMPLETING -> NORMAL
2. 任務執行異常:NEW -> COMPLETING -> EXCEPTIONAL
3. 任務取消:NEW -> CANCELLED
4. 任務中斷:NEW -> INTERRUPTING -> INTERRUPTED
示意圖:
在分析其他成員變量之前,先看一個内部嵌套類 WaitNode:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
代碼比較簡單,就是對 Thread 的封裝,可以了解為單連結清單的節點。
其他成員變量:
/** The underlying callable; nulled out after running */
// 送出的任務
private Callable<V> callable;
/** The result to return or exception to throw from get() */
// get() 方法傳回的結果(或者異常)
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
// 執行任務的線程
private volatile Thread runner;
/** Treiber stack of waiting threads */
// 等待線程的 Treiber 棧
private volatile WaitNode waiters;
其中 waiters 是一個 Treiber 棧,簡單來說,就是由單連結清單組成的線程安全的棧,如圖所示:
構造器
// 建立一個 FutureTask 對象,在運作時将執行給定的 Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// 建立一個 FutureTask,在運作時執行給定的 Runnable,
// 并安排 get 将在成功完成時傳回給定的結果
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
這兩個構造器分别傳入 Callable 對象和 Runnable 對象(适配為 Callable 對象),然後将其狀态初始化為 NEW。
run: 執行任務
public void run() {
// 使用 CAS 進行并發控制,防止任務被執行多次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 調用 Callable 的 call 方法執行任務
result = c.call();
ran = true;
} catch (Throwable ex) {
// 異常處理
result = null;
ran = false;
setException(ex);
}
// 正常處理
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 線程被中斷
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
set & setException: 更新狀态值,喚醒棧中等待的線程
protected void set(V v) {
// CAS 将 state 修改為 COMPLETING,該狀态是一個中間狀态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 輸出結果指派
// 将 state 更新為 NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
// CAS 将 state 修改為 COMPLETING,該狀态是一個中間狀态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t; // 輸出結果指派
// 将 state 更新為 EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
這兩個方法的操作類似,都是更新 state 的值并給傳回結果 outcome 指派,然後執行結束操作 finishCompletion 方法:
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 将 waiters 置空
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 喚醒 WaitNode 封裝的線程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
finishCompletion 方法的作用就是喚醒棧中所有等待的線程,并清空棧。其中的 done 方法實作為空:
protected void done() { }
子類可以重寫該方法實作回調功能。
get: 擷取執行結果
// 擷取執行結果(阻塞式)
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 若任務未執行完,則等待它執行完成
if (s <= COMPLETING)
// 任務未完成
s = awaitDone(false, 0L);
// 封裝傳回結果
return report(s);
}
// 擷取執行結果(有逾時等待)
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
這兩個方法都是擷取任務執行的結果,原理也基本一樣,差別在于後者有逾時等待(逾時會抛出 TimeoutException 異常)。
awaitDone: 等待任務執行完成
// Awaits completion or aborts on interrupt or timeout.
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 響應線程中斷
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// s > COMPLETING 表示任務已執行完成(包括正常執行、異常等狀态)
// 則傳回對應的狀态值
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// s == COMPLETING 是一個中間狀态,表示任務尚未完成
// 這裡讓出 CPU 時間片
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 執行到這裡,表示 s == NEW,将目前線程封裝為一個 WaitNode 節點
else if (q == null)
q = new WaitNode();
// 這裡表示 q 并未入棧,CAS 方式将當 WaitNode 入棧
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 有逾時的情況
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
// 将目前線程挂起
else
LockSupport.park(this);
}
}
該方法的主要判斷步驟如下:
1. 若線程被中斷,則響應中斷;
2. 若任務已完成,則傳回狀态值;
3. 若任務正在執行,則讓出 CPU 時間片;
4. 若任務未執行,則将目前線程封裝為 WaitNode 節點;
5. 若 WaitNode 未入棧,則執行入棧;
6. 若已入棧,則将線程挂起。
以上步驟是循環執行的,其實該方法的主要作用就是:當任務執行完成時,傳回狀态值;否則将目前線程挂起。
removeWaiter: 移除棧中的節點
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;
}
break;
}
}
}
report 方法:封裝傳回結果
private V report(int s) throws ExecutionException {
Object x = outcome; // 輸出結果指派
// 正常結束
if (s == NORMAL)
return (V)x;
// 取消
if (s >= CANCELLED)
throw new CancellationException();
// 執行異常
throw new ExecutionException((Throwable)x);
}
該方法就是對傳回結果的包裝,無論是正常結束或是抛出異常。
cancel: 取消任務
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
// 若允許中斷,則嘗試中斷線程
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
場景舉例
FutureTask 适合多線程執行一些耗時的操作,然後擷取執行結果。下面結合線程池簡單分析其用法,示例代碼如下(僅供參考):
public class FutureTaskTest {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<FutureTask<Integer>> taskList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int finalI = i;
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
// 模拟耗時任務
TimeUnit.SECONDS.sleep(finalI * 2);
System.out.println(Thread.currentThread().getName() + " 計算中……");
return finalI * finalI;
});
taskList.add(futureTask);
executorService.submit(futureTask); // 送出到線程池
}
System.out.println("任務全部送出,主線程做其他操作");
// 擷取執行結果
for (FutureTask<Integer> futureTask : taskList) {
Integer result = futureTask.get();
System.out.println("result-->" + result);
}
// 關閉線程池
executorService.shutdown();
}
}
小結
FutureTask 是一個封裝任務(Runnable 或 Callable)的類,可以異步執行任務,并擷取執行結果,适用于耗時操作場景。
參考連結:
http://www.hchstudio.cn/article/2017/2b8f/
https://segmentfault.com/a/1190000016572591
https://www.jianshu.com/p/43dab9b7c25b