Jdk1.6 JUC源碼解析(16)-FutureTask
作者:大飛
功能簡介:
- FutureTask是一種異步任務(或異步計算),舉個栗子,主線程的邏輯中需要使用某個值,但這個值需要複雜的運算得來,那麼主線程可以提前建立一個異步任務來計算這個值(在其他的線程中計算),然後去做其他事情,當需要這個值的時候再通過剛才建立的異步任務來擷取這個值,有點并行的意思,這樣可以縮短整個主線程邏輯的執行時間。
- FutureTask也是基于AQS來建構的,使用共享模式,使用AQS的狀态來表示異步任務的運作狀态。
源碼分析:
- 先來看下FutureTask都實作了哪些接口。首先實作了RunnableFuture接口,先看下這個接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture又擴充了Runnable和Future:
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
public interface Future<V> {
/**
* 嘗試取消任務的執行,如果任務已經完成或者已經被取消或者由于某種原因
* 無法取消,方法傳回false。如果任務取消成功,或者任務開始執行之前調用
* 了取消方法,那麼任務就永遠不會執行了。mayInterruptIfRunning參數決定
* 了是否要中斷執行任務的線程。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判斷任務是否在完成之前被取消。
*/
boolean isCancelled();
/**
* 判斷任務是否完成。
*/
boolean isDone();
/**
* 等待,直到擷取任務的執行結果。如果任務還沒執行完,這個方法會阻塞。
*/
V get() throws InterruptedException, ExecutionException;
/**
* 等待,在給定的時間内擷取任務的執行結果。
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Runnable接口經常寫多線程程式的話一定非常熟悉了,這裡不說了。看下Future接口,它提供了取消任務接口,并提供了檢視任務狀态的接口,最重要的是提供了有阻塞行為的擷取任務執行結果的接口。
- 接下來看下FutureTask的實作,由于其基于AQS實作,那先看一下内部的同步機制:
private final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7828117401763700385L;
/** 表示任務正在執行 */
private static final int RUNNING = 1;
/** 表示任務已經運作完畢 */
private static final int RAN = 2;
/** 表示任務被取消 */
private static final int CANCELLED = 4;
/** 内部的callable */
private final Callable<V> callable;
/** 執行結果 */
private V result;
/** 執行過程中發生的異常 */
private Throwable exception;
/**
* 執行目前任務的線程。在set/cancel之後置空,說明可以
* 了。必須使用volatile來修飾,以確定任務完成後的可見性。
*/
private volatile Thread runner;
Sync(Callable<V> callable) {
this.callable = callable;
}
内部同步器接中使用了一個callable來儲存要執行的任務,看下這個接口:
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
這個接口的行為和Runnable類似,不同的是有傳回值,且能抛出異常,算是對Runnable的補充。
繼續看下同步器的innerRun方法,這個方法用于支援FutureTask的run方法:
void innerRun() {
//嘗試設定任務運作狀态為正在執行。
if (!compareAndSetState(0, RUNNING))
return; //如果設定失敗,直接傳回。
try {
runner = Thread.currentThread(); //設定執行線程。
if (getState() == RUNNING) //再次檢測任務狀态
innerSet(callable.call()); //執行任務,然後設定執行結果。
else
releaseShared(0); //說明任務已取消。
} catch (Throwable ex) {
innerSetException(ex); //如果執行任務過程中發生異常,設定異常。
}
}
看下設定執行結果的innerSet方法:
void innerSet(V v) {
for (;;) {
int s = getState(); //擷取任務執行狀态。
if (s == RAN)
return; //如果任務已經執行完畢,退出。
if (s == CANCELLED) {
//這裡釋放AQS控制權并設定runner為null,
//為了避免正在和一個試圖中斷線程的取消請求競
releaseShared(0);
return;
}
//嘗試将任務狀态設定為執行完成。
if (compareAndSetState(s, RAN)) {
result = v; //設定執行結果。
releaseShared(0); //釋放AQS控制權。
done(); //這裡調用一下done方法,子類可覆寫這個方法,做一些定制處理。
return;
}
}
}
AQS分析過,releaseShared方法中會調用tryReleaseShared方法,看一下目前同步器中這個方法的實作:
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}
innerRun方法中在執行抛異常後會調用innerSetException:
void innerSetException(Throwable t) {
for (;;) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
exception = t;
result = null;
releaseShared(0);
done();
return;
}
}
}
過程和innerSet類似,隻不過最後要設定異常,清空result。
和innerRun類似還有innerRunAndReset方法,看下實作:
boolean innerRunAndReset() {
if (!compareAndSetState(0, RUNNING))
return false;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
callable.call(); // don't set result
runner = null;
return compareAndSetState(RUNNING, 0);
} catch (Throwable ex) {
innerSetException(ex);
return false;
}
}
和innerRun的差別是不設定執行結果,最後執行完畢後重置異步任務狀态為0。
再看下同步器的innerGet方法,這個方法用于支援FutureTask的get方法:
V innerGet() throws InterruptedException, ExecutionException {
//擷取共享鎖,無法擷取時阻塞等待。
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException(); //如果任務狀态為取消,那麼抛出CancellationException
if (exception != null)
throw new ExecutionException(exception);//如果任務執行異常,抛出ExecutionException,并傳遞異常。
return result; //成功執行完成,傳回執行結果。
}
類似的有帶逾時的innerGet:
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
AQS分析過,acquireSharedInterruptibly和tryAcquireSharedNanos方法中會調用tryAcquireShared方法,看一下目前同步器中這個方法的實作:
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}
private boolean ranOrCancelled(int state) {
return (state & (RAN | CANCELLED)) != 0;
}
可見innerGet中會首先判斷任務是否完成,要依據任務(完成或取消)狀态來判斷。
最後看下同步器的innerCancel方法,這個方法用于支援FutureTask的cancel方法:
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false; //如果任務已經執行完畢或者取消。
if (compareAndSetState(s, CANCELLED))//否則嘗試設定任務狀态為取消。
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt(); //如果設定了mayInterruptIfRunning為true,需要中斷線程,
}
releaseShared(0); //釋放AQS的控制權。
done(); //這裡也會調用done,定制子類時需要注意下。
return true;
}
- 有了内部同步機制,FutureTask的實作起來就很容易了,看下代碼:
public class FutureTask<V> implements RunnableFuture<V> {
/** 内部同步器 */
private final Sync sync;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
public boolean isCancelled() {
return sync.innerIsCancelled();
}
public boolean isDone() {
return sync.innerIsDone();
}
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}
/**
* 本類中是一個空實作,子類可以覆寫這個方法,做回調或一些記錄工作。
* 可以來實作裡面通過任務狀态來判斷任務是否被取消。
*/
protected void done() { }
protected void set(V v) {
sync.innerSet(v);
}
protected void setException(Throwable t) {
sync.innerSetException(t);
}
public void run() {
sync.innerRun();
}
protected boolean runAndReset() {
return sync.innerRunAndReset();
}
...
實作都是基于上面分析過的方法,這裡不啰嗦了。注意構造方法中有一個Runnable到callable的轉換,使用了Executors中的方法,這個類後續會分析,這裡簡單看一下:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
就是簡單的通過一個适配類在适配Runnable和Callable。 小總結一下: 1.目前線程建立異步任務後,異步任務處于初始狀态(内部有一個數值表示狀态,初始為0),一般交由其他線程執行任務(比如送出給線程池處理)。目前線程通過異步任務的get方法來擷取執行結果時,如果異步任務此時還沒執行完畢(内部狀态既不是完成,也不是取消),那麼目前線程會在get方法處阻塞。 2.當其他線程(比如線程池中的工作線程)執行了異步任務,那麼會将異步任務的狀态改成"完成"(根據情況也可能是取消),同時将在get除等待的線程喚醒。
FutureTask的代碼解析完畢!
參見:Jdk1.6 JUC源碼解析(6)-locks-AbstractQueuedSynchronizer