天天看點

Guava并發程式設計知多少前言入門案例(異步阻塞)異步非阻塞進階用法

原文連結 路過的兄弟覺得寫的還行,幫忙點個贊

前言

本篇文章我想給大家介紹guava并發包下的Future元件。說到Future,大家應該會想到JDK的Future元件,大家對這個元件一定不會陌生。如果你對這個壓根沒啥印象,建議你還是先補下課,再來看這篇文章。前置知識:

  • Future
  • FutureTask
  • Callable
  • Runnable

    建議大家這些基本的類庫還是要會用。

    嚴歸正傳,為什麼要介紹guava并發包的Future呢,原因有二:

  • JDK的Future元件是異步阻塞的,在擷取異步任務的結果的時候,會阻塞主線程。
  • guava Future提供了更加進階的複雜操作,比如鍊式調用。

異步阻塞這就很坑了,假如主線程裡面調用了一個異步任務,該任務的執行是由另外一個子線程去做,子線程做完會把結果傳回給主線程,子線程在傳回前,主線程就一直阻塞在這裡,等着子線程傳回結果,然後才能執行下一步的邏輯。這™跟同步阻塞有個屁的差別,我這暴脾氣,要是我,我是主線程直接自己去做了,還要你幹嘛?交由子線程去做反而會更浪費時間(線程上下文切換)。而guava的Future正好解決了這個問題。

其次,是上面說到的第二點,提供更加進階的複雜應用,業務往往沒有這麼簡單。舉個例子:主線程收到一些參數,主線程校驗完這些參數的合法性,就想把複雜計算過程交給子線程1去做,子線程1計算出結果需要把該結果持久化,是以子線程1把持久化的過程交個子線程2去做,至于怎麼持久化,子線程1不管,寫到檔案裡也好,寫到資料庫也好,子線程1不想管,也不該管。這樣滿足"最小原則,專業的人做專業的事"。

入門案例(異步阻塞)

先看一個最簡單的例子,主線程配置設定一個任務給子線程後,然後繼續運作,子線程運算出結果後把結果傳回給主線程(為了簡化代碼,這裡的“結果”在執行個體代碼裡是目前時間),在這段代碼裡主線程依舊是阻塞的。

代碼說明: ListenableFutureTask繼承了JDK的FutureTask,是以ListenableFutureTask隻是FutureTask的擴充。guava為了相容jdk的api,提供了

  • ListenableFutureTask.create(Callable)
  • ListenableFutureTask.create(Runnable, V)

    建立futureTask 的方法。

@Slf4j
public class Test2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Date date = new Date(1976);
        // 這裡可以用lambda表達式,但是貼代碼的時候會很不直覺,不知道是Runnable還是Callable
        ListenableFutureTask<Object> futureTask = ListenableFutureTask.create(new Runnable() {
            @Override
            public void run() {
                log.info(Thread.currentThread().getName() + " Runnable任務啟動....");
                date.setTime(new Date().getTime());
            }
        }, date);

        new Thread(futureTask).start();

        // 睡眠一會 等待子線程執行完
        Thread.sleep(1000L);
        log.info(Thread.currentThread().getName() + "目前時間" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date));

        ListenableFutureTask<Object> futureTask2 = ListenableFutureTask.create(new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                log.info(Thread.currentThread().getName() + " Callable任務啟動....");
                Thread.sleep(5000L);
                return "目前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            }
        });

        new Thread(futureTask2).start();
        log.info(Thread.currentThread().getName() + futureTask2.get());
        log.info(Thread.currentThread().getName() + "主線程繼續執行");

    }

}
           

運作結果:

注意觀察運作結果的線程名和日志輸出時間。需要注意的是由于沒有執行成功的異步回調,實際上我們的主線程依舊是阻塞的,必須等子線程運作完,才能拿到結果。

16:25:46.665 [Thread-1] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test2 - Thread-1 Runnable任務啟動....
16:25:47.663 [main] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test2 - main目前時間2020-05-30 16:25:46
16:25:47.699 [Thread-2] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test2 - Thread-2 Callable任務啟動....
16:25:52.792 [main] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test2 - main目前時間:2020-05-30 16:25:52
16:25:52.792 [main] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test2 - main主線程繼續執行
           

異步非阻塞

上一節介紹了異步阻塞的代碼,這節我們介紹異步非阻塞的代碼。

MoreExecutors.listeningDecorator是為了将JDK的ExecutorService 轉換為ListeningExecutorService,ListeningExecutorService總是會傳回Future。

與上一節代碼相比增加 Futures.addCallback方法,該方法會根據子線程運算後的狀态,成功或者失敗回調不同的邏輯。

@Slf4j
public class Test3 {
    public static void main(String[] args) {

        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

        ListenableFutureTask<Object> futureTask = ListenableFutureTask.create(new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                log.info(Thread.currentThread().getName() + " Callable任務啟動....");
                Thread.sleep(5000L);
                return "目前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            }
        });

        Futures.addCallback(futureTask, new FutureCallback<Object>() {

            public void onSuccess(Object calCultorResult) {
                log.info(Thread.currentThread().getName() + "子線程執行成功,計算結果{}", calCultorResult);
            }

            public void onFailure(Throwable thrown) {

            }
        }, service);

        new Thread(futureTask).start();
        log.info(Thread.currentThread().getName() + "主線程繼續執行");

    }

}
           

分析列印的日志,可以看到主線程在送出任務過後就緊着執行,沒有被阻塞而停下來。還可以發現,計算的執行過程是由pool-1-thread-1執行的,回調邏輯是由線程池裡面的pool-1-thread-2處理的。理想中的“異步非阻塞”是不存在的,歸根接地還™的是線程池。我所說的理想中的異步非阻塞值得是什麼含義呢?把人類比成CPU,假如一個人先要去燒水,在燒水的過程中想把地掃一下,在水開的時候警報響起,“水壺通知”他去關火,這樣的事情在人類世界很簡單,但是在CPU卻是不可能實作的,換做是CPU,那麼理想中的異步非阻塞就該是這樣的?

// main線程 ,灌滿水壺,放在爐子上
// 子線程2 ,燒水的任務交給子線程2,它好比爐子
//  go to something... 
// 子線程2通知main線程,水燒開了,需要關火
// main線程得到通知,去關火
           

然而這樣的邏輯,在計算機世界裡完全行不通,main線程隻能不斷輪詢,守在那裡看火有麼有關掉。你也許會說網絡IO模型的異步非阻塞IO模型又是怎麼回事,那個不是非阻塞且異步的嗎?沒錯,異步非阻塞模型是針對前幾種模型來說的,這個模型和我們說的模型是不一樣的,該模型是核心準備好資料後,通過系統函數通知使用者程序去拿資料,而并不是使用者程序先去問“核心有資料了嗎”,核心答:“沒有”,然後使用者程序接着去幹别的事,跟核心說“我去幹别的了,好了通知我”,核心準備好後說“嗨 兄弟有了 放下你手中的事,有資料了”。請注意,并不是這樣的,異步非阻塞IO模型是由核心發起觸發的,而不是使用者程序觸發的,跟main線程燒水那個不一樣,而且這裡是程序之間的通信,使用者程序肯定不止一個線程啦。是以這裡你也就不難了解,回調過程為什麼是由線程池線程2處理的,這樣做也就解決了JDK裡面Future會阻塞主線程的問題。

17:10:20.027 [pool-1-thread-1] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test3 - pool-1-thread-1 Callable任務啟動....
17:10:20.050 [main] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test3 - main主線程繼續執行
17:10:21.116 [pool-1-thread-2] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test3 - pool-1-thread-2子線程執行成功,計算結果目前時間:2020-05-30 17:10:21

           

進階用法

如果我們有有一個大任務比較耗時,拆分成子任務1和子任務2,子任務2的執行又依賴于子任務1的計算結果,這種過程下我們該如何處理呢?看一段代碼咯

@Slf4j
public class FutureTest {
    public static void main(String[] args) {

        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

        ListenableFuture<Integer> task1Future = service.submit(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                log.info("任務1開始執行...");
                int washTime = new Random().nextInt(10) + 1;
                Thread.sleep(washTime);
                if (washTime > 7) {
                    throw new RuntimeException("任務1開始因執行時間過長而失敗");
                }
                return washTime;
            }
        });

        AsyncFunction<Integer, Boolean> asyncFunction = new AsyncFunction<Integer, Boolean>() {
            public ListenableFuture<Boolean> apply(Integer rowKey) {
                log.info("任務1執行成功,計算結果{}", rowKey);

                ListenableFuture<Boolean> hot = service.submit(new Callable<Boolean>() {

                    @Override
                    public Boolean call() throws Exception {
                        log.info("任務2開始執行,傳回固定結果true");
                        return true;
                    }
                });
                return hot;
            }
        };

        ListenableFuture<Boolean> queryFuture = Futures.transformAsync(task1Future, asyncFunction, service);

        Futures.addCallback(queryFuture, new FutureCallback<Boolean>() {
            public void onSuccess(Boolean explosion) {
                log.info("任務1,任務2均執行成功");
            }

            public void onFailure(Throwable thrown) {
                log.error("", thrown);
            }
        }, service);

    }

}
           
17:17:36.207 [pool-1-thread-1] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.FutureTest - 任務1開始執行...
17:17:36.243 [pool-1-thread-2] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.FutureTest - 任務1執行成功,計算結果1
17:17:36.246 [pool-1-thread-3] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.FutureTest - 任務2開始執行,傳回固定結果true
17:17:36.247 [pool-1-thread-4] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.FutureTest - 任務1,任務2均執行成功