天天看點

CompletableFuture(一)

CompletableFuture

完成異步任務回調其他任務

可以等待一個任務執行結束就回調,

也可以等待兩個任務都結束就回調,

或者等待其中一個結束就回調。

預設使用ForkJoinPool,也可自己配置。

CompletionStage

CompletableFuture實作了CompletionStage,CompletionStage 主要負責任務的執行,将任務分成一個一個stage在同一個stage會同時執行,執行結束會通知另一個stage執行

thenApply( Function )接收前一個任務傳回,并傳回目前任務結果

thenAccept( Consumer )接收前一個任務傳回

supplyAsync(Supplier )運作任務并傳回結果

//依賴單個階段
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);     // 預設執行方式
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);// 預設的異步執行方式
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor); //自定義的執行方式

//依賴兩個階段都完成
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);

//依賴兩個階段中的任何一個完成
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

//任意階段 傳回 則回調
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
           
public static void main(String[] args) throws Exception{
//        thenApply();
//        thenCombine();
        applyToEither();
    }

    //依賴單個階段
    public static void thenApply()throws Exception{
        //可完成的異步任務
        CompletableFuture<Void> completableFuture = CompletableFuture
                //Supplier 執行方法并傳回結果 不需要參數
                .supplyAsync(() -> {
                    System.out.println("aa1");
                    return "q";
                })

                //Function
//                .thenApplyAsync(a)-> { //異步
                .thenApply((a)-> {
                    System.out.println("等待前一個方法執行結束 回調該方法 接收 前一個傳回,同步執行目前方法(由前一個線程同步執行) 并傳回結果");
                    return 1;}
                )

                //Consumer
                .thenAcceptAsync(System.out::println)//等待前一個方法執行結束 回調該方法 接收前一個CompletableFuture傳回 異步執行目前方法
                // .thenAccept(System.out::println)//接收前一個CompletableFuture傳回 同步執行目前方法

                .thenRun(()-> System.out.println("等待前一個方法執行結束 回調該方法"))
                ;

        //阻塞等待任務結束,無傳回值
        completableFuture.get();
    }

    //依賴兩個階段都完成
    public static void thenCombine()throws Exception{
        CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {  //任務 1
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任務1 執行結束");
                    return Thread.currentThread().getName() + " s1";
                })
                .thenCombine(                         //綁定到 任務1 stage
                        CompletableFuture.supplyAsync(() -> { //任務 2
                            try {
                                Thread.sleep(2000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("任務2 執行結束");
                            return Thread.currentThread().getName() + " s1";
                        }),
                        (s1, s2) -> { //等待 兩個任務執行結束,傳回目前結果
                            System.out.println(s1);
                            System.out.println(s2);
                            return 1;
                        }
               );
        Integer integer = integerCompletableFuture.get();
        System.out.println(Thread.currentThread().getName() + " " +integer);
    }


    //依賴兩個階段中的任何一個完成
    public static void applyToEither()throws Exception{
        CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {  //任務 1
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任務1 執行結束");
            return Thread.currentThread().getName() + " s1";
        })
                .applyToEither(                         //綁定到 任務1 stage
                        CompletableFuture.supplyAsync(() -> { //任務 2
                            try {
                                Thread.sleep(2000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("任務2 執行結束");
                            return Thread.currentThread().getName() + " s1";
                        }),
                        (sany) -> { //等待 兩個任務執行結束,傳回目前結果
                            System.out.println(sany);
                            return 1;
                        }
                );
        Integer integer = integerCompletableFuture.get();
        System.out.println(Thread.currentThread().getName() + " " +integer);
    }
           

Completion stack

CompletableFuture内部有一個Completion stack,用于記錄回調

使用連結清單實作棧 為什麼?

因為連結清單空間不連續不需要擴容

數組實作 可以利用緩存行來加速讀取 但是增删會需要擴容

将ComplatableFuture嵌入runable 包裝成 Completion 放入線程池執行 最後将Completion 壓棧

public static void main(String[] args) {
        //異步執行
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> action("動作1"));

        /**
         * A CompletableFuture may have dependent completion actions,
         * collected in a linked stack.
         * 将回調動作 都添加到 stack中
         */
        //回調stage
        completableFuture.thenRun(()-> System.out.println(1));
        completableFuture.thenRun(()-> System.out.println(2));
        completableFuture.thenRun(()-> System.out.println(3));
        try {
            completableFuture.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void action(String act) {

        System.out.println(act+"start");

        //如果沒睡眠 動作1就直接執行完 後面任務 就不會被壓棧 是以睡眠就會倒叙輸出
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(act+"end");
    }