天天看點

AQS同步元件-FutureTask解析和用例

FutureTask原理

AQS同步元件-FutureTask解析和用例

FutureTask間接實作了runnable接口和future接口,說明了futureTask是runnable與callnable的集合體,即是有傳回值的runnable方法。是以,FutureTask可以交給Executor執行,也可以由調用線程直接執行(FutureTask.run())。

源碼分析

構造函數

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        //當構造方法傳入參數為Runnable,會通過Executors.callable方法将其轉換成Callable
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
           
FutureTask建立提供兩個構造方法來封裝Callable和Runnable,當構造方法傳入參數為Runnable,會通過Executors.callable方法将其轉換成Callable。
           

常用方法

/**
* 可能的狀态轉換::
* 建立 -> 已完成 -> 正常
* 建立 -> 已完成 -> 異常
* 建立 -> 已取消
* 建立 -> 中斷ing -> 已中斷
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

boolean cancel(boolean mayInterruptIfRunning);//取消任務
boolean isCancelled();//是否被取消
boolean isDone();//計算是否完成
//get方法,擷取執行結果,如果目前線程還沒有執行完成, get方法會被阻塞。
public V get()
//可以設定逾時時間并擷取執行結果,如果目前線程還沒有執行完成, get方法會被阻塞。
public V get(long timeout, TimeUnit unit)
    
/**
  * awaitDone方法其實是個死循環,直到task狀态變為已完成狀态或者等待時間超過
  *逾時時間或者線程中斷才會跳出循環,程式結束;
  *為了節省開銷,線程不會一直自旋等待,而是會阻塞,使用LockSupport的park系列方法實作線程阻塞
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //如果線程中斷,将目前線程從等待隊列waiters中移除,抛出中斷異常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        //如果線程已完成,設為null
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //如果正在執行,讓出cpu
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
       	//如果節點為空,則初始化節點
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            //CAS
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //逾時将節點移除隊列。否則阻塞到逾時。
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            //阻塞自己
            LockSupport.park(this);
    }
}
           

根據FutureTask的run方法執行的時機,FutureTask可以處于以下三種執行狀态:

  1. 未啟動:在FutureTask.run()還沒執行之前,FutureTask處于未啟動狀态。當建立一個FutureTask對象,并且run()方法未執行之前,FutureTask處于未啟動狀态。
  2. 已啟動:FutureTask對象的run方法啟動并執行的過程中,FutureTask處于已啟動狀态。
  3. 已完成:FutureTask正常執行結束,或者FutureTask執行被取消(FutureTask對象cancel方法),或者FutureTask對象run方法執行抛出異常而導緻中斷而結束,FutureTask都處于已完成狀态。
AQS同步元件-FutureTask解析和用例
  • 當FutureTask處于未啟動或已啟動狀态時,執行FutureTask.get()方法将導緻調用線程阻塞
  • 當FutureTask處于已完成狀态時,執行FutureTask.get()方法将導緻調用線程立即傳回結果或抛出異常
  • 當FutureTask處于未啟動狀态時,執行FutureTask.cancel()方法将導緻此任務永遠不會被執行
  • 當FutureTask處于已啟動狀态時,執行FutureTask.cancel(true)方法将以中斷執行此任務線程的方式來試圖停止任務
  • 當FutureTask處于已啟動狀态時,執行FutureTask.cancel(false)方法将不會對正在執行此任務的線程産生影響(讓正在執行的任務運作完成)
  • 當FutureTask處于已完成狀态時,執行FutureTask.cancel(…)方法将傳回false。

使用案例

FutureTask、Runnable、Callable

public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("callable耗時任務開始");
                //耗時任務
                Thread.sleep(5000);
                log.info("callable耗時任務完成");
                return "耗時任務:報告!我已完成";
            }
        });

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(futureTask);
   		//executor.execute(futureTask);
        log.info("主線程任務開始");
        Thread.sleep(1000);
        log.info("主線程任務完成");
        log.info("等待耗時任務完成。。。");
        //擷取耗時任務的傳回結果,如果未傳回,主線程将阻塞,處于等待狀态
        String result = futureTask.get();
        log.info("result:{}", result);
    }
           

輸出結果如下:

12:22:06.250 [main] INFO com.zjq.aqs.FutureTaskExample - 主線程任務開始
12:22:06.250 [Thread-0] INFO com.zjq.aqs.FutureTaskExample - callable耗時任務開始
12:22:07.254 [main] INFO com.zjq.aqs.FutureTaskExample - 主線程任務完成
12:22:07.254 [main] INFO com.zjq.aqs.FutureTaskExample - 等待耗時任務完成。。。
12:22:11.254 [Thread-0] INFO com.zjq.aqs.FutureTaskExample - callable耗時任務完成
12:22:11.254 [main] INFO com.zjq.aqs.FutureTaskExample - result:耗時任務:報告!我已完成
           
  • 可以把FutureTask交給Executor執行
  • 也可以通過ExecutorService.submit(…)方法傳回一個FutureTask,然後執行FutureTask.get()方法或FutureTask.cancel(…)方法
  • 除此以外,還可以單獨使用FutureTask

Future、Callable

public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();
        Future<String> future = executor.submit(new MyCallable());
        log.info("主線程任務開始");
        Thread.sleep(1000);
        log.info("主線程任務完成");
        log.info("等待耗時任務完成。。。");
        //擷取耗時任務的傳回結果,如果未傳回,主線程将阻塞,處于等待狀态
        String result = future.get();
        log.info("result:{}", result);
    }

    static class MyCallable implements Callable<String> {

        @Override
        public String call() throws Exception {
            log.info("callable耗時任務開始");
            //耗時任務
            Thread.sleep(5000);
            log.info("callable耗時任務完成");
            return "耗時任務:報告!我已完成";
        }
    }
           

當一個線程需要等待另一個線程把某個任務執行完後它才能繼續執行,此時可以使用FutureTask.

繼續閱讀