天天看點

Java8 ForkJoinTask 源碼解析

目錄

1、定義

2、fork 

3、join / quietlyJoin

4、invoke / quietlyInvoke

5、complete / quietlyComplete/ completeExceptionally

6、recordExceptionalCompletion / clearExceptionalCompletion / getException 

7、invokeAll

8、get

9、tryUnfork / reinitialize

      本篇部落格講解ForkJoinTask的fork,join,invoke等方法的實作細節及其使用。

1、定義

       ForkJoinTask表示在ForkJoinPool中執行的一個任務,其類繼承關系如下:

Java8 ForkJoinTask 源碼解析

該類是一個抽象類,有多個子類,如下:

Java8 ForkJoinTask 源碼解析

帶S的幾個大都是ForkJoinTask的内部類,其實作比較簡單,用于将Runnable等接口轉換成ForkJoinTask類,對應于ForkJoinPool的adapt方法,以AdaptedCallable的實作為例說明,如下:

public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
        return new AdaptedCallable<T>(callable);
    }

static final class AdaptedCallable<T> extends ForkJoinTask<T>
        implements RunnableFuture<T> {
        final Callable<? extends T> callable;
        T result;
        AdaptedCallable(Callable<? extends T> callable) {
            if (callable == null) throw new NullPointerException();
            this.callable = callable;
        }
        public final T getRawResult() { return result; }
        public final void setRawResult(T v) { result = v; }

        //改寫核心的exec方法
        public final boolean exec() {
            try {
                result = callable.call();
                return true;
            } catch (Error err) {
                throw err;
            } catch (RuntimeException rex) {
                throw rex;
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
        public final void run() { invoke(); }
        private static final long serialVersionUID = 2838392045355241008L;
    }
           

該類包含的屬性如下:

//儲存ExceptionNode的數組,單個數組元素對應一個ExceptionNode連結清單
 private static final ExceptionNode[] exceptionTable;

 //修改exceptionTable時的鎖
 private static final ReentrantLock exceptionTableLock;
 
 //儲存被清楚掉的弱引用的隊列   
 private static final ReferenceQueue<Object> exceptionTableRefQueue;

 //任務的狀态,初始值為0,大于等于0時表示任務未執行或者正在執行的過程中,小于0表示已執行完成
 volatile int status; // accessed directly by pool and workers
           

  靜态屬性通過static代碼塊初始化,如下:

Java8 ForkJoinTask 源碼解析

  該類定義的常量如下:

//最高位為1
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
    
    //正常完成
    static final int NORMAL      = 0xf0000000;  // must be negative
    
    //任務被取消了
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    
    //任務異常終止
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    
    //某個線程在等待目前任務執行完成,需要在任務結束時喚醒等待的線程
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    
    //擷取低16位的值
    static final int SMASK       = 0x0000ffff;  // short bits for tags

    private static final int EXCEPTION_MAP_CAPACITY = 32;
           

 其中ExceptionNode是一個繼承自WeakReference的内部類,其定義如下:

Java8 ForkJoinTask 源碼解析

多個ExceptionNode通過next屬性構成一個連結清單。

2、fork 

      fork方法并不是如其方法名會fork一個新線程來執行任務,隻是将任務送出到任務隊列中而已,然後立即傳回,不會等待任務執行完成,其實作如下:

//fork方法并不是如其命名會建立一個新線程來執行任務,隻是将任務送出到任務隊列中而已
public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            //如果目前線程是ForkJoinWorkerThread,将其送出到關聯的WorkQueue中
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            //如果是普通線程,則送出到common線程池的任務隊列中
            ForkJoinPool.common.externalPush(this);
        return this;
    }

           

其測試用例如下: 

@Test
    public void test() throws Exception {
        ForkJoinTask task=ForkJoinTask.adapt(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName()+" exit");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //送出任務到common線程池
        task.fork();
        //阻塞等待任務執行完成,get方法會将該任務從任務隊列中pop出來并執行
        //是以run方法列印出來的線程名就是main
        task.get();
        System.out.println("main thread exit");
    }
           

其輸出如下:

Java8 ForkJoinTask 源碼解析

可以在adapt方法對應的AdaptedRunnableAction打斷點,如下:

Java8 ForkJoinTask 源碼解析

然後debug,觀察調用鍊如下:

Java8 ForkJoinTask 源碼解析

3、join / quietlyJoin

     join方法用于阻塞目前線程,等待任務執行完成,部分情形下會通過目前線程執行任務,如果異常結束或者被取消需要抛出異常;quietlyJoin方法隻是阻塞目前線程等待任務執行完成,不會抛出異常;其實作如下:

//join方法等待任務執行完成并傳回結果,如果出現異常則報告異常
public final V join() {
        int s;
        //doJoin方法會阻塞目前線程直到任務執行完成并傳回任務的狀态
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            //如果不是正常完成的,則報告異常
            reportException(s);
        //傳回執行結果,該方法是抽象方法    
        return getRawResult();
    }

public final void quietlyJoin() {
        doJoin(); //隻是等待任務執行完成
    }


private void reportException(int s) {
        if (s == CANCELLED)
            //被取消了
            throw new CancellationException();
        if (s == EXCEPTIONAL)
            //重新抛出異常
            rethrow(getThrowableException());
    }

static void rethrow(Throwable ex) {
        if (ex != null)
            ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
    }

@SuppressWarnings("unchecked") static <T extends Throwable>
        void uncheckedThrow(Throwable t) throws T {
        throw (T)t; // rely on vacuous cast
    }



private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        //status小于0說明任務已結束,直接傳回
        return (s = status) < 0 ? s :
            //如果status大于等于0
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            //如果目前線程是ForkJoinWorkerThread,執行tryUnpush,傳回true以後執行doExec
            //如果tryUnpush傳回false或者doExec傳回大于等于0,則執行awaitJoin
            //如果this在任務隊列的頂端,tryUnpush會将其pop出來,傳回true,否則傳回false
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            //awaitJoin也是阻塞目前線程,直到任務執行完成
            wt.pool.awaitJoin(w, this, 0L) :
            //如果目前線程是普通的Java線程
            externalAwaitDone();
    }


//執行任務,doExec方法的傳回值取決于exec方法,如果exec傳回true,則doExec傳回值小于0
//如果傳回false,則doExec傳回值大于等于0
final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                //exec是子類實作的方法
                completed = exec();
            } catch (Throwable rex) {
                //執行異常
                return setExceptionalCompletion(rex);
            }
            if (completed)
                //正常完成
                s = setCompletion(NORMAL);
        }
        return s;
    }

//阻塞普通Java線程等待任務執行完成
 private int externalAwaitDone() {
        int s = ((this instanceof CountedCompleter) ? // try helping
                 //如果是CountedCompleter,則通過externalHelpComplete方法阻塞目前線程等待任務完成
                 ForkJoinPool.common.externalHelpComplete(
                     (CountedCompleter<?>)this, 0) :
                 //如果是普通的ForkJoinTask,則通過tryExternalUnpush嘗試将其從任務隊列中pop出來,如果該任務位于任務隊列頂端則pop成功并傳回true
                 //pop成功後執行doExec方法,即通過目前線程完成任務
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            do {
                //修改status,加上SIGNAL辨別,表示有線程等待了
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0) { //再次校驗狀态
                            try {
                                //0表示無期限等待,直到被喚醒
                                //任務執行完成,可通過setCompletion方法喚醒等待的線程
                                wait(0L);
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        else
                            //任務已執行完成,則喚醒所有等待的線程
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);
            if (interrupted)
                //等待時被中斷,将目前線程标記為已中斷
                Thread.currentThread().interrupt();
        }
        return s;
    }
           

其測試用例如下:

@Test
    public void test() throws Exception {
        ForkJoinTask task=ForkJoinTask.adapt(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName()+" exit");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //送出任務到common線程池
        task.fork();
        //阻塞等待任務執行完成,join方法會将該任務從任務隊列中pop出來并執行
        //是以run方法列印出來的線程名就是main
        task.join();
        System.out.println("main thread exit");
    }
           

4、invoke / quietlyInvoke

      invoke會立即執行目前任務,如果doExec方法傳回值大于等于0說明還有其他的子任務未完成,則等待其他子任務執行完成,典型的應用場景就是CountedCompleter,RecursiveAction和RecursiveTask通常doExec傳回值小于0,會在compute方法即執行exec方法時等待所有的子任務執行完成;quietlyInvoke和invoke 都是基于doInvoke實作,差別在于前者不關心執行的結果,不會抛出異常。其實作如下:

public final V invoke() {
        int s;
        //通過doExec立即執行任務,如果任務未完成則等待
        if ((s = doInvoke() & DONE_MASK) != NORMAL)
            reportException(s); //任務被取消或者異常終止則抛出異常
        return getRawResult();
    }

 public final void quietlyInvoke() {
        doInvoke(); //不需要抛出異常
    }

private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        //直接調用doExec方法執行任務,如果執行完成,則直接傳回
        return (s = doExec()) < 0 ? s :
             //如果doExec的結果大于等于0,說明未完成,典型的如CountedCompleter的子類應用
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            //如果是ForkJoinWorkerThread,通過awaitJoin方法等待任務執行完成
            (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
            //普通的Java線程,通過externalAwaitDone等待任務執行完成
            externalAwaitDone();
    }
           

 測試用例如下:

@Test
    public void test3() throws Exception {
        Thread thread=Thread.currentThread();
        ForkJoinTask task=new ForkJoinTask() {
            @Override
            public Object getRawResult() {
                return null;
            }

            @Override
            protected void setRawResult(Object value) {

            }

            @Override
            protected boolean exec() {
                System.out.println(Thread.currentThread().getName()+" start run");
                if(thread==Thread.currentThread()){
                    return false;
                }else{
                    try {
                        Thread.sleep(2000);
                        System.out.println(Thread.currentThread().getName()+" exit");
                        return true;
                    } catch (InterruptedException e) {
                        return false;
                    }
                }
            }
        };
        ForkJoinPool.commonPool().submit(task);
        //阻塞目前線程,等待任務執行完成
        task.invoke();
        System.out.println("main thread exit");
    }
           

上述用例的運作結果有兩種,如下圖:

Java8 ForkJoinTask 源碼解析
Java8 ForkJoinTask 源碼解析

 第二種會無期限阻塞,第一種是正常退出,為啥會有這種情形了?main線程執行invoke方法,invoke方法調用doExec方法傳回值等于0後就執行externalAwaitDone方法了,如果執行ForkJoinPool.common.tryExternalUnpush方法傳回true,則再次執行doExec方法,因為傳回值還是0,則通過wait方法等待了,因為沒有其他線程喚醒該線程,就會無期限等待;如果執行ForkJoinPool.common.tryExternalUnpush方法傳回false,說明某個Worker線程已經将該任務從任務隊列中移走了,Worker線程會負責執行該任務并修改任務執行狀态,如果Worker線程正在執行的過程中則wait等待Worker線程執行完成,Worker執行完成會喚醒等待的main線程,main線程判斷任務已完成就正常退出了。

5、complete / quietlyComplete/ completeExceptionally

     這三個方法都是任務執行完成時調用的,其中complete方法用于儲存任務執行的結果并修改狀态,quietlyComplete方法隻修改狀态,completeExceptionally用于任務執行異常時儲存異常資訊并修改狀态,其中隻有quietlyComplete方法有調用方,都是CountedCompleter及其子類,其調用鍊如下:

Java8 ForkJoinTask 源碼解析

這三個方法的實作如下: 

//儲存任務執行的結果并修改任務狀态
public void complete(V value) {
        try {
            //儲存結果
            setRawResult(value);
        } catch (Throwable rex) {
            //出現異常,儲存關聯的異常
            setExceptionalCompletion(rex);
            return;
        }
        //修改狀态,正常完成
        setCompletion(NORMAL);
    }

 
public final void quietlyComplete() {
        //修改狀态正常完成
        setCompletion(NORMAL);
    }

//任務異常結束時,儲存異常資訊并修改狀态
public void completeExceptionally(Throwable ex) {
        //記錄異常資訊并更新任務狀态
        setExceptionalCompletion((ex instanceof RuntimeException) ||
                                 (ex instanceof Error) ? ex :
                                 new RuntimeException(ex)); //如果不是RuntimeException或者Error,則将其用RuntimeException包裝一層
    }

 private int setCompletion(int completion) {
        for (int s;;) {
            if ((s = status) < 0) //如果已完成,直接傳回
                return s;
            if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
                //cas修改狀态成功
                if ((s >>> 16) != 0) //如果status中有SIGNAL辨別,即有線程在等待目前任務執行完成
                    synchronized (this) { notifyAll(); } //喚醒等待的線程
                return completion;
            }
        }
    }

 private int setExceptionalCompletion(Throwable ex) {
        //記錄異常資訊并更新任務狀态
        int s = recordExceptionalCompletion(ex);
        if ((s & DONE_MASK) == EXCEPTIONAL) //如果狀态是異常完成,則執行鈎子方法
            internalPropagateException(ex); //預設是空實作
        return s;
    }

  void internalPropagateException(Throwable ex) {
    }
           

6、recordExceptionalCompletion / clearExceptionalCompletion / getException 

     這幾個方法都是異常處理的,recordExceptionalCompletion用于記錄異常資訊并修改任務狀态,getException方法擷取目前任務關聯的異常資訊,如果任務是正常結束的則傳回null,如果是被取消則傳回CancellationException,如果異常結束則傳回執行任務過程中抛出的異常,clearExceptionalCompletion是reinitialize調用的,用于清理掉目前任務關聯的異常資訊。

//儲存異常資訊,并設定狀态異常結束
final int recordExceptionalCompletion(Throwable ex) {
        int s;
        if ((s = status) >= 0) {
            //擷取hash值
            int h = System.identityHashCode(this);
            final ReentrantLock lock = exceptionTableLock;
            lock.lock(); //加鎖
            try {
                //清理掉已經被GC回收掉的ExceptionNode
                expungeStaleExceptions();
                ExceptionNode[] t = exceptionTable;
                //計算該節點的索引
                int i = h & (t.length - 1);
                for (ExceptionNode e = t[i]; ; e = e.next) {
                    if (e == null) {
                        //如果t[i]為null或者周遊完了沒有找到比對的,則建立一個新節點,插入到t[i]連結清單的前面
                        t[i] = new ExceptionNode(this, ex, t[i]);
                        break;
                    }
                    //找到目标節點
                    if (e.get() == this) // already present
                        break;
                }
            } finally {
                lock.unlock();
            }
            //設定狀态,異常終止
            s = setCompletion(EXCEPTIONAL);
        }
        return s;
    }

public final Throwable getException() {
        int s = status & DONE_MASK; 
        return ((s >= NORMAL)    ? null : //如果是正常完成,則傳回null
                (s == CANCELLED) ? new CancellationException() : //任務被取消,則傳回CancellationException
                getThrowableException()); //任務異常結束,擷取之前異常結束時儲存的異常資訊
    }

//将exceptionTableRefQueue中已經被GC回收掉的節點從exceptionTable中移除
private static void expungeStaleExceptions() {
         //poll方法移除并傳回連結清單頭,連結清單中的節點是已經被回收掉了
        for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
            if (x instanceof ExceptionNode) {
                int hashCode = ((ExceptionNode)x).hashCode;
                ExceptionNode[] t = exceptionTable;
                //計算該節點的索引
                int i = hashCode & (t.length - 1);
                ExceptionNode e = t[i];
                ExceptionNode pred = null;
                while (e != null) {
                    ExceptionNode next = e.next;
                    if (e == x) { //找到目标節點
                        if (pred == null) //x就是連結清單第一個節點
                            t[i] = next;
                        else  //x是連結清單中某個節點
                            pred.next = next;
                        break;
                    }
                    //周遊下一個節點
                    pred = e;
                    e = next;
                }
            }
        }
    }

 private Throwable getThrowableException() {
        if ((status & DONE_MASK) != EXCEPTIONAL) //不是異常結束,傳回null
            return null;
        int h = System.identityHashCode(this);
        ExceptionNode e;
        //加鎖
        final ReentrantLock lock = exceptionTableLock;
        lock.lock();
        try {
            //清理掉已經被GC回收掉的ExceptionNode
            expungeStaleExceptions();
            ExceptionNode[] t = exceptionTable;
            //計算所屬的數組元素
            e = t[h & (t.length - 1)];
            //周遊連結清單,e.get()方法傳回該ExceptionNode關聯的Task
            while (e != null && e.get() != this)
                e = e.next;
        } finally {
            lock.unlock();
        }
        Throwable ex;
        //沒有找到目前Task 或者ex為null
        if (e == null || (ex = e.ex) == null) 
            return null;
        if (e.thrower != Thread.currentThread().getId()) {
            //如果儲存異常資訊的線程不是目前線程,建立一個同類型的異常執行個體包裝原來的異常資訊,進而提供準确的異常調用鍊
            Class<? extends Throwable> ec = ex.getClass();
            try {
                Constructor<?> noArgCtor = null;
                //擷取構造函數
                Constructor<?>[] cs = ec.getConstructors();// public ctors only
                for (int i = 0; i < cs.length; ++i) {
                    Constructor<?> c = cs[i];
                    //擷取構造函數的參數類型
                    Class<?>[] ps = c.getParameterTypes();
                    if (ps.length == 0)
                        noArgCtor = c; //預設的構造函數
                    else if (ps.length == 1 && ps[0] == Throwable.class) {
                        //如果隻有一個參數,且參數類型是Throwable,則建立一個新異常執行個體
                        Throwable wx = (Throwable)c.newInstance(ex);
                        return (wx == null) ? ex : wx;
                    }
                }
                if (noArgCtor != null) {
                    //有預設的無參構造函數,建立一個執行個體并設定ex
                    Throwable wx = (Throwable)(noArgCtor.newInstance());
                    if (wx != null) {
                        wx.initCause(ex);
                        return wx;
                    }
                }
            } catch (Exception ignore) {
            }
        }
        return ex;
    }

//清理掉目前Task關聯的ExceptionNode
private void clearExceptionalCompletion() {
        //擷取hash值
        int h = System.identityHashCode(this);
        //加鎖
        final ReentrantLock lock = exceptionTableLock;
        lock.lock();
        try {
            ExceptionNode[] t = exceptionTable;
            //計算所屬的數組元素
            int i = h & (t.length - 1);
            ExceptionNode e = t[i];
            ExceptionNode pred = null;
            //周遊連結清單
            while (e != null) {
                ExceptionNode next = e.next;
                if (e.get() == this) {
                    if (pred == null) //this是連結清單第一個節點
                        t[i] = next;
                    else
                        pred.next = next; //this是連結清單中的一個節點
                    break;
                }
                //周遊下一個節點
                pred = e;
                e = next;
            }
            //清理掉已經被GC回收掉的ExceptionNode
            expungeStaleExceptions();
            status = 0;
        } finally {
            lock.unlock();
        }
    }
           

7、invokeAll

     invokeAll方法有三個重載版本,都是等待多個任務執行完成,其中第一個任務都是有目前線程執行,其他任務是送出到線程池執行,多個任務時,如果有一個任務執行異常,則會取消掉剩餘未執行的任務。其實作如下:

public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        int s1, s2;
        t2.fork(); //将t2送出到任務隊列
        if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) //執行t1并等待其執行完成
            t1.reportException(s1); //不是正常結束則抛出異常
        if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) //等待t2執行完成
            t2.reportException(s2);
    }

//此時tasks相當于一個ForkJoinTask數組
public static void invokeAll(ForkJoinTask<?>... tasks) {
        Throwable ex = null;
        int last = tasks.length - 1;
        //從數組末尾處往前周遊
        for (int i = last; i >= 0; --i) {
            ForkJoinTask<?> t = tasks[i];
            if (t == null) {
                //某個ForkJoinTask為null
                if (ex == null)
                    ex = new NullPointerException();
            }
            else if (i != 0) //i不等于0的,将其送出到任務隊列
                t.fork();
            //i等于0,立即執行并等待其執行完成    
            else if (t.doInvoke() < NORMAL && ex == null)
                ex = t.getException();
        }
        //從1開始往前周遊
        for (int i = 1; i <= last; ++i) {
            ForkJoinTask<?> t = tasks[i];
            if (t != null) {
                if (ex != null) //ex不為空,則取消任務
                    t.cancel(false); 
                else if (t.doJoin() < NORMAL) //等待任務執行完成,如果不是正常結束的則擷取抛出的異常
                    ex = t.getException();
            }
        }
        if (ex != null)
            rethrow(ex); //重新抛出異常
    }

public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
        if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
            //如果沒有實作RandomAccess接口或者不是List類型
            invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
            return tasks;
        }
        //是List類型且實作了RandomAccess接口
        @SuppressWarnings("unchecked")
        List<? extends ForkJoinTask<?>> ts =
            (List<? extends ForkJoinTask<?>>) tasks;
        Throwable ex = null;
        //邏輯同上
        int last = ts.size() - 1;
        //從last處往前周遊
        for (int i = last; i >= 0; --i) {
            ForkJoinTask<?> t = ts.get(i);
            if (t == null) {
                if (ex == null)
                    ex = new NullPointerException();
            }
            else if (i != 0)
                t.fork();
            else if (t.doInvoke() < NORMAL && ex == null)
                ex = t.getException();
        }
        //從1開始往後周遊
        for (int i = 1; i <= last; ++i) {
            ForkJoinTask<?> t = ts.get(i);
            if (t != null) {
                if (ex != null)
                    t.cancel(false);
                else if (t.doJoin() < NORMAL)
                    ex = t.getException();
            }
        }
        if (ex != null)
            rethrow(ex);
        return tasks;
    }

  public boolean cancel(boolean mayInterruptIfRunning) {
        return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
    }
           

8、get

      get方法是阻塞目前線程并等待任務執行完成,其效果和實作跟join方法基本一緻,最大的差別在于如果線程等待的過程中被中斷了,get方法會抛出異常InterruptedException,而join方法不會抛出異常,其實作如下:

public final V get() throws InterruptedException, ExecutionException {
        //如果是ForkJoinWorkerThread執行doJoin 否則執行externalInterruptibleAwaitDone
        int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
            doJoin() : externalInterruptibleAwaitDone();
        Throwable ex;
        if ((s &= DONE_MASK) == CANCELLED) //任務被取消
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) //異常終止
            throw new ExecutionException(ex);
        return getRawResult(); //傳回執行的結果
    }

public final V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        int s;
        long nanos = unit.toNanos(timeout);
        if (Thread.interrupted()) //被中斷抛出異常
            throw new InterruptedException();
        if ((s = status) >= 0 && nanos > 0L) {
            //擷取等待的終止時間
            long d = System.nanoTime() + nanos;
            long deadline = (d == 0L) ? 1L : d; // avoid 0
            Thread t = Thread.currentThread();
            if (t instanceof ForkJoinWorkerThread) {
                //如果是ForkJoinWorkerThread,通過awaitJoin方法等待任務執行完成
                ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
                s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
            }
            //如果是普通Java線程
            else if ((s = ((this instanceof CountedCompleter) ?
                           //如果是CountedCompleter,則通過externalHelpComplete等待其執行完成
                           ForkJoinPool.common.externalHelpComplete(
                               (CountedCompleter<?>)this, 0) :
                           //如果是普通的ForkJoinTask,嘗試将其從任務隊列中pop出來并執行    
                           ForkJoinPool.common.tryExternalUnpush(this) ?
                           doExec() : 0)) >= 0) {
                //如果tryExternalUnpush傳回false或者doExec方法傳回值大于等于0,即任務未執行完成           
                long ns, ms; // measure in nanosecs, but wait in millisecs
                while ((s = status) >= 0 &&  //任務已執行
                       (ns = deadline - System.nanoTime()) > 0L) {  //等待逾時
                    if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
                        U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                        //cas修改狀态加上SIGNAL
                        synchronized (this) {
                            if (status >= 0)
                                //阻塞目前線程指定時間,如果被中斷則抛出異常
                                wait(ms); // OK to throw InterruptedException
                            else
                                notifyAll();
                        }
                    }
                }
            }
        }
        if (s >= 0) 
            s = status; //再次讀取狀态
        if ((s &= DONE_MASK) != NORMAL) { //不是正常執行
            Throwable ex;
            if (s == CANCELLED) //被取消
                throw new CancellationException(); 
            if (s != EXCEPTIONAL) //不是異常終止,則是等待逾時
                throw new TimeoutException();
            if ((ex = getThrowableException()) != null) //異常終止
                throw new ExecutionException(ex);
        }
        return getRawResult();
    }

//邏輯同externalAwaitDone,差別在于如果被中斷抛出異常
//externalAwaitDone不會抛出異常,如果被中斷了會将目前線程标記為已中斷
private int externalInterruptibleAwaitDone() throws InterruptedException {
        int s;
        if (Thread.interrupted())
            throw new InterruptedException(); //被中斷則抛出異常
        if ((s = status) >= 0 &&
            (s = ((this instanceof CountedCompleter) ?
                  ForkJoinPool.common.externalHelpComplete(
                      (CountedCompleter<?>)this, 0) :
                  ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
                  0)) >= 0) {
            while ((s = status) >= 0) {
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0)
                            wait(0L);
                        else
                            notifyAll();
                    }
                }
            }
        }
        return s;
    }

           

9、tryUnfork / reinitialize

//嘗試将目前任務從任務隊列中pop出來,然後可以在目前線程執行
public boolean tryUnfork() {
        Thread t;
        return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : //如果Worker線程,嘗試将目前任務從其關聯的WorkQueue中pop出來
                ForkJoinPool.common.tryExternalUnpush(this)); //非Worker線程,嘗試将目前任務從probe屬性關聯的WorkQueue中pop出來
    }

//将任務恢複至初始狀态,然後可正常執行
public void reinitialize() {
        if ((status & DONE_MASK) == EXCEPTIONAL)
            clearExceptionalCompletion(); //如果是異常結束,則清除關聯的異常資訊
        else
            //狀态恢複成0
            status = 0;
    }