天天看點

Jdk1.6 JUC源碼解析(16)-FutureTask

Jdk1.6 JUC源碼解析(16)-FutureTask

作者:大飛

功能簡介:

  • FutureTask是一種異步任務(或異步計算),舉個栗子,主線程的邏輯中需要使用某個值,但這個值需要複雜的運算得來,那麼主線程可以提前建立一個異步任務來計算這個值(在其他的線程中計算),然後去做其他事情,當需要這個值的時候再通過剛才建立的異步任務來擷取這個值,有點并行的意思,這樣可以縮短整個主線程邏輯的執行時間。
  • FutureTask也是基于AQS來建構的,使用共享模式,使用AQS的狀态來表示異步任務的運作狀态。

源碼分析:

  • 先來看下FutureTask都實作了哪些接口。首先實作了RunnableFuture接口,先看下這個接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
           

       RunnableFuture又擴充了Runnable和Future:

public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used 
     * to create a thread, starting the thread causes the object's 
     * <code>run</code> method to be called in that separately executing 
     * thread. 
     * <p>
     * The general contract of the method <code>run</code> is that it may 
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}
           
public interface Future<V> {
    /**
     * 嘗試取消任務的執行,如果任務已經完成或者已經被取消或者由于某種原因
     * 無法取消,方法傳回false。如果任務取消成功,或者任務開始執行之前調用
     * 了取消方法,那麼任務就永遠不會執行了。mayInterruptIfRunning參數決定 
     * 了是否要中斷執行任務的線程。
     */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
     * 判斷任務是否在完成之前被取消。
     */
    boolean isCancelled();
    /**
     * 判斷任務是否完成。
     */
    boolean isDone();
    /**
     * 等待,直到擷取任務的執行結果。如果任務還沒執行完,這個方法會阻塞。
     */
    V get() throws InterruptedException, ExecutionException;
    /**
     * 等待,在給定的時間内擷取任務的執行結果。
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
           

       Runnable接口經常寫多線程程式的話一定非常熟悉了,這裡不說了。看下Future接口,它提供了取消任務接口,并提供了檢視任務狀态的接口,最重要的是提供了有阻塞行為的擷取任務執行結果的接口。  

  • 接下來看下FutureTask的實作,由于其基于AQS實作,那先看一下内部的同步機制:
private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7828117401763700385L;
        /** 表示任務正在執行 */
        private static final int RUNNING   = 1;
        /** 表示任務已經運作完畢 */
        private static final int RAN       = 2;
        /** 表示任務被取消 */
        private static final int CANCELLED = 4;
        /** 内部的callable */
        private final Callable<V> callable;
        /** 執行結果 */
        private V result;
        /** 執行過程中發生的異常 */
        private Throwable exception;
        /**
         * 執行目前任務的線程。在set/cancel之後置空,說明可以
         * 了。必須使用volatile來修飾,以確定任務完成後的可見性。 
         */
        private volatile Thread runner;
        Sync(Callable<V> callable) {
            this.callable = callable;
        }
           

       内部同步器接中使用了一個callable來儲存要執行的任務,看下這個接口: 

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
           

       這個接口的行為和Runnable類似,不同的是有傳回值,且能抛出異常,算是對Runnable的補充。  

       繼續看下同步器的innerRun方法,這個方法用于支援FutureTask的run方法: 

void innerRun() {
            //嘗試設定任務運作狀态為正在執行。
            if (!compareAndSetState(0, RUNNING))
                return; //如果設定失敗,直接傳回。
            try {
                runner = Thread.currentThread(); //設定執行線程。
                if (getState() == RUNNING) //再次檢測任務狀态
                    innerSet(callable.call()); //執行任務,然後設定執行結果。
                else
                    releaseShared(0); //說明任務已取消。
            } catch (Throwable ex) {
                innerSetException(ex); //如果執行任務過程中發生異常,設定異常。
            }
        }
           

       看下設定執行結果的innerSet方法: 

void innerSet(V v) {
	       for (;;) {
		      int s = getState(); //擷取任務執行狀态。
		      if (s == RAN)
		          return; //如果任務已經執行完畢,退出。
              if (s == CANCELLED) {
		          //這裡釋放AQS控制權并設定runner為null, 
		          //為了避免正在和一個試圖中斷線程的取消請求競
                  releaseShared(0);
                  return;
              }
              //嘗試将任務狀态設定為執行完成。
		      if (compareAndSetState(s, RAN)) {
                  result = v; //設定執行結果。
                  releaseShared(0); //釋放AQS控制權。
                  done(); //這裡調用一下done方法,子類可覆寫這個方法,做一些定制處理。
		          return;
              }
            }
        }
           

       AQS分析過,releaseShared方法中會調用tryReleaseShared方法,看一下目前同步器中這個方法的實作: 

protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;
        }
           

       innerRun方法中在執行抛異常後會調用innerSetException:

void innerSetException(Throwable t) {
	        for (;;) {
		        int s = getState();
		        if (s == RAN)
		            return;
                if (s == CANCELLED) {
		            // aggressively release to set runner to null,
		            // in case we are racing with a cancel request
		            // that will try to interrupt runner
                    releaseShared(0);
                    return;
                 }
		         if (compareAndSetState(s, RAN)) {
                    exception = t;
                    result = null;
                    releaseShared(0);
                    done();
		            return;
                 }
	         }
        }
           

       過程和innerSet類似,隻不過最後要設定異常,清空result。

       和innerRun類似還有innerRunAndReset方法,看下實作: 

boolean innerRunAndReset() {
            if (!compareAndSetState(0, RUNNING))
                return false;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING)
                    callable.call(); // don't set result
                runner = null;
                return compareAndSetState(RUNNING, 0);
            } catch (Throwable ex) {
                innerSetException(ex);
                return false;
            }
        }
           

       和innerRun的差別是不設定執行結果,最後執行完畢後重置異步任務狀态為0。  

       再看下同步器的innerGet方法,這個方法用于支援FutureTask的get方法: 

V innerGet() throws InterruptedException, ExecutionException {
            //擷取共享鎖,無法擷取時阻塞等待。
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException(); //如果任務狀态為取消,那麼抛出CancellationException
            if (exception != null)
                throw new ExecutionException(exception);//如果任務執行異常,抛出ExecutionException,并傳遞異常。
            return result; //成功執行完成,傳回執行結果。
        }
           

       類似的有帶逾時的innerGet:

V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
           

       AQS分析過,acquireSharedInterruptibly和tryAcquireSharedNanos方法中會調用tryAcquireShared方法,看一下目前同步器中這個方法的實作:

protected int tryAcquireShared(int ignore) {
            return innerIsDone()? 1 : -1;
        }
        boolean innerIsDone() {
            return ranOrCancelled(getState()) && runner == null;
        }
        private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }
           

       可見innerGet中會首先判斷任務是否完成,要依據任務(完成或取消)狀态來判斷。  

       最後看下同步器的innerCancel方法,這個方法用于支援FutureTask的cancel方法:

boolean innerCancel(boolean mayInterruptIfRunning) {
	        for (;;) {
		        int s = getState();
		        if (ranOrCancelled(s))
		            return false; //如果任務已經執行完畢或者取消。
		        if (compareAndSetState(s, CANCELLED))//否則嘗試設定任務狀态為取消。
		            break;
	        }
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt(); //如果設定了mayInterruptIfRunning為true,需要中斷線程,
            }
            releaseShared(0); //釋放AQS的控制權。
            done(); //這裡也會調用done,定制子類時需要注意下。
            return true;
        }
           
  • 有了内部同步機制,FutureTask的實作起來就很容易了,看下代碼:
public class FutureTask<V> implements RunnableFuture<V> {
    /** 内部同步器 */
    private final Sync sync;

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }
    public boolean isCancelled() {
        return sync.innerIsCancelled();
    }
    public boolean isDone() {
        return sync.innerIsDone();
    }
    public boolean cancel(boolean mayInterruptIfRunning) {
        return sync.innerCancel(mayInterruptIfRunning);
    }

    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }
    /**
     * 本類中是一個空實作,子類可以覆寫這個方法,做回調或一些記錄工作。
     * 可以來實作裡面通過任務狀态來判斷任務是否被取消。
     */
    protected void done() { }

    protected void set(V v) {
        sync.innerSet(v);
    }

    protected void setException(Throwable t) {
        sync.innerSetException(t);
    }

    public void run() {
        sync.innerRun();
    }

    protected boolean runAndReset() {
        return sync.innerRunAndReset();
    }
    ...
           

       實作都是基于上面分析過的方法,這裡不啰嗦了。注意構造方法中有一個Runnable到callable的轉換,使用了Executors中的方法,這個類後續會分析,這裡簡單看一下:

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable  task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
           

       就是簡單的通過一個适配類在适配Runnable和Callable。           小總結一下:               1.目前線程建立異步任務後,異步任務處于初始狀态(内部有一個數值表示狀态,初始為0),一般交由其他線程執行任務(比如送出給線程池處理)。目前線程通過異步任務的get方法來擷取執行結果時,如果異步任務此時還沒執行完畢(内部狀态既不是完成,也不是取消),那麼目前線程會在get方法處阻塞。               2.當其他線程(比如線程池中的工作線程)執行了異步任務,那麼會将異步任務的狀态改成"完成"(根據情況也可能是取消),同時将在get除等待的線程喚醒。  

       FutureTask的代碼解析完畢!

       參見:Jdk1.6 JUC源碼解析(6)-locks-AbstractQueuedSynchronizer