天天看點

java callable 線程池_[懷舊并發05]分析Java線程池Callable任務執行原理

Java并發程式設計源碼分析系列:

上一篇分析了線程池的執行原理,主要關于線程池的生命周期和任務如何在池裡建立、運作和終止。不過上次研究的是execute方法,執行的是Runnable任務,它不傳回任何值。如果希望任務完成後傳回結果,那麼需要使用Callable接口,這也是本文要研究的主題。

ExecutorService es = Executors.newSingleThreadExecutor();

Future> task = es.submit(new MyThread());

try {

//限定時間擷取結果

task.get(5, TimeUnit.SECONDS);

} catch (TimeoutException e) {

//逾時觸發線程中止

System.out.println("thread over time");

} catch (ExecutionException e) {

//抛出執行異常

throw e;

} finally {

//如果任務還在運作,執行中斷

boolean mayInterruptIfRunning = true;

task.cancel(mayInterruptIfRunning);

}

上面代碼是Future的一個簡單例子:MyThread實作Callable接口,執行時要求在限定時間内擷取結果,逾時執行會抛出TimeoutException,執行異常會抛出ExecutionException。最後在finally裡,如果任務還在執行,就進行取消;如果任務已經執行完,取消操作也沒有影響。

java callable 線程池_[懷舊并發05]分析Java線程池Callable任務執行原理

圖1 FutureTask

Future接口代表一個異步任務的結果,提供了相應方法判斷任務是否完成或者取消。從圖1可知,RunnableFuture同時繼承了Future和Runnable,是一個可運作、可知結果的任務,FutureTask是具體的實作類。

FutureTask的狀态

private volatile int state;

private static final int NEW = 0;

private static final int COMPLETING = 1;

private static final int NORMAL = 2;

private static final int EXCEPTIONAL = 3;

private static final int CANCELLED = 4;

private static final int INTERRUPTING = 5;

private static final int INTERRUPTED = 6;

FutureTask有7種狀态,初始狀态從NEW開始,狀态轉換路徑可以歸納為圖2所示。在後文的代碼,會使用int的大小比較判斷狀态處于哪個範圍,需要留意上面狀态的排列順序。

java callable 線程池_[懷舊并發05]分析Java線程池Callable任務執行原理

圖2 FutureTask狀态路徑

FutureTask的狀态路徑,取決于run和cancel的調用順序,在後文分析時,對号入座這幾條路徑。

NEW -> COMPLETING -> NORMAL 正常的流程

NEW -> COMPLETING -> EXCEPTIONAL 異常的流程

NEW -> CANCELLED 被取消流程

NEW -> INTERRUPTING -> INTERRUPTED 被中斷流程

FutureTask的變量

int state

Thread runner

WaitNode waiters

Callable callable

Object outcome

state、runner、waiters三個變量沒有使用原子類,而是使用Unsafe對象進行原子操作。代碼中會見到很多形如compareAndSwap的方法,入門原理可以看我以前寫的認識非阻塞的同步機制CAS。

callable是要執行的任務,runner是執行任務的線程,outcome是傳回的結果(正常結果或Exception結果)

static final class WaitNode {

volatile Thread thread;

volatile WaitNode next;

WaitNode() { thread = Thread.currentThread(); }

}

waiters的資料結構是WaitNode,儲存了Thread和下個WaitNode的引用。waiters儲存了等待結果的線程,每次操作隻會增減頭,是以是一個棧結構,詳細見後文對get方法的分析。

FutureTask的建立

public FutureTask(Callable callable) {

if (callable == null)

throw new NullPointerException();

this.callable = callable;

this.state = NEW; // ensure visibility of callable

}

public FutureTask(Runnable runnable, V result) {

this.callable = Executors.callable(runnable, result);

this.state = NEW; // ensure visibility of callable

}

FutureTask可以接受Callable或者Runnable,state從NEW開始。如果是Runnable,需要調用Executors.callable轉成Callable,傳回的結果是預先傳入的result。轉換過程使用一個實作了Callable的RunnableAdapter包裝Runnable和result,代碼比較簡單。

static final class RunnableAdapter implements Callable {

final Runnable task;

final T result;

RunnableAdapter(Runnable task, T result) {

this.task = task;

this.result = result;

}

public T call() {

task.run();

return result;

}

}

送出FutureTask到線程池的submit定義在AbstractExecutorService,根據入參的不同,有三個submit方法。下面以送出Callable為例:

public Future submit(Callable task) {

if (task == null) throw new NullPointerException();

RunnableFuture ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

protected RunnableFuture newTaskFor(Callable callable) {

return new FutureTask(callable);

}

FutureTask在newTaskFor建立,然後調用線程池的execute執行,最後傳回Future。擷取Future後,就可以調用get擷取結果,或者調用cancel取消任務。

FutureTask的運作

FutureTask實作了Runnable,線上程池裡執行時調用的方法是run。

public void run() {

//1

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))

return;

//2

try {

Callable c = callable;

if (c != null && state == NEW) {

V result;

boolean ran;

try {

result = c.call();

ran = true;

} catch (Throwable ex) {

result = null;

ran = false;

setException(ex);

}

if (ran)

set(result);

}

} finally {

//3

runner = null;

int s = state;

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

}

标記1處檢查FutureTask的狀态,如果不是處于NEW,說明狀态已經進入四條路徑之一,也就沒有必要繼續了。如果狀态是NEW,則将執行任務的線程交給runner。

标記2處開始正式執行任務,調用call方法擷取結果,沒有異常就算成功,最後執行set方法;出現異常就調用setException方法。

标記3處,無論任務執行是否成功,都需要将runner重新置為空。

protected void set(V v) {

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = v;

UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

finishCompletion();

}

}

protected void setException(Throwable t) {

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = t;

UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

finishCompletion();

}

}

任務執行成功與失敗,分别對應NEW -> COMPLETING -> NORMAL和NEW -> COMPLETING -> EXCEPTIONAL兩條路徑。這裡先将狀态修改為中間狀态,再對結果指派,最後再修改為最終狀态。

private void finishCompletion() {

// assert state > COMPLETING;

for (WaitNode q; (q = waiters) != null;) {

if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

for (;;) {

Thread t = q.thread;

if (t != null) {

q.thread = null;

LockSupport.unpark(t);

}

WaitNode next = q.next;

if (next == null)

break;

q.next = null; // unlink to help gc

q = next;

}

break;

}

}

done();

callable = null; // to reduce footprint

}

最後調用finishCompletion執行任務完成,喚醒并删除所有在waiters中等待的線程。done方法是空的,供子類實作,最後callable也設定為空。

FutureTask還有個runAndReset,邏輯和run類似,但沒有調用set方法來設定結果,執行完成後将任務重新初始化。

protected boolean runAndReset() {

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return false;

boolean ran = false;

int s = state;

try {

Callable c = callable;

if (c != null && s == NEW) {

try {

c.call(); // don't set result

ran = true;

} catch (Throwable ex) {

setException(ex);

}

}

} finally {

// runner must be non-null until state is settled to

// prevent concurrent calls to run()

runner = null;

// state must be re-read after nulling runner to prevent

// leaked interrupts

s = state;

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

return ran && s == NEW;

}

FutureTask的取消

對于已經送出執行的任務,可以調用cancel執行取消。

public boolean cancel(boolean mayInterruptIfRunning) {

//1

if (!(state == NEW &&

UNSAFE.compareAndSwapInt(this, stateOffset, NEW,

mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

return false;

try { // in case call to interrupt throws exception

//2

if (mayInterruptIfRunning) {

try {

Thread t = runner;

if (t != null)

t.interrupt();

} finally { // final state

UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);

}

}

} finally {

finishCompletion();

}

return true;

}

标記1處判斷任務狀态,為NEW才能被取消。如果mayInterruptIfRunning是true,代表任務需要被中斷,走NEW -> INTERRUPTING -> INTERRUPTED流程。否則代表任務被取消,走NEW -> CANCELLED流程。

标記2處理任務被中斷的情況,這裡僅僅是對線程發出中斷請求,不確定任務能檢測并進行中斷,詳細原理去看Java的中斷機制。

最後調用finishCompletion完成收尾工作。

public boolean isCancelled() {

return state >= CANCELLED;

}

判斷任務是否被取消,具體邏輯是判斷state >= CANCELLED,包括了被中斷一共兩條路徑的結果。

FutureTask擷取結果

調用FutureTask的get方法擷取任務的執行結果,可以阻塞直到擷取結果,也可以限制範圍時間内擷取結果,否則抛出TimeoutException。

public V get() throws InterruptedException, ExecutionException {

int s = state;

if (s <= COMPLETING)

s = awaitDone(false, 0L);

return report(s);

}

public V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

if (unit == null)

throw new NullPointerException();

int s = state;

if (s <= COMPLETING &&

(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

throw new TimeoutException();

return report(s);

}

get的核心實作調用了awaitDone,入參為是否開啟時間限制和最大的等待時間。

private int awaitDone(boolean timed, long nanos)

throws InterruptedException {

final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;

boolean queued = false;

for (;;) {

if (Thread.interrupted()) {

removeWaiter(q);

throw new InterruptedException();

}

int s = state;

if (s > COMPLETING) { //1

if (q != null)

q.thread = null;

return s;

}

else if (s == COMPLETING) // cannot time out yet //2

Thread.yield();

else if (q == null) //3

q = new WaitNode();

else if (!queued) //4

queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

q.next = waiters, q);

else if (timed) { //5

nanos = deadline - System.nanoTime();

if (nanos <= 0L) {

removeWaiter(q);

return state;

}

LockSupport.parkNanos(this, nanos);

}

else //6

LockSupport.park(this);

}

}

awaitDone主要邏輯是一個無限循環,首先判斷線程是否被中斷,是的話移除waiter并抛出中斷異常。接下來是一串if-else,一共六種情況。

判斷任務狀态是否已經完成,是就直接傳回;

任務狀态是COMPLETING,代表在set結果時被阻塞了,這裡先讓出資源;

如果WaitNode為空,就為目前線程初始化一個WaitNode;

如果目前的WaitNode還沒有加入waiters,就加入;

如果是限定時間執行,判斷有無逾時,逾時就将waiter移出,并傳回結果,否則阻塞一定時間;

如果沒有限定時間,就一直阻塞到下次被喚醒。

LockSupport是用來建立鎖和其他同步類的基本線程阻塞原語。park和unpark的作用分别是阻塞線程和解除阻塞線程。

private V report(int s) throws ExecutionException {

Object x = outcome;

if (s == NORMAL)

return (V)x;

if (s >= CANCELLED)

throw new CancellationException();

throw new ExecutionException((Throwable)x);

}

最後get調用report,使用outcome傳回結果。

java callable 線程池_[懷舊并發05]分析Java線程池Callable任務執行原理

圖3

看圖3,如果多個線程向同一個FutureTask執行個體get結果,但FutureTask又沒有執行完畢,線程将會阻塞并儲存在waiters中。待FutureTask擷取結果後,喚醒waiters等待的線程,并傳回同一個結果。

總結

java callable 線程池_[懷舊并發05]分析Java線程池Callable任務執行原理

圖4

圖4歸納了FutureTask的作用,任務的調用線程Caller和線程池的工作線程通過FutureTask互動。對比線程池的執行原理,FutureTask是比較簡單的。