天天看點

JDK源碼分析-FutureTask

概述

FutureTask 是一個可取消的、異步執行任務的類,它的繼承結構如下:

JDK源碼分析-FutureTask

它實作了 RunnableFuture 接口,而該接口又繼承了 Runnable 接口和 Future 接口,是以 FutureTask 也具有這兩個接口所定義的特征。FutureTask 的主要功能:

1. 異步執行任務,并且任務隻執行一次;

2. 監控任務是否完成、取消任務;

3. 擷取任務執行結果。

下面分析其代碼實作。

代碼分析

分析 FutureTask 的代碼之前,先看下它實作的接口。RunnableFuture 接口定義如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
           

RunnableFuture 接口繼承了 Runnable 接口和 Future 接口,而 Runnable 接口隻有一個 run 方法,這裡不再贅述。下面分析 Future 接口。

Future 接口

Future 接口方法定義如下:

JDK源碼分析-FutureTask

主要方法分析:

/*
 * 嘗試取消執行任務。若任務已完成、已取消,或者由于其他某些原因無法取消,則嘗試失敗。
 * 若成功,且調用該方法時任務未啟動,則此任務不會再運作;
 * 若任務已啟動,則根據參數 mayInterruptIfRunning 決定是否中斷該任務。
 */
boolean cancel(boolean mayInterruptIfRunning);


// 若該任務正常結束之前被取消,則傳回 true
boolean isCancelled();


/*
 * 若該任務已完成,則傳回 true
 * 這裡的“完成”,可能是由于正常終止、異常,或者取消,這些情況都傳回 true
 */
boolean isDone();


// 等待計算完成(如果需要),然後擷取結果
V get() throws InterruptedException, ExecutionException;


// 如果需要,最多等待計算完成的給定時間,然後檢索其結果(如果可用)
// PS: 該方法與前者的差別在于加了逾時等待
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
           

FutureTask 代碼分析

任務的狀态變量:

// 任務的狀态
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
           

其中 state 表示任務的狀态,總共有 7 種,它們之間的狀态轉換可能有以下 4 種情況:

1. 任務執行正常:NEW -> COMPLETING -> NORMAL

2. 任務執行異常:NEW -> COMPLETING -> EXCEPTIONAL

3. 任務取消:NEW -> CANCELLED

4. 任務中斷:NEW -> INTERRUPTING -> INTERRUPTED

示意圖:

JDK源碼分析-FutureTask

在分析其他成員變量之前,先看一個内部嵌套類 WaitNode:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
           

代碼比較簡單,就是對 Thread 的封裝,可以了解為單連結清單的節點。

其他成員變量:

/** The underlying callable; nulled out after running */
// 送出的任務
private Callable<V> callable;


/** The result to return or exception to throw from get() */
// get() 方法傳回的結果(或者異常)
private Object outcome; // non-volatile, protected by state reads/writes


/** The thread running the callable; CASed during run() */
// 執行任務的線程
private volatile Thread runner;


/** Treiber stack of waiting threads */
// 等待線程的 Treiber 棧
private volatile WaitNode waiters;
           

其中 waiters 是一個 Treiber 棧,簡單來說,就是由單連結清單組成的線程安全的棧,如圖所示:

JDK源碼分析-FutureTask

構造器

// 建立一個 FutureTask 對象,在運作時将執行給定的 Callable
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}


// 建立一個 FutureTask,在運作時執行給定的 Runnable,
// 并安排 get 将在成功完成時傳回給定的結果
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
           

這兩個構造器分别傳入 Callable 對象和 Runnable 對象(适配為 Callable 對象),然後将其狀态初始化為 NEW。

run: 執行任務

public void run() {
    // 使用 CAS 進行并發控制,防止任務被執行多次
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 調用 Callable 的 call 方法執行任務
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                // 異常處理
                result = null;
                ran = false;
                setException(ex);
            }
            // 正常處理
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // 線程被中斷
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
           

set & setException: 更新狀态值,喚醒棧中等待的線程

protected void set(V v) {
    // CAS 将 state 修改為 COMPLETING,該狀态是一個中間狀态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v; // 輸出結果指派
        // 将 state 更新為 NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}


protected void setException(Throwable t) {
    // CAS 将 state 修改為 COMPLETING,該狀态是一個中間狀态    
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t; // 輸出結果指派
        // 将 state 更新為 EXCEPTIONAL        
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
           

這兩個方法的操作類似,都是更新 state 的值并給傳回結果 outcome 指派,然後執行結束操作 finishCompletion 方法:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // 将 waiters 置空
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // 喚醒 WaitNode 封裝的線程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }


    done();
    
    callable = null;        // to reduce footprint
}
           

finishCompletion 方法的作用就是喚醒棧中所有等待的線程,并清空棧。其中的 done 方法實作為空:

protected void done() { }
           

子類可以重寫該方法實作回調功能。

get: 擷取執行結果

// 擷取執行結果(阻塞式)
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 若任務未執行完,則等待它執行完成
    if (s <= COMPLETING)
        // 任務未完成
        s = awaitDone(false, 0L);
    // 封裝傳回結果
    return report(s);
}


// 擷取執行結果(有逾時等待)
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
           

這兩個方法都是擷取任務執行的結果,原理也基本一樣,差別在于後者有逾時等待(逾時會抛出 TimeoutException 異常)。

awaitDone: 等待任務執行完成

// Awaits completion or aborts on interrupt or timeout.
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 響應線程中斷
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        
        int s = state;
        // s > COMPLETING 表示任務已執行完成(包括正常執行、異常等狀态)
        // 則傳回對應的狀态值
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // s == COMPLETING 是一個中間狀态,表示任務尚未完成
        // 這裡讓出 CPU 時間片
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 執行到這裡,表示 s == NEW,将目前線程封裝為一個 WaitNode 節點
        else if (q == null)
            q = new WaitNode();
        // 這裡表示 q 并未入棧,CAS 方式将當 WaitNode 入棧
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 有逾時的情況
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        // 将目前線程挂起
        else
            LockSupport.park(this);
    }
}
           

該方法的主要判斷步驟如下:

1. 若線程被中斷,則響應中斷;

2. 若任務已完成,則傳回狀态值;

3. 若任務正在執行,則讓出 CPU 時間片;

4. 若任務未執行,則将目前線程封裝為 WaitNode 節點;

5. 若 WaitNode 未入棧,則執行入棧;

6. 若已入棧,則将線程挂起。

以上步驟是循環執行的,其實該方法的主要作用就是:當任務執行完成時,傳回狀态值;否則将目前線程挂起。

removeWaiter: 移除棧中的節點

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                    continue retry;
            }
            break;
        }
    }
}
           

report 方法:封裝傳回結果

private V report(int s) throws ExecutionException {
    Object x = outcome; // 輸出結果指派
    // 正常結束
    if (s == NORMAL)
        return (V)x;
    // 取消
    if (s >= CANCELLED)
        throw new CancellationException();
    // 執行異常
    throw new ExecutionException((Throwable)x);
}
           

該方法就是對傳回結果的包裝,無論是正常結束或是抛出異常。

cancel: 取消任務

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                // 若允許中斷,則嘗試中斷線程
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
           

場景舉例

FutureTask 适合多線程執行一些耗時的操作,然後擷取執行結果。下面結合線程池簡單分析其用法,示例代碼如下(僅供參考):

public class FutureTaskTest {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<FutureTask<Integer>> taskList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            FutureTask<Integer> futureTask = new FutureTask<>(() -> {
                // 模拟耗時任務
                TimeUnit.SECONDS.sleep(finalI * 2);
                System.out.println(Thread.currentThread().getName() + " 計算中……");
                return finalI * finalI;
            });
            taskList.add(futureTask);
            executorService.submit(futureTask); // 送出到線程池
        }


        System.out.println("任務全部送出,主線程做其他操作");
        // 擷取執行結果
        for (FutureTask<Integer> futureTask : taskList) {
            Integer result = futureTask.get();
            System.out.println("result-->" + result);
        }
        // 關閉線程池
        executorService.shutdown();
    }
}
           

小結

FutureTask 是一個封裝任務(Runnable 或 Callable)的類,可以異步執行任務,并擷取執行結果,适用于耗時操作場景。

參考連結:

http://www.hchstudio.cn/article/2017/2b8f/

https://segmentfault.com/a/1190000016572591

https://www.jianshu.com/p/43dab9b7c25b

JDK源碼分析-FutureTask

繼續閱讀