天天看點

Java新特性:異步編排CompletableFuture

作者:JAVA全棧攻城獅

CompletableFuture由Java 8提供,是實作異步化的工具類,上手難度較低,且功能強大,支援通過函數式程式設計的方式對各類操作進行組合編排。 CompletableFuture實作了CompletionStage接口和Future接口,前者是對後者的一個擴充,增加了異步回調、流式處理、多個Future組合處理的能力,使Java在處理多任務的協同工作時更加順暢便利。

1.背景

随着業務項目數量的增大,系統服務面臨的壓力也越來越大,這時候系統吞吐量會下降,但是一些核心功能的接口必須保證高吞吐量,低延遲。這時候我們就需要對接口進行優化,提升性能,進而保證高吞吐量。這時候CompletableFuture就用很大的用武之地了,我們一些核心接口處理資料都是串行執行的,但是其實接口的某些資料擷取、處理封裝并沒有前後依賴關系,我們大可并行處理,這樣就可以充分利用cpu資源。

一般我們的接口調用執行分為同步或者異步:

1.1 同步執行

通常我們的接口資料查詢多次資料庫擷取資料然後進行處理,封裝傳回,或者是多次rpc調用其他服務擷取資料,但是無論什麼擷取資料的操作,都是串行執行的,也就是操作2必須要等操作1完成之後在執行,即使操作1和操作2之間沒有任何聯系

Java新特性:異步編排CompletableFuture

在同步調用的場景下,接口耗時長、性能差,接口響應時長T = T1+T2+T3+……+Tn,這時為了縮短接口的響應時間,一般會使用線程池的方式并行擷取資料

1.2 異步執行

Java新特性:異步編排CompletableFuture

使用并行擷取資料,大大降低了接口對資料擷取,處理的時間

2.CompletableFuture使用

下面我們通過一個例子來講解CompletableFuture如何使用,商品詳情接口傳回資料使用CompletableFuture進行資料封裝任務進行異步編排:

ini複制代碼    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("product-pool-%d").build();

    private static ExecutorService fixedThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
            Runtime.getRuntime().availableProcessors() * 40,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors() * 20),
            namedThreadFactory);

/**
     * 使用completableFuture執行多線程任務安排,提高速度,completableFuture可以讓某些異步線程任務串行化順序執行
     * 如果不要求某些異步任務串行化順序執行,那麼也可以JUC裡面另一個countDownLatch實作
     *
     * @param skuId
     * @return
     */
    @Override
    public SkuInfo getSkuDetail(Long skuId) {
        SkuInfo skuInfo = new SkuInfo();
        // 擷取sku資訊
        CompletableFuture<ProductSku> skuFuture = CompletableFuture.supplyAsync(() -> {
            ProductSku sku = productSkuDAO.selectById(skuId);
            skuInfo.setSku(sku);
            return sku;
        }, fixedThreadPool);
        // 異步擷取spu資訊
        CompletableFuture<ProductSpu> spuFuture = skuFuture.thenApplyAsync(sku -> {
            ProductSpu spu = productSpuDAO.selectById(sku.getSpuId());
            skuInfo.setSpu(spu);
            return spu;
        }, fixedThreadPool);
        // 異步擷取品牌資訊
        CompletableFuture<BrandDTO> brandFuture = skuFuture.thenApplyAsync(sku -> {
            BrandDTO brandDTO = brandService.getBrandDetail(sku.getBrandId());
            skuInfo.setBrand(brandDTO);
            return brandDTO;
        }, fixedThreadPool);
       // 異步擷取分類資訊
        CompletableFuture<CategoryDTO> categoryFuture = skuFuture.thenApplyAsync(sku -> {
            CategoryDTO categoryDTO = categoryService.getCategoryDetail(sku.getCategoryId());
            skuInfo.setCategory(categoryDTO);
            return categoryDTO;
        }, fixedThreadPool);
        try {
            // 最後等待所有異步任務執行完成傳回封裝結果
            CompletableFuture.allOf(skuFuture, spuFuture, brandFuture, categoryFuture).get();
        } catch (Exception e) {
            log.error("<=======等候所有任務執行過程報錯:======>", e);
        }
        return skuInfo;
    }
            
            
           

2.1 supplyAsync / runAsync

supplyAsync表示建立帶傳回值的異步任務的,相當于ExecutorService submit(Callable task) 方法,runAsync表示建立無傳回值的異步任務,相當于ExecutorService submit(Runnable task)方法,這兩方法的效果跟submit是一樣的,測試用例如下:

csharp複制代碼    /**
     * 測試方法CompletableFuture.runAsync:無傳回值,
     */
    private static void testRunAsync() {
        CompletableFuture.runAsync(() ->{
            System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
            System.out.println("supplyAsync 是否為守護線程 " + Thread.currentThread().isDaemon());
            int result = 10/2;
            System.out.println("計算結果為:"+ result);
        }, fixedThreadPool);
    }

    /**
     * 測試方法CompletableFuture.supplyAsync:有傳回值
     * @throws ExecutionException
     * @throws InterruptedException
     */
    private static void testSupplyAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
            int result = 10 / 2;
            return result;
        }, fixedThreadPool);
        Integer res = future.get();
        System.out.println("傳回結果值為:"+res);
    }
           

這兩方法各有一個重載版本,可以指定執行異步任務的Executor實作,如果不指定,預設使用ForkJoinPool.commonPool(),如果機器是單核的,則預設使用ThreadPerTaskExecutor,該類是一個内部類,每次執行execute都會建立一個新線程,具體可以看CompletableFuture源碼。

2.2 thenApply / thenApplyAsync

thenApply 表示某個任務執行完成後執行的動作,即回調方法,會将該任務的執行結果即方法傳回值作為入參傳遞到回調方法中,

csharp複制代碼 /**
     * 線程串行化
     * 1、thenRun:不能擷取上一步的執行結果
     * 2、thenAcceptAsync:能接受上一步結果,但是無傳回值
     * 3、thenApplyAsync:能接受上一步結果,有傳回值
     *
     */
    private static void testThenApplyAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運作結果:" + i);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return i;
        }, executor);
        CompletableFuture<String> future2 = future1.thenApplyAsync(res -> {
            System.out.println("======任務2啟動了..." + res*20);
            return "Hello" + res;
        }, executor);

        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            System.out.println("======任務3執行了");
        }, executor);

        CompletableFuture.allOf(future1, future2, future3).get();
        System.out.println("=======測試結束");

    }
           

thenApplyAsync與thenApply的差別在于,前者是将job2送出到線程池中異步執行,實際執行future2的線程可能是另外一個線程,後者是由執行future1的線程立即執行future2,即兩個future都是同一個線程執行的

2.3 exceptionally/whenComplete/handle

exceptionally方法指定某個任務執行異常時執行的回調方法,會将抛出異常作為參數傳遞到回調方法中,如果該任務正常執行則會exceptionally方法傳回的CompletionStage的result就是該任務正常執行的結果;whenComplete是當某個任務執行完成後執行的回調方法,會将執行結果或者執行期間抛出的異常傳遞給回調方法,如果是正常執行則異常為null,回調方法對應的CompletableFuture的result和該任務一緻,如果該任務正常執行,則get方法傳回執行結果,如果是執行異常,則get方法抛出異常

csharp複制代碼 /**
     * 測試whenComplete和exceptionally: 異步方法執行完的處理
     */
    private static void testWhenCompleteAndExceptionally() throws ExecutionException, InterruptedException {
         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
             System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
             Integer num = 10;
             int i = num / 2;
             String s = String.valueOf(null);
             System.out.println("運作結果:" + i);
             return i;
         }, executor).whenComplete((res,exception) -> {
             //雖然能得到異常資訊,但是沒法修改傳回資料
             System.out.println("<=====異步任務成功完成了=====結果是:" + res + "=======異常是:" + exception);
         }).exceptionally(throwable -> {
             //可以感覺異常,同時傳回預設值
             System.out.println("<=====異步任務成功發生異常了======"+throwable);
             return 10;
         });
        Integer result = future.get();
        System.out.println("<=====最終傳回結果result=" + result + "======>");

    }

    /**
     * 測試handle方法:它是whenComplete和exceptionally的結合
     */
    private static void testHandle() {
         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
             System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
             int i = 10 / 2;
             System.out.println("運作結果:" + i);
             return i;
         }, executor).handle((result,thr) -> {
             if (result != null) {
                 return result * 2;
             }
             if (thr != null) {
                 System.out.println("異步任務成功完成了...結果是:" + result + "異常是:" + thr);
                 return 0;
             }
             return 0;
         });
    }
           

2.4 組合處理 thenCombine / thenAcceptBoth / runAfterBoth

這三個方法都是将兩個CompletableFuture組合起來,隻有這兩個都正常執行完了才會執行某個任務,差別在于,thenCombine會将兩個任務的執行結果作為方法入參傳遞到指定方法中,且該方法有傳回值;thenAcceptBoth同樣将兩個任務的執行結果作為方法入參,但是無傳回值;runAfterBoth沒有入參,也沒有傳回值。注意兩個任務中隻要有一個執行異常,則将該異常資訊作為指定任務的執行結果

csharp複制代碼    private static void thenCombine() throws Exception {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello1", fixedThreadPool);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello2", fixedThreadPool);
        CompletableFuture<String> result = future1.thenCombine(future2, (t, u) -> t+" "+u);
        System.out.println(result.get());
    }
    
        private static void thenAcceptBoth() throws Exception {
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        },fixedThreadPool);

        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        },fixedThreadPool);
    }
           

2.5 applyToEither / acceptEither / runAfterEither

這三個方法都是将兩個CompletableFuture組合起來,隻要其中一個執行完了就會執行某個任務,其差別在于applyToEither會将已經執行完成的任務的執行結果作為方法入參,并有傳回值;acceptEither同樣将已經執行完成的任務的執行結果作為方法入參,但是沒有傳回值;runAfterEither沒有方法入參,也沒有傳回值。注意兩個任務中隻要有一個執行異常,則将該異常資訊作為指定任務的執行結果

csharp複制代碼    private static void applyToEither() throws Exception {
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        },fixedThreadPool);
        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        },fixedThreadPool);

        CompletableFuture<Integer> result = f1.applyToEither(f2, t -> {
            System.out.println("applyEither:"+t);
            return t * 2;
        });

    }

    private static void acceptEither() throws Exception {
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        },fixedThreadPool);
        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        },fixedThreadPool);

        CompletableFuture<Void> result = f1.acceptEither(f2, t -> {
            System.out.println("acceptEither:"+t);
        });

    }
           

2.6 allOf / anyOf

allOf傳回的CompletableFuture是多個任務都執行完成後才會執行,隻有有一個任務執行異常,則傳回的CompletableFuture執行get方法時會抛出異常,如果都是正常執行,則get傳回null。

csharp複制代碼    private static void testThenApplyAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運作結果:" + i);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return i;
        }, executor);
        CompletableFuture<String> future2 = future1.thenApplyAsync(res -> {
            System.out.println("======任務2啟動了..." + res*20);
            return "Hello" + res;
        }, executor);

        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            System.out.println("======任務3執行了");
        }, executor);

        CompletableFuture.allOf(future1, future2, future3).get();
        System.out.println("=======測試結束");

    }
           

注意,使用CompletableFuture可能有某些異步任務不執行,示例如下:

csharp複制代碼    private static void testNotExecute() {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
            System.out.println("supplyAsync 是否為守護線程 " + Thread.currentThread().isDaemon());
            int i = 10 / 2;
            System.out.println("運作結果:" + i);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 下面不列印
            System.out.println("return之前的列印");
            return i;
        });
    }
           

造成這個原因是因為Daemon。因為completableFuture這套使用異步任務的操作都是建立成了守護線程。那麼我們沒有調用get方法不阻塞這個主線程的時候。主線程執行完畢。所有線程執行完畢就會導緻一個問題,就是守護線程退出。那麼我們沒有執行的代碼就是因為主線程不再跑任務而關閉導緻的。

3.CompletableFuture的實作原理

從CompletableFuture源碼可知,CompletableFuture中包含兩個字段:result和stack。result用于存儲目前CF的結果,stack(Completion)表示目前CF完成後需要觸發的依賴動作(Dependency Actions),去觸發依賴它的CF的計算,依賴動作可以有多個(表示有多個依賴它的CF),以棧(Treiber stack)的形式存儲,stack表示棧頂元素。

作者:shepherd111

連結:https://juejin.cn/post/7242113868553306172