天天看點

java8中多線程異步調用方法CompletableFuture的特性及方法

在java8以前,我們使用java的多線程程式設計,一般是通過Runnable中的run方法來完成,這種方式,有個很明顯的缺點,就是,沒有傳回值,這時候,大家可能會去嘗試使用Callable中的call方法,然後用Future傳回結果,如下:

使用CompletableFuture必須自己定義一個線程池,如果不定義就會用預設的線程池,資料量太大會導緻調用失敗的,使用者需要自己定義線程池

public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> stringFuture = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "async thread";
            }
        });
        Thread.sleep(1000);
        System.out.println("main thread");
        System.out.println(stringFuture.get());

    }
           

建立任務:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}

public static CompletableFuture<Void> runAsync(Runnable runnable){..}

public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){..} 
           
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    //....執行任務
    return "hello";}, executor)
           
V get();
V get(long timeout,Timeout unit);
T getNow(T defaultValue);
T join();
           

thenAccept()目前任務正常完成以後執行,目前任務的執行結果可以作為下一任務的輸入參數,無傳回值.

場景:執行任務A,同時異步執行任務B,待任務B正常傳回之後,用B的傳回值執行任務C,任務C無傳回值。

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任務A");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "任務B");
CompletableFuture<String> futureC = futureB.thenApply(b -> {
      System.out.println("執行任務C.");
      System.out.println("參數:" + b);//參數:任務B
      return "a";
});
           

thenRun(..)功能:對不關心上一步的計算結果,執行下一個操作

場景:執行任務A,任務A執行完以後,執行任務B,任務B不接受任務A的傳回值(不管A有沒有傳回值),也無傳回值

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任務A");
futureA.thenRun(() -> System.out.println("執行任務B"));
           

thenApply(..)功能:目前任務正常完成以後執行,目前任務的執行的結果會作為下一任務的輸入參數,有傳回值

場景:多個任務串聯執行,下一個任務的執行依賴上一個任務的結果,每個任務都有輸入和輸出

執行個體1:異步執行任務A,當任務A完成時使用A的傳回結果resultA作為入參進行任務B的處理,可實作任意多個任務的串聯執行

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");

CompletableFuture<String> futureB = futureA.thenApply(s->s + " world");

CompletableFuture<String> future3 = futureB.thenApply(String::toUpperCase);

System.out.println(future3.join());
           

上面的代碼,我們當然可以先調用future.join()先得到任務A的傳回值,然後再拿傳回值做入參去執行任務B,而thenApply的存在就在于幫我簡化了這一步,我們不必因為等待一個計算完成而一直阻塞着調用線程,而是告訴CompletableFuture你啥時候執行完就啥時候進行下一步. 就把多個任務串聯起來了.。

thenCombine(..)  thenAcceptBoth(..)  runAfterBoth(..)功能:結合兩個CompletionStage的結果,進行轉化後傳回

場景:需要根據商品id查詢商品的目前價格,分兩步,查詢商品的原始價格和折扣,這兩個查詢互相獨立,當都查出來的時候用原始價格乘折扣,算出目前價格. 使用方法:thenCombine(..)

CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> 100d);
 CompletableFuture<Double> futureDiscount = CompletableFuture.supplyAsync(() -> 0.8);
 CompletableFuture<Double> futureResult = futurePrice.thenCombine(futureDiscount, (price, discount) -> price * discount);
 System.out.println("最終價格為:" + futureResult.join()); //最終價格為:80.0
           

thenCombine(..)是結合兩個任務的傳回值進行轉化後再傳回,那如果不需要傳回呢,那就需要thenAcceptBoth(..),同理,如果連兩個任務的傳回值也不關心呢,那就需要runAfterBoth了,如果了解了上面三個方法,thenApply,thenAccept,thenRun,這裡就不需要單獨再提這兩個方法了,隻在這裡提一下.。

thenCompose(..)功能:這個方法接收的輸入是目前的CompletableFuture的計算值,傳回結果将是一個新的CompletableFuture

thenApply():它的功能相當于将CompletableFuture<T>轉換成CompletableFuture<U>,改變的是同一個CompletableFuture中的泛型類型。thenCompose():用來連接配接兩個CompletableFuture,傳回值是一個新的CompletableFuture

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");

CompletableFuture<String> futureB = futureA.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world"));

CompletableFuture<String> future3 = futureB.thenCompose(s -> CompletableFuture.supplyAsync(s::toUpperCase));

System.out.println(future3.join());
           

applyToEither(..)  acceptEither(..)  runAfterEither(..)功能:執行兩個CompletionStage的結果,那個先執行完了,就是用哪個的傳回值進行下一步操作

場景:假設查詢商品a,有兩種方式,A和B,但是A和B的執行速度不一樣,我們希望哪個先傳回就用那個的傳回值.

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "通過方式A擷取商品a";
        });
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "通過方式B擷取商品a";
        });
CompletableFuture<String> futureC = futureA.applyToEither(futureB, product -> "結果:" + product);
System.out.println(futureC.join()); //結果:通過方式A擷取商品a
           

同樣的道理,applyToEither的兄弟方法還有acceptEither(),runAfterEither()

exceptionally(..)功能:當運作出現異常時,調用該方法可進行一些補償操作,如設定預設值.相當于catch功能,在發生異常情況時執行的邏輯。

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "執行結果:" + (100 / 0))
                .thenApply(s -> "futureA result:" + s)
                .exceptionally(e -> {
                    System.out.println(e.getMessage()); //java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                });
CompletableFuture<String> futureB = CompletableFuture.
                supplyAsync(() -> "執行結果:" + 50)
                .thenApply(s -> "futureB result:" + s)
                .exceptionally(e -> "futureB result: 100");
System.out.println(futureA.join());//futureA result: 100
System.out.println(futureB.join());//futureB result:執行結果:50
           

whenComplete(..)功能:當CompletableFuture的計算結果完成,或者抛出異常的時候,都可以進入whenComplete方法執行,舉個栗子

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "執行結果:" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .whenComplete((s, e) -> {
                    if (s != null) {
                        System.out.println(s);//未執行
                    }
                    if (e == null) {
                        System.out.println(s);//未執行
                    } else {
                        System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero
                    }
                })
                .exceptionally(e -> {
                    System.out.println("ex"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zero
           
             return "futureA result: 100"; }); 
System.out.println(futureA.join());//futureA result: 100
           

根據控制台,我們可以看出執行流程是這樣,supplyAsync->whenComplete->exceptionally,可以看出并沒有進入thenApply執行,原因也顯而易見,在supplyAsync中出現了異常,thenApply隻有當正常傳回時才會去執行.而whenComplete不管是否正常執行,還要注意一點,whenComplete是沒有傳回值的.

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "執行結果:" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .exceptionally(e -> {
                    System.out.println("ex:"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                })
                .whenComplete((s, e) -> {
                    if (e == null) {
                        System.out.println(s);//futureA result: 100
                    } else {
                        System.out.println(e.getMessage());//未執行
                    }
                })
                ;
System.out.println(futureA.join());//futureA result: 100
           

代碼先執行了exceptionally後執行whenComplete,可以發現,由于在exceptionally中對異常進行了處理,并傳回了預設值,whenComplete中接收到的結果是一個正常的結果,被exceptionally美化過的結果。

handle(..)功能:當CompletableFuture的計算結果完成,或者抛出異常的時候,可以通過handle方法對結果進行處理

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "執行結果:" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); //java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                })
                .handle((s, e) -> {
                    if (e == null) {
                        System.out.println(s);//futureA result: 100
                    } else {
                        System.out.println(e.getMessage());//未執行
                    }
                    return "handle result:" + (s == null ? "500" : s);
                });
System.out.println(futureA.join());//handle result:futureA result: 100
           

通過控制台,我們可以看出,最後列印的是handle result:futureA result: 100,執行exceptionally後對異常進行了"美化",傳回了預設值,那麼handle得到的就是一個正常的傳回,我們再試下,先調用handle再調用exceptionally的情況.

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "執行結果:" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .handle((s, e) -> {
                    if (e == null) {
                        System.out.println(s);//未執行
                    } else {
                        System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero
                    }
                    return "handle result:" + (s == null ? "500" : s);
                })
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); //未執行
                    return "futureA result: 100";
                });
System.out.println(futureA.join());//handle result:500
           

根據控制台輸出,可以看到先執行handle,列印了異常資訊,并對接過設定了預設值500,exceptionally并沒有執行,因為它得到的是handle傳回給它的值,由此我們大概推測handle和whenComplete的差別

   1.都是對結果進行處理,handle有傳回值,whenComplete沒有傳回值

   2.由于1的存在,使得handle多了一個特性,可在handle裡實作exceptionally的功能

allOf(..)  anyOf(..)

allOf:當所有的

CompletableFuture

都執行完後執行計算

anyOf:最快的那個CompletableFuture執行完之後執行計算

場景二:查詢一個商品詳情,需要分别去查商品資訊,賣家資訊,庫存資訊,訂單資訊等,這些查詢互相獨立,在不同的服務上,假設每個查詢都需要一到兩秒鐘,要求總體查詢時間小于2秒.

public static void main(String[] args) throws Exception {

        ExecutorService executorService = Executors.newFixedThreadPool(4);

        long start = System.currentTimeMillis();
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品詳情";
        },executorService);

        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "賣家資訊";
        },executorService);

        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "庫存資訊";
        },executorService);

        CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "訂單資訊";
        },executorService);

        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureA, futureB, futureC, futureD);
        allFuture.join();

        System.out.println(futureA.join() + futureB.join() + futureC.join() + futureD.join());
        System.out.println("總耗時:" + (System.currentTimeMillis() - start));
           

繼續閱讀