天天看點

并發程式設計-FutureTask解析 | 京東物流技術團隊

1、FutureTask對象介紹

Future對象大家都不陌生,是JDK1.5提供的接口,是用來以阻塞的方式擷取線程異步執行完的結果。

在Java中想要通過線程執行一個任務,離不開Runnable與Callable這兩個接口。

Runnable與Callable的差別在于,Runnable接口隻有一個run方法,該方法用來執行邏輯,但是并沒有傳回值;而Callable的call方法,同樣用來執行業務邏輯,但是是有一個傳回值的。

Callable執行任務過程中可以通過FutureTask獲得任務的執行狀态,并且可以在執行完成後通過Future.get()方式擷取執行結果。

Future是一個接口,而FutureTask就是Future的實作類。并且FutureTask實作了 RunnableFuture(Runnable + Future),說明我們可以建立一個FutureTask并直接把它放到線程池執行,然後擷取FutureTask的執行結果。

2、FutureTask源碼解析

2.1 主要方法和屬性

那麼FutureTask是如何通過阻塞的方式來擷取到異步線程執行的結果的呢?我們看下FutureTask中的屬性。

java複制代碼// FutureTask的狀态及其常量
private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    // callable對象,執行完後置空
    private Callable<V> callable;
    // 要傳回的結果或要引發的異常來自 get() 方法
    private Object outcome; // non-volatile, protected by state reads/writes
    // 執行Callable的線程
    private volatile Thread runner;
    // 等待線程的一個連結清單結構
    private volatile WaitNode waiters;


           

FutureTask中幾個比較重要的方法。

java複制代碼// 取消任務的執行
boolean cancel(boolean mayInterruptIfRunning);
// 傳回任務是否已經被取消
boolean isCancelled();
// 傳回任務是否已經完成,任務狀态不為NEW即為完成
boolean isDone();
// 通過get方法擷取任務的執行結果
V get() throws InterruptedException, ExecutionException;
// 通過get方法擷取任務的執行結果,帶有逾時,如果超過給定時間則抛出異常
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;


           

2.2 FutureTask執行

當我們線上程池中執行一個Callable方法時,其實是将Callable任務封裝成一個RunnableFuture對象去執行,同時将這個RunnableFuture對象傳回,這樣我們就拿到了FutureTask的引用,可以随時擷取到任務執行的狀态,并且可以在任務執行完成後通過該對象擷取執行結果。

以下為ThreadPoolExecutor線程池送出一個callable方法的源碼。

scss複制代碼public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
	
	protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }


           

2.3 run方法介紹

RunnableFuture其實也是一個可以執行的runnable,我們看下他的run方法。其主要流程就是執行call方法,正常執行完畢後将result結果指派到outcome屬性上。

ini複制代碼public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            // 将callable指派到本地變量
            Callable<V> c = callable;
            // 判斷callable不為空并且FutureTask的狀态必須為新建立
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 執行call方法(使用者自己實作的call邏輯),并擷取到result結果
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 如果執行過程出現異常,則将異常對象指派到outcome上
                    setException(ex);
                }
                // 如果正常執行完畢,則将result指派到outcome屬性上
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }


           

以下邏輯為正常執行完成後指派的邏輯。

kotlin複制代碼// 如果任務沒有被取消,将future執行完的傳回值指派給result結果
// FutureTask任務的執行狀态是通過CAS的方式進行指派的,并且由此可知,COMPLETING其實是一個瞬時狀态
// 當将線程執行結果指派給outcome後,狀态會修改為對應的NORMAL,即正常結束
protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }


           

以下為執行異常時指派邏輯,直接将Throwable對象指派到outcome屬性上。

scss複制代碼protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }


           

無論是正常執行還是異常執行,最終都會調用一個finishCompletion方法,用來做工作的收尾工作。

2.4 get方法介紹

Future的get方法有兩個重載的方法,一個是get()擷取結果,一個是get(long, TimeUnit)帶有逾時時間的擷取結果,我們看下FutureTask中的這兩個方法是如何實作的。

java複制代碼// 不帶有逾時時間,一直阻塞直到擷取結果
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            // 等待結果完成,帶有逾時的get方法也是調用的awaitDone方法
            s = awaitDone(false, 0L);
        // 傳回結果
        return report(s);
    }

// 帶有逾時時間的擷取結果,如果超過時間還沒有擷取到結果則抛出異常
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        // 如果任務未中斷,調用awaitDone方法等待任務結果
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        // 傳回結果
        return report(s);
    }


           

我們主要看下awaitDone方法的執行邏輯。此方法會通過for循環的方式一直阻塞等待任務執行完成。如果帶有逾時時間,則超過截止時間後會直接傳回。

ini複制代碼// timed:是否需要逾時擷取
// nanos:逾時時間機關納秒
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        // 此方法會一直for循環判斷任務狀态是否已經完成,是Future.get阻塞的原因
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 任務狀态大于COMPLETING,則表明任務結束,直接傳回
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                // Thread.yield() 方法,使目前線程由執行狀态,變成為就緒狀态,讓出cpu時間,在下一個線程執行時候,此線程有可能被執行,也有可能沒有被執行。
                // COMPLETING狀态為瞬時狀态,任務執行完成,要麼是正常結束,要麼異常結束,後續會被置為NORMAL或者EXCEPTIONAL
                Thread.yield();
            else if (q == null)
                // 每調用一次get方法,都會建立一個WaitNode等待節點
                q = new WaitNode();
            else if (!queued)
                // 将該等待節點添加到連結清單結構waiters中,q.next = waiters 即在waiters的頭部插入
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果方法帶有逾時判斷,則判斷目前時間是否已經超過了截止時間,如果超過了及截止日期,則退出循環直接傳回目前狀态,此時任務狀态一定是NEW
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }


           

我們在看下report方法,在調用get方法時是如何傳回結果的。

這裡首先擷取outcome的值,并判斷任務是否已經執行完成,如果執行完成,則将outcome對象強轉成泛型指定的類型;如果任務被取消了,則抛出一個CancellationException異常;如果都不是,則說明任務在執行過程中發生了異常,此時任務狀态位EXCEPTIONAL,此時的outcome即為Throwable對象,是以将outcome強轉為Throwable并抛出異常。

由此可以知道,我們将一個FutureTask任務submit到線程池中執行的時候,如果發生了異常,是會在調用get方法的時候抛出的。

java複制代碼private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }


           

2.5 cancel方法介紹

cancel方法用于取消正在運作的任務,如果任務取消成功,則傳回TRUE,如果取消失敗則傳回FALSE。

java複制代碼// mayInterruptIfRunning:允許中斷正在運作的任務
public boolean cancel(boolean mayInterruptIfRunning) {
        // mayInterruptIfRunning如果為true則将狀态置為INTERRUPTING,如果未false則将狀态置為CANCELLED
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        // 如果狀态修改成功後,判斷是否允許中斷線程,如果允許,則調用Thread的interrupt方法中斷
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 取消後的收尾工作
            finishCompletion();
        }
        return true;
    }


           

2.6 isDone/isCancelled方法介紹

isDone方法用于判斷FutureTask是否已經完成;isCancelled方法用來判斷FutureTask是否已經取消,這兩個方法都是通過狀态位來判斷的。

typescript複制代碼public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }


           

2.7 finishCompletion方法介紹

我們看下finishCompletion方法都做了哪些工作。

ini複制代碼// 删除所有等待線程并發出信号,最後執行done方法
private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }


           

我們看到done方法是一個受保護的空方法,此處沒有任何邏輯,由其子類去根據自己的業務去實作相應的邏輯。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。

csharp複制代碼protected void done() { }

           

3、總結

通過源碼解讀可以了解到Future的原理:

第一步:主線程将任務封裝成一個Callable對象,通過submit方法送出到線程池去執行。

第二步:線程池執行任務的run方法,主線程則可以繼續執行其他邏輯。

第三步:線程池中方法執行完成後将結果指派到outcome屬性上,并修改任務狀态。

第四步:主線程在需要拿到異步任務結果的時候,主動調用fugure.get()方法來擷取結果。

第五步:如果異步線程在執行過程中發生異常,則會在調用future.get()方法的時候抛出來。

以上就是對于FutureTask的分析,我們可以了解FutureTask任務執行的方式以及Future.get已阻塞的方式擷取線程執行的結果原理,并且從代碼中可以了解FutureTask的任務執行狀态以及狀态的變化過程。

繼續閱讀