天天看點

CompletableFuture異步編排

CompletableFuture異步編排

什麼是CompletableFuture#

CompletableFuture是JDK8提供的Future增強類。CompletableFuture異步任務執行線程池,預設是把異步任務都放在ForkJoinPool中執行。

在這種方式中,主線程不會被阻塞,不需要一直等到子線程完成。主線程可以并行的執行其他任務。

Future存在的問題#

Future實際采用FutureTask實作,該對象相當于是消費者和生産者的橋梁,消費者通過 FutureTask 存儲任務的處理結果,更新任務的狀态:未開始、正在處理、已完成等。而生産者拿到的 FutureTask 被轉型為 Future 接口,可以阻塞式擷取任務的處理結果,非阻塞式擷取任務處理狀态。

使用#

runAsync 和 supplyAsync方法#

CompletableFuture 提供了四個靜态方法來建立一個異步操作。

Copy

public static CompletableFuture runAsync(Runnable runnable)

public static CompletableFuture runAsync(Runnable runnable, Executor executor)

public static CompletableFuture supplyAsync(Supplier supplier)

public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執行異步代碼。如果指定線程池,則使用指定的線程池運作。以下所有的方法都類同。

runAsync方法不支援傳回值。

supplyAsync可以支援傳回值。

計算完成時回調方法#

當CompletableFuture的計算結果完成,或者抛出異常的時候,可以執行特定的Action。主要是下面的方法:

public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action);

public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);

public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);

public CompletableFuture exceptionally(Function fn);

whenComplete可以處理正常和異常的計算結果,exceptionally處理異常情況。BiConsumer<? super T,? super Throwable>可以定義處理業務

whenComplete 和 whenCompleteAsync 的差別:

whenComplete:是執行目前任務的線程執行繼續執行 whenComplete 的任務。

whenCompleteAsync:是執行把 whenCompleteAsync 這個任務繼續送出給線程池來進行執行。

方法不以Async結尾,意味着Action使用相同的線程執行,而Async可能會使用其他線程執行(如果是使用相同的線程池,也可能會被同一個線程選中執行)

代碼示例:

public class CompletableFutureDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
        @Override
        public Object get() {
            System.out.println(Thread.currentThread().getName() + "\t completableFuture");
            int i = 10 / 0;
            return 1024;
        }
    }).whenComplete(new BiConsumer<Object, Throwable>() {
        @Override
        public void accept(Object o, Throwable throwable) {
            System.out.println("-------o=" + o.toString());
            System.out.println("-------throwable=" + throwable);
        }
    }).exceptionally(new Function<Throwable, Object>() {
        @Override
        public Object apply(Throwable throwable) {
            System.out.println("throwable=" + throwable);
            return 6666;
        }
    });
    System.out.println(future.get());
}           

}

handle 方法#

handle 是執行任務完成時對結果的處理。

handle 是在任務完成後再執行,還可以處理異常的任務。

public CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn);

public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);

public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

線程串行化方法#

thenApply 方法:當一個線程依賴另一個線程時,擷取上一個任務傳回的結果,并傳回目前任務的傳回值。

thenAccept方法:消費處理結果。接收任務的處理結果,并消費處理,無傳回結果。

thenRun方法:隻要上面的任務執行完成,就開始執行thenRun,隻是處理完任務後,執行 thenRun的後續操作

帶有Async預設是異步執行的。這裡所謂的異步指的是不在目前線程内執行。

public CompletableFuture thenApply(Function<? super T,? extends U> fn)

public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)

public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage thenAccept(Consumer<? super T> action);

public CompletionStage thenAcceptAsync(Consumer<? super T> action);

public CompletionStage thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage thenRun(Runnable action);

public CompletionStage thenRunAsync(Runnable action);

public CompletionStage thenRunAsync(Runnable action,Executor executor);

Function<? super T,? extends U>

T:上一個任務傳回結果的類型

U:目前任務的傳回值類型

代碼示範:

public static void main(String[] args) throws ExecutionException, InterruptedException {

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        System.out.println(Thread.currentThread().getName() + "\t completableFuture");
        //int i = 10 / 0;
        return 1024;
    }
}).thenApply(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer o) {
        System.out.println("thenApply方法,上次傳回結果:" + o);
        return  o * 2;
    }
}).whenComplete(new BiConsumer<Integer, Throwable>() {
    @Override
    public void accept(Integer o, Throwable throwable) {
        System.out.println("-------o=" + o);
        System.out.println("-------throwable=" + throwable);
    }
}).exceptionally(new Function<Throwable, Integer>() {
    @Override
    public Integer apply(Throwable throwable) {
        System.out.println("throwable=" + throwable);
        return 6666;
    }
}).handle(new BiFunction<Integer, Throwable, Integer>() {
    @Override
    public Integer apply(Integer integer, Throwable throwable) {
        System.out.println("handle o=" + integer);
        System.out.println("handle throwable=" + throwable);
        return 8888;
    }
});
System.out.println(future.get());           

兩任務組合 - 都要完成#

兩個任務必須都完成,觸發該任務。

thenCombine:組合兩個future,擷取兩個future的傳回結果,并傳回目前任務的傳回值

thenAcceptBoth:組合兩個future,擷取兩個future任務的傳回結果,然後處理任務,沒有傳回值。

runAfterBoth:組合兩個future,不需要擷取future的結果,隻需兩個future處理完任務後,處理該任務。

public CompletableFuture thenCombine(

CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
           

public CompletableFuture thenCombineAsync(

CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
           
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor);

           

public CompletableFuture thenAcceptBoth(

CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
           

public CompletableFuture thenAcceptBothAsync(

CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
           
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);
           

public CompletableFuture runAfterBoth(CompletionStage<?> other,

Runnable action);
           

public CompletableFuture runAfterBothAsync(CompletionStage<?> other,

Runnable action);
           
Runnable action,
                                             Executor executor);           

測試案例:

public static void main(String[] args) {

CompletableFuture.supplyAsync(() -> {
    return "hello";
}).thenApplyAsync(t -> {
    return t + " world!";
}).thenCombineAsync(CompletableFuture.completedFuture(" CompletableFuture"), (t, u) -> {
    return t + u;
}).whenComplete((t, u) -> {
    System.out.println(t);
});           

輸出:hello world! CompletableFuture

兩任務組合 - 一個完成#

當兩個任務中,任意一個future任務完成的時候,執行任務。

applyToEither:兩個任務有一個執行完成,擷取它的傳回值,處理任務并有新的傳回值。

acceptEither:兩個任務有一個執行完成,擷取它的傳回值,處理任務,沒有新的傳回值。

runAfterEither:兩個任務有一個執行完成,不需要擷取future的結果,處理任務,也沒有傳回值。

public CompletableFuture applyToEither(

CompletionStage<? extends T> other, Function<? super T, U> fn);
           

public CompletableFuture applyToEitherAsync(

CompletionStage<? extends T> other, Function<? super T, U> fn);
           
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor);
           

public CompletableFuture acceptEither(

CompletionStage<? extends T> other, Consumer<? super T> action);
           

public CompletableFuture acceptEitherAsync(

CompletionStage<? extends T> other, Consumer<? super T> action);
           
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor);
           

public CompletableFuture runAfterEither(CompletionStage<?> other,

Runnable action);
           

public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,

Runnable action);
           
Runnable action,
                                               Executor executor);           

多任務組合#

public static CompletableFuture allOf(CompletableFuture<?>... cfs);

public static CompletableFuture

anyOf:隻要有一個任務完成

List<CompletableFuture> futures = Arrays.asList(CompletableFuture.completedFuture("hello"),
                                                CompletableFuture.completedFuture(" world!"),
                                                CompletableFuture.completedFuture(" hello"),
                                                CompletableFuture.completedFuture("java!"));
final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
allCompleted.thenRun(() -> {
    futures.stream().forEach(future -> {
        try {
            System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
});           

作者: ingxx

出處:

https://www.cnblogs.com/ingxx/p/12598414.html