1、FutureTask對象介紹
Future對象大家都不陌生,是JDK1.5提供的接口,是用來以阻塞的方式擷取線程異步執行完的結果。
在Java中想要通過線程執行一個任務,離不開Runnable與Callable這兩個接口。
Runnable與Callable的差別在于,Runnable接口隻有一個run方法,該方法用來執行邏輯,但是并沒有傳回值;而Callable的call方法,同樣用來執行業務邏輯,但是是有一個傳回值的。
Callable執行任務過程中可以通過FutureTask獲得任務的執行狀态,并且可以在執行完成後通過Future.get()方式擷取執行結果。
Future是一個接口,而FutureTask就是Future的實作類。并且FutureTask實作了 RunnableFuture(Runnable + Future),說明我們可以建立一個FutureTask并直接把它放到線程池執行,然後擷取FutureTask的執行結果。
2、FutureTask源碼解析
2.1 主要方法和屬性
那麼FutureTask是如何通過阻塞的方式來擷取到異步線程執行的結果的呢?我們看下FutureTask中的屬性。
java複制代碼// FutureTask的狀态及其常量
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// callable對象,執行完後置空
private Callable<V> callable;
// 要傳回的結果或要引發的異常來自 get() 方法
private Object outcome; // non-volatile, protected by state reads/writes
// 執行Callable的線程
private volatile Thread runner;
// 等待線程的一個連結清單結構
private volatile WaitNode waiters;
FutureTask中幾個比較重要的方法。
java複制代碼// 取消任務的執行
boolean cancel(boolean mayInterruptIfRunning);
// 傳回任務是否已經被取消
boolean isCancelled();
// 傳回任務是否已經完成,任務狀态不為NEW即為完成
boolean isDone();
// 通過get方法擷取任務的執行結果
V get() throws InterruptedException, ExecutionException;
// 通過get方法擷取任務的執行結果,帶有逾時,如果超過給定時間則抛出異常
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
2.2 FutureTask執行
當我們線上程池中執行一個Callable方法時,其實是将Callable任務封裝成一個RunnableFuture對象去執行,同時将這個RunnableFuture對象傳回,這樣我們就拿到了FutureTask的引用,可以随時擷取到任務執行的狀态,并且可以在任務執行完成後通過該對象擷取執行結果。
以下為ThreadPoolExecutor線程池送出一個callable方法的源碼。
scss複制代碼public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
2.3 run方法介紹
RunnableFuture其實也是一個可以執行的runnable,我們看下他的run方法。其主要流程就是執行call方法,正常執行完畢後将result結果指派到outcome屬性上。
ini複制代碼public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 将callable指派到本地變量
Callable<V> c = callable;
// 判斷callable不為空并且FutureTask的狀态必須為新建立
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 執行call方法(使用者自己實作的call邏輯),并擷取到result結果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 如果執行過程出現異常,則将異常對象指派到outcome上
setException(ex);
}
// 如果正常執行完畢,則将result指派到outcome屬性上
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
以下邏輯為正常執行完成後指派的邏輯。
kotlin複制代碼// 如果任務沒有被取消,将future執行完的傳回值指派給result結果
// FutureTask任務的執行狀态是通過CAS的方式進行指派的,并且由此可知,COMPLETING其實是一個瞬時狀态
// 當将線程執行結果指派給outcome後,狀态會修改為對應的NORMAL,即正常結束
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
以下為執行異常時指派邏輯,直接将Throwable對象指派到outcome屬性上。
scss複制代碼protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
無論是正常執行還是異常執行,最終都會調用一個finishCompletion方法,用來做工作的收尾工作。
2.4 get方法介紹
Future的get方法有兩個重載的方法,一個是get()擷取結果,一個是get(long, TimeUnit)帶有逾時時間的擷取結果,我們看下FutureTask中的這兩個方法是如何實作的。
java複制代碼// 不帶有逾時時間,一直阻塞直到擷取結果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 等待結果完成,帶有逾時的get方法也是調用的awaitDone方法
s = awaitDone(false, 0L);
// 傳回結果
return report(s);
}
// 帶有逾時時間的擷取結果,如果超過時間還沒有擷取到結果則抛出異常
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 如果任務未中斷,調用awaitDone方法等待任務結果
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
// 傳回結果
return report(s);
}
我們主要看下awaitDone方法的執行邏輯。此方法會通過for循環的方式一直阻塞等待任務執行完成。如果帶有逾時時間,則超過截止時間後會直接傳回。
ini複制代碼// timed:是否需要逾時擷取
// nanos:逾時時間機關納秒
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 此方法會一直for循環判斷任務狀态是否已經完成,是Future.get阻塞的原因
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 任務狀态大于COMPLETING,則表明任務結束,直接傳回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
// Thread.yield() 方法,使目前線程由執行狀态,變成為就緒狀态,讓出cpu時間,在下一個線程執行時候,此線程有可能被執行,也有可能沒有被執行。
// COMPLETING狀态為瞬時狀态,任務執行完成,要麼是正常結束,要麼異常結束,後續會被置為NORMAL或者EXCEPTIONAL
Thread.yield();
else if (q == null)
// 每調用一次get方法,都會建立一個WaitNode等待節點
q = new WaitNode();
else if (!queued)
// 将該等待節點添加到連結清單結構waiters中,q.next = waiters 即在waiters的頭部插入
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果方法帶有逾時判斷,則判斷目前時間是否已經超過了截止時間,如果超過了及截止日期,則退出循環直接傳回目前狀态,此時任務狀态一定是NEW
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
我們在看下report方法,在調用get方法時是如何傳回結果的。
這裡首先擷取outcome的值,并判斷任務是否已經執行完成,如果執行完成,則将outcome對象強轉成泛型指定的類型;如果任務被取消了,則抛出一個CancellationException異常;如果都不是,則說明任務在執行過程中發生了異常,此時任務狀态位EXCEPTIONAL,此時的outcome即為Throwable對象,是以将outcome強轉為Throwable并抛出異常。
由此可以知道,我們将一個FutureTask任務submit到線程池中執行的時候,如果發生了異常,是會在調用get方法的時候抛出的。
java複制代碼private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
2.5 cancel方法介紹
cancel方法用于取消正在運作的任務,如果任務取消成功,則傳回TRUE,如果取消失敗則傳回FALSE。
java複制代碼// mayInterruptIfRunning:允許中斷正在運作的任務
public boolean cancel(boolean mayInterruptIfRunning) {
// mayInterruptIfRunning如果為true則将狀态置為INTERRUPTING,如果未false則将狀态置為CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
// 如果狀态修改成功後,判斷是否允許中斷線程,如果允許,則調用Thread的interrupt方法中斷
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 取消後的收尾工作
finishCompletion();
}
return true;
}
2.6 isDone/isCancelled方法介紹
isDone方法用于判斷FutureTask是否已經完成;isCancelled方法用來判斷FutureTask是否已經取消,這兩個方法都是通過狀态位來判斷的。
typescript複制代碼public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
2.7 finishCompletion方法介紹
我們看下finishCompletion方法都做了哪些工作。
ini複制代碼// 删除所有等待線程并發出信号,最後執行done方法
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
我們看到done方法是一個受保護的空方法,此處沒有任何邏輯,由其子類去根據自己的業務去實作相應的邏輯。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
csharp複制代碼protected void done() { }
3、總結
通過源碼解讀可以了解到Future的原理:
第一步:主線程将任務封裝成一個Callable對象,通過submit方法送出到線程池去執行。
第二步:線程池執行任務的run方法,主線程則可以繼續執行其他邏輯。
第三步:線程池中方法執行完成後将結果指派到outcome屬性上,并修改任務狀态。
第四步:主線程在需要拿到異步任務結果的時候,主動調用fugure.get()方法來擷取結果。
第五步:如果異步線程在執行過程中發生異常,則會在調用future.get()方法的時候抛出來。
以上就是對于FutureTask的分析,我們可以了解FutureTask任務執行的方式以及Future.get已阻塞的方式擷取線程執行的結果原理,并且從代碼中可以了解FutureTask的任務執行狀态以及狀态的變化過程。