并發程式設計Callable使用及源碼分析
一.帶傳回值的任務Callable介紹
在日常開發中,為了提高應用的處理能力我們經常會使用到線程Executors處理任務,在使用線程池時避免不了需要調用線程池的execute(Runnable)或submit(Callable);
對于大多數開發人員來說Runable接口并不陌生,實作起來也非常簡單,隻要實作Runnable接口重寫run()方法即可。在日常的開發中使用最多的也是這種方式。但是如果我們想擷取一個任務執行後的傳回結果那麼Runnable接口可能就無法滿足我們的需求了,這時就需要使用Callable接口重寫call()方法;
二.Callable接口的基本使用和原理分析
1.demo示範
在分析Callable接口在并發環境下的執行流程之前我們先以一個簡單的案例來熟悉一下它的使用:
ExecutorService es = Executors.newSingleThreadExecutor();//建立線程池
Future<?> task = es.submit(new MyThread());//這裡的MyThread就是一個簡單的實作了Callable接口的實作類
try {
//指定擷取任務接口的時間,也可以不指定時間後面會對這個這個進行進行分析
task.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
//逾時觸發線程中止
System.out.println("thread over time");
} catch (ExecutionException e) {
//抛出執行異常
throw e;
} finally {
//如果任務還在運作,執行中斷
boolean mayInterruptIfRunning = true;
task.cancel(mayInterruptIfRunning);
}
上面代碼是Future的一個簡單例子:MyThread實作Callable接口,執行時要求在限定時間内擷取結果,逾時執行會抛TimeoutException,執行異常會抛出ExecutionException。最後在finally裡,如果任務還在執行,就進行取消;如果任務已經執行完,取消操作也沒有影響。
2.Future接口分析
從上面的簡單案例中我們不難看出,當使用Executors對Callable實作類對象進行送出時需要将Callable對象封裝成一個 RunnableFuture task,并最終将task任務進行傳回(具體源碼後面會進行分析),我們可以從傳回的task對象中擷取call()方法的執行結果,繼承關系關系如下圖所示:
Future接口代表一個異步任務的結果,提供了相應方法判斷任務是否完成或者取消。從上圖可知,RunnableFuture同時繼承了Future和Runnable,是一個可運作、可知結果的任務,FutureTask是具體的實作類。
FutureTask的狀态:
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
FutureTask有7種狀态,初始狀态從NEW開始,7種狀态的轉換過程如下圖所示:
FutureTask的狀态轉換取決于run和cancel的先後調用順序,具體狀态流轉過程如下所示:
NEW -> COMPLETING -> NORMAL 正常的流程
NEW -> COMPLETING -> EXCEPTIONAL 異常的流程
NEW -> CANCELLED 被取消流程
NEW -> INTERRUPTING -> INTERRUPTED 被中斷流程
FutureTask的變量
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
callable是要執行的任務,runner是執行任務的線程,outcome是傳回的結果(正常結果或Exception結果),waiters的資料結構是WaitNode,儲存了Thread和下個WaitNode的引用。waiters儲存了等待結果的線程,每次操作隻會增減頭,是以是一個棧結構,如下所示:
注:waitNode作用,如果一個線程調用FutureTask中的get方法,這個調用FutureTask.get()的線程可能處于阻塞狀态。而waitNode節點就是用來儲存目前的阻塞線程;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
三.執行過程及源碼分析
1.建立線程池送出Callable任務
2. submit(Callable task)方法分析
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
注:從submit()方法的源碼我們可以看出,當送出一個Callable類型的任務後,首先會将Callable對象封裝成一個FutureTask類型的對象(上文已經介紹過類FutureTask相關屬性),從源碼不難看出初始化FutureTask對象的主要工作是完成callable和state屬性的初始化;當然submit方法也支援Runnable對象,隻不過需要先将Runnable對象轉換成Callable對象;
3.封裝futureTask對象執行線程池的execute(ftask)方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
注:execute()方法實際上才是線程池執行任務的入口方法,在我們使用Runnable類型的任務時會執行進入這個方法,execute()方法是主要任務時根據目前線程池的狀态建立核心線程或将任務加入阻塞隊列或建立非核心線程或執行拒絕政策,這裡不做過多介紹,我們主要關注addWorker()這個方法即可,這個方法是線程吃的核心方法;
4.addWorker()方法追蹤
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
注:addWorker()方法首先對task任務和線程池的運次狀态進行校驗,這個方法的核心代碼為 w = new Worker(firstTask),建立了一個Worker對象:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Worker對象本身就是一個Runnable的實作類,同時将自己指派給了目前線程 this.thread = getThreadFactory().newThread(this),這一點十分重要;
5.線程池執行具體task
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
注:到這裡已經非常清晰了,在runWorker(Worker w)方法中調用了 task.run()方法執行具體的任務。這裡需要大家注意:
如果task屬于Runnable則直接調用重寫的run()方法即可。
如果task屬于Callable則會調用FutureTask中的run()方法,如下所示:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
在FutureTask類的run()方法中調用了result = c.call(),這樣就調用我們最開始送出的Callable對象的call()方法,并得到了傳回結果result ,并使用 UNSAFE魔術類的方式在保證原子性的情況下設定目前任務的傳回值 outcome = v,代碼如下所示:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
6.FutureTask擷取結果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
調用FutureTask的get方法擷取任務的執行結果,可以阻塞直到擷取結果,也可以限制範圍時間内擷取結果,否則抛出TimeoutException。
get的核心實作調用了awaitDone,入參為是否開啟時間限制和最大的等待時間。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { //1
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet //2
Thread.yield();
else if (q == null) //3
q = new WaitNode();
else if (!queued) //4
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { //5
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else //6
LockSupport.park(this);
}
}
awaitDone主要邏輯是一個無限循環,首先判斷線程是否被中斷,是的話移除waiter并抛出中斷異常。接下來是一串if-else,一共六種情況。
判斷任務狀态是否已經完成,是就直接傳回;
任務狀态是COMPLETING,代表在set結果時被阻塞了,這裡先讓出資源;
如果WaitNode為空,就為目前線程初始化一個WaitNode;
如果目前的WaitNode還沒有加入waiters,就加入;
如果是限定時間執行,判斷有無逾時,逾時就将waiter移出,并傳回結果,否則阻塞一定時間;
如果沒有限定時間,就一直阻塞到下次被喚醒。
LockSupport是用來建立鎖和其他同步類的基本線程阻塞原語。park和unpark的作用分别是阻塞線程和解除阻塞線程。
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
四.擴充
FutureTask中還封裝了關于取消,中斷,喚醒等方法,大家可以自己嘗試跟蹤一下源碼,看看具體的實作方法。