天天看點

java Future,FutureTask,Callable詳解

#Future

Future的作用就是在主線程之外另外起一個線程去做另一件事,讓java同時可以并行處理。并且提供方法随時去檢視Future的是否完成,并取得任務的結果。本篇文章會詳細講解流程和每個方法。

先寫一個小例子:

這個例子中要做3件事,其中job2和job3各耗時2000毫秒,而Future可以讓他們同時進行

long a = System.currentTimeMillis();
  System.out.println("do job1");
  Callable<String> callable = () -> { //子線程任務,實作Callable<T> 接口,傳回值為T
        Thread.sleep(2000);
        System.out.println("do job2");
        return "job2 is done";
   };
  FutureTask<String> task = new FutureTask<>(callable); //new 一個FutureTask 将任務引用傳遞進去
  new Thread(task).start();  //啟動Future任務
  Thread.sleep(2000);  
  System.out.println("do job3");
  while (!task.isDone()) {  //循環判斷Future任務是否完成了
  Thread.onSpinWait(); //相當于Thread.sleep(); 不過效率更高
  }
  System.out.println("job2結果 => " + task.get() + "時間:" + (System.currentTimeMillis() - a));
           

結果

do job1
do job2
do job3
job2結果 => job2 is done時間:2006
           

可以看到Future确實讓任務并行處理了。

我們這裡使用了isDone()方法查詢計算是否完成了,下面細節講解運作原理和它提供的方法。

#Future提供的方法

FutureTask實作了Future接口,該接口提供了以下方法:

  • boolean cancel(boolean mayInterruptIfRunning);//取消計算
  • boolean isCancelled(); //判斷計算是否被取消。
  • boolean isDone(); //判斷計算是否結束。
  • V get();//擷取結果,如果計算沒結束會一直堵塞到結束。
  • V get(long timeout, TimeUnit unit) ;//擷取結果,如果計算沒結束最多等待一定時間。

#運作原理和流程

1.首先先看下FutureTask 有哪些屬性呢

  • 運作狀态的判斷:
private volatile int state; //存儲狀态的變量 使用volatile
    private static final int NEW          = 0; //代表是剛剛建立沒進行如何操作
    private static final int COMPLETING   = 1; //正在計算中
    private static final int NORMAL       = 2; //正常
    private static final int EXCEPTIONAL  = 3; //發生了異常
    private static final int CANCELLED    = 4; //被取消了
    private static final int INTERRUPTING = 5; //中斷中
    private static final int INTERRUPTED  = 6; //以被中斷
           
  • 其他屬性
/** The underlying callable; nulled out after running */
    private Callable<V> callable; //我們送出的任務
    /** The result to return or exception to throw from get() */
    private Object outcome; //存儲結果 或者異常
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner; //記錄運作的線程
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters; //等待結果的隊列
           

2.下面講解運作原理和流程

首先我們建立了一個Callable 記錄我們任務的方法,然後new FutureTask 會把Callable指派給callable屬性,将state 設定為NEW也就是0。然後啟動線程調用run方法

public void run() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread())) //如果狀态不是new或者将runner屬性指派為目前線程時失敗就直接傳回,這裡表明了我們的任務隻能啟動一次。
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) { // callable 不為空,狀态為new才繼續計算
                V result;
                boolean ran;//flag記錄是否計算成功
                try {
                    result = c.call();//調用我們的方法
                    ran = true;//完成了 把flag設定為ture
                } catch (Throwable ex) {  //有異常 結果設定為空 flag設為false
                    result = null;
                    ran = false;
                    setException(ex);  //處理異常的方法
                }
                if (ran)  //計算成功
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;  
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING) //如果有其它線程在中斷任務會等到中斷結束
                handlePossibleCancellationInterrupt(s);  
        }
    }
           

###先講解下CAS 這裡面好多CAS操作

RUNNER.compareAndSet(this, null, Thread.currentThread())這裡VarHandle 來提供CAS操作,我這裡是jdk11,你們的版本可能是用unsafe來操作的。他們的作用是一樣的,就是先擷取某個屬性相對對象的記憶體位址偏移值,然後用CAS方式修改屬性的值,因為一個對象屬性在記憶體中的排列是固定的,這樣隻要有對象的位址和屬性的偏移值就能定位屬性在記憶體的位址。這裡的RUNNER 是這樣來的,可以看到定位了FutureTask這個類的runner屬性,類型是Thread

MethodHandles.Lookup l = MethodHandles.lookup();
 RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
           

RUNNER.compareAndSet(A,B,C)方法:修改A對象的RUNNER所代表的屬性,如果目前值是B就修改為C,如果失敗會一直重試直到,目前屬性不是B了或者修改成功。

使用unsafe也是一樣的,隻不過不會去先取得VarHandle這個句柄,而是在調用時将屬性偏移值一起傳遞進去。

if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread())) 
           

是以意思計算如果狀态不是new或者将runner屬性指派為目前線程時失敗

###繼續

####發生異常

後面就是調用我們自己寫的方法了,如果發送了異常我們看下

protected void setException(Throwable t) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = t;
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();//喚起所有等待線程
        }
    }
           

就是先發狀态設定為計算中,然後記錄下異常,在把狀态設定為EXCEPTIONAL異常,注意setRelease方法表示設定值後這個值不能再被修改。

####計算完成

protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }
           

其實和發送異常是一樣的,隻是記錄狀态不一樣

都會調finishCompletion()方法

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) { //方法内變量  記錄連結清單 
            if (WAITERS.weakCompareAndSet(this, q, null)) {//将屬性的設定為空
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();//空方法 ,給使用者擴充用

        callable = null;        // to reduce footprint
    }
           

這裡的WaitNode 是一個連結清單,記錄使用等待結果的線程。上面說的get方法會堵塞直到計算完成,被堵塞的線程就會存儲到這個連結清單。

for (WaitNode q; (q = waiters) != null;) { //方法内變量  記錄連結清單 
            if (WAITERS.weakCompareAndSet(this, q, null)) {//将屬性的設定為空
        //這裡可能會有新的隊列
           

這裡會一直循環,防止将waiters屬性指派為空之後又有新的線程加入到隊列中

然後循環連結清單把線程喚起使用 LockSupport.unpark(t),這個方法點進去可以發現其實用的是前面提到的Unsafe,juc的包很多都是用Unsafe實作的。

#下面講解各個方法

###isDone

public boolean isDone() {
        return state != NEW;
    }
           

非常簡單。。。

###get

/**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
           

兩個重載方法,一個傳入了最大等待時間

如果狀态的<= COMPLETING 也就是NEW 和COMPLETING 就進入等待的方法

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;  //記錄目前線程 
        boolean queued = false;  //記錄是否把目前線程加入到了隊列
        for (;;) {
            int s = state;
            if (s > COMPLETING) { //大于計算中,代表完成||中斷||取消 直接傳回 
                if (q != null)
                    q.thread = null; //目前線程設定為空,finishCompletion() 會判斷 連結清單隊列裡的線程是否為空 
                return s;
            }
            else if (s == COMPLETING) //如果在計算中 放開cpu資源 
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) { //如果目前線程被中斷 
                removeWaiter(q); //從等待隊列中移除目前線程對象
                throw new InterruptedException(); 
            }
            else if (q == null) {  //這裡隻有第一次循環會進入
                if (timed && nanos <= 0L) //如果是等待一段時間,并且時間小于等于0 直接退出了 
                    return s;
                q = new WaitNode(); //new 一個連結清單元素  這裡new的時候會把目前線程對象放進這個元素裡面 
            }
            else if (!queued) //如果沒加入 FutureTask的連結清單裡就加入 
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); //把waiters 屬性設定成q的下一個 替換 waiters 屬性為q
            else if (timed) { //設定了等待時間才會進入
                final long parkNanos; //記錄時間
                if (startTime == 0L) { // first time  第一次進入記錄開始時間
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else { //第二次進入
                    long elapsed = System.nanoTime() - startTime; //加上 剛才使用的時間
                    if (elapsed >= nanos) {//如果已經超過等待時間直接退出了
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed; //計算還要等待的時間
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)//重新check 下狀态
                    LockSupport.parkNanos(this, parkNanos); //挂起線程剩下的時間  這裡是毫秒
            }
            else
                LockSupport.park(this); // 不設定時間,挂起線程 
        }
    }
           

梳理下流程,

1.先判斷狀态

2.把線程加入等待隊列

3. 有等待時間就 park 那麼多時間

4. 沒等待時間就直接 park

5. 其它線程喚醒 重新判斷狀态

這個方法會傳回這個狀态,計算等待結束後的狀态。可能是完成,異常等。然後會掉report方法

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
           

正常就傳回結果,否則會抛個異常出去。使用我們自己寫的方法出現了異常最終會在這裡抛出來。

###isCancelled

public boolean isCancelled() {
        return state >= CANCELLED;
    }
           

也很簡單了

###cancel

要注意一點,java是不能直接強制停掉線程的。這裡取消是使用中斷的方式 interrupt ,然後把狀态修改為INTERRUPTING或者CANCELLED

中斷一個線程并不會停掉線程,隻是告訴線程你要停掉了,被中斷的線程可以使用Thread.interrupted() 來檢視我是不是被中斷了,另外線程在堵塞時如果被中斷會從堵塞中恢複抛出一個中斷異常InterruptedException。

public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && STATE.compareAndSet
              (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    STATE.setRelease(this, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();//喚醒等待隊列
        }
        return true;
    }

           

取消隻會在狀态為NEW時才有效,但是在任務計算完之前 狀态都是NEW

run方法的狀态判斷

if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread())) //如果狀态不是new或者将runner屬性指派為目前線程時失敗就直接傳回,這裡表明了我們的任務隻能啟動一次。
            return;
           

是以你送出的任務并不會在計算時停掉,如果在計算的run方法的狀态判斷前那麼不會計算。

如果在判斷後,那麼計算還是會進行,但結果不會存儲,并且在get時會抛這個異常。

if (s >= CANCELLED)
            throw new CancellationException();
           

這個方法的參數mayInterruptIfRunning說一下。

  • ture 代表會中斷線程,狀态會改為INTERRUPTING 并且中斷interrupt() 中斷後會把狀态改成INTERRUPTED
  • flase 則隻是會 把狀态修改為CANCELLED

#最後

其實裡面的方法不是很難,重點是在并發時的處理和設計,比如get的同時取消,多個線程同時get等,多個方法通過共享變量來通信和協調。這個jdk的類,是以可以當作一個标準優秀的案例來參考和學習。這是作者第一次寫文章,如果有不對的地方大家一定要提出來。