天天看點

java8多線程異步調用 CompletableFuture 詳解

CompletableFuture 詳解

CompletableFuture類實作了CompletionStage和Future接口。Future是Java 5添加的類,用來描述一個異步計算的結果,但是擷取一個結果時方法較少,要麼通過輪詢isDone,确認完成後,調用get()擷取值,要麼調用get()設定一個逾時時間。但是這個get()方法會阻塞住調用線程,這種阻塞的方式顯然和我們的異步程式設計的初衷相違背。

為了解決這個問題,JDK吸收了guava的設計思想,加入了Future的諸多擴充功能形成了CompletableFuture。

CompletionStage是一個接口,從命名上看得知是一個完成的階段,它裡面的方法也标明是在某個運作階段得到了結果之後要做的事情。

進行變換

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
           

首先說明一下已Async結尾的方法都是可以異步執行的,如果指定了線程池,會在指定的線程池中執行,如果沒有指定,預設會在ForkJoinPool.commonPool()中執行,下文中将會有好多類似的,都不詳細解釋了。關鍵的入參隻有一個Function,它是函數式接口,是以使用Lambda表示起來會更加優雅。它的入參是上一個階段計算後的結果,傳回值是經過轉化後結果。

例如:

@Test
    public void thenApply() {
        String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
        System.out.println(result);
    }
           

結果為:

hello world
           

進行消耗

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
           

thenAccept是針對結果進行消耗,因為他的入參是Consumer,有入參無傳回值。

例如:

@Test
public void thenAccept(){    
       CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world"));
}
           

結果為:

hello world
           

對上一步的計算結果不關心,執行下一個操作。

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
           

thenRun它的入參是一個Runnable的執行個體,表示當得到上一步的結果時的操作。

例如:

@Test
public void thenRun(){
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }).thenRun(() -> System.out.println("hello world"));
    while (true){}
}
           

結果為:

hello world
           

4.結合兩個CompletionStage的結果,進行轉化後傳回

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
           

它需要原來的處理傳回值,并且other代表的CompletionStage也要傳回值之後,利用這兩個傳回值,進行轉換後傳回指定類型的值。

例如:

@Test
public void thenCombine() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "world";
    }), (s1, s2) -> s1 + " " + s2).join();
    System.out.println(result);
}
           

結果為:

hello world
           

結合兩個CompletionStage的結果,進行消耗

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);
           

它需要原來的處理傳回值,并且other代表的CompletionStage也要傳回值之後,利用這兩個傳回值,進行消耗。

例如:

@Test
public void thenAcceptBoth() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "world";
    }), (s1, s2) -> System.out.println(s1 + " " + s2));
    while (true){}
}
           

結果為:

hello world
           

在兩個CompletionStage都運作完執行。

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
           

不關心這兩個CompletionStage的結果,隻關心這兩個CompletionStage執行完畢,之後在進行操作(Runnable)。

例如:

@Test
public void runAfterBoth(){
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s2";
    }), () -> System.out.println("hello world"));
    while (true){}
}
           

結果為

hello world

6.兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的轉化操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
           

我們現實開發場景中,總會碰到有兩種管道完成同一個事情,是以就可以調用這個方法,找一個最快的結果進行處理。

例如:

@Test
public void applyToEither() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).applyToEither(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello world";
    }), s -> s).join();
    System.out.println(result);
}
           

結果為:

hello world
兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的消耗操作。
 
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
           

例如:

@Test
public void acceptEither() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).acceptEither(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello world";
    }), System.out::println);
    while (true){}
}
           

結果為:

hello world
兩個CompletionStage,任何一個完成了都會執行下一步的操作(Runnable)。
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
           

例如:

@Test
public void runAfterEither() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).runAfterEither(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s2";
    }), () -> System.out.println("hello world"));
    while (true) {
    }
}
           

結果為:

hello world
           

當運作時出現了異常,可以通過exceptionally進行補償。

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
           

例如:

@Test
public void exceptionally() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (1 == 1) {
            throw new RuntimeException("測試一下異常情況");
        }
        return "s1";
    }).exceptionally(e -> {
        System.out.println(e.getMessage());
        return "hello world";
    }).join();
    System.out.println(result);
}
           

結果為:

java.lang.RuntimeException: 測試一下異常情況
hello world
           

當運作完成時,對結果的記錄。這裡的完成時有兩種情況,一種是正常執行,傳回值。另外一種是遇到異常抛出造成程式的中斷。這裡為什麼要說成記錄,因為這幾個方法都會傳回CompletableFuture,當Action執行完畢後它的結果傳回原始的CompletableFuture的計算結果或者傳回異常。是以不會對結果産生任何的作用。

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
           

例如:

@Test
public void whenComplete() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (1 == 1) {
            throw new RuntimeException("測試一下異常情況");
        }
        return "s1";
    }).whenComplete((s, t) -> {
        System.out.println(s);
        System.out.println(t.getMessage());
    }).exceptionally(e -> {
        System.out.println(e.getMessage());
        return "hello world";
    }).join();
    System.out.println(result);
}
           

結果為:

null
java.lang.RuntimeException: 測試一下異常情況
java.lang.RuntimeException: 測試一下異常情況
hello world
           

這裡也可以看出,如果使用了exceptionally,就會對最終的結果産生影響,它沒有口子傳回如果沒有異常時的正确的值,這也就引出下面我們要介紹的handle。

運作完成時,對結果的處理。這裡的完成時有兩種情況,一種是正常執行,傳回值。另外一種是遇到異常抛出造成程式的中斷。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
           

例如:

出現異常時

@Test
public void handle() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //出現異常
        if (1 == 1) {
            throw new RuntimeException("測試一下異常情況");
        }
        return "s1";
    }).handle((s, t) -> {
        if (t != null) {
            return "hello world";
        }
        return s;
    }).join();
    System.out.println(result);
}
           

結果為:

hello world
           

未出現異常時

@Test
public void handle() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).handle((s, t) -> {
        if (t != null) {
            return "hello world";
        }
        return s;
    }).join();
    System.out.println(result);
}
           

結果為:

s1
           

上面就是CompletionStage接口中方法的使用執行個體,CompletableFuture同樣也同樣實作了Future,是以也同樣可以使用get進行阻塞擷取值,總的來說,CompletableFuture使用起來還是比較爽的,看起來也比較優雅一點。