CompletableFuture
是jdk1.8引入的一個新特性。 它主要是為了解決多個
Future
結果之間的依賴關系。比如:
-
将兩個異步計算合并為一個——這兩個異步計算之間互相獨立,同時第二個又依賴于第
一個的結果。
- 等待
集合中的所有任務都完成。Future
- 僅等待
Future
集合中最快結束的任務完成(有可能因為它們試圖通過不同的方式計算同
一個值),并傳回它的結果。
- 通過程式設計方式完成一個
任務的執行(即以手工設定異步操作結果的方式)。Future
- 應對Future的完成事件(即當
的完成事件發生時會收到通知,并能使用Future
計算的結果進行下一步的操作,不隻是簡單地阻塞等待操作的結果)。Future
runAsync 和 supplyAsync方法
CompletableFuture
提供了四個靜态方法來建立一個異步操作。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
-
方法不支援傳回值runAsync
-
可以支援傳回值supplyAsync
如果
executor
參數沒有設定值,那麼會使用
ForkJoinPool.commonPool
預設線程池執行任務。
示例
@Test
public void testRunAsync() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName() + " 執行異步任務 runAsync"), executor);
String result = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync");
return "有結果";
}, executor).get();
System.out.println(result);
}
pool-1-thread-1 執行異步任務 runAsync
pool-1-thread-2 執行異步任務 supplyAsync
有結果
計算結果完成時的回調方法
當
CompletableFuture
的計算結果完成,或者抛出異常的時候,可以執行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的類型是
BiConsumer<? super T,? super Throwable>
它可以處理正常的計算結果,或者異常情況。
whenComplete
和
whenCompleteAsync
的差別:
-
:是執行目前任務的線程執行繼續執行 whenComplete 的任務。whenComplete
-
:是執行把 whenCompleteAsync 這個任務繼續送出給線程池來進行執行。whenCompleteAsync
示例
@Test
public void testWhenComplete() {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync");
return "有結果";
}, executor).whenComplete((s, throwable) -> {
System.out.println(Thread.currentThread().getName() + " 執行了 whenComplete");
if (!StringUtils.isEmpty(s)) {
System.out.println(Thread.currentThread().getName() + " 真的有結果诶! 結果是:" + s);
}
}).whenCompleteAsync((s, throwable) -> {
System.out.println(Thread.currentThread().getName() + " 執行了 whenCompleteAsync");
System.out.println(Thread.currentThread().getName() + " 列印結果值 :" + s);
}, executor).exceptionally(throwable -> {
System.out.println(Thread.currentThread().getName() + " 執行了 exceptionally");
System.out.println(Thread.currentThread().getName() + " 異常了 :" + throwable.getMessage());
return "異常了";
});
}
pool-1-thread-1 執行異步任務 supplyAsync
pool-1-thread-1 執行了 whenComplete
pool-1-thread-1 真的有結果诶! 結果是:有結果
pool-1-thread-2 執行了 whenCompleteAsync
pool-1-thread-2 列印結果值 :有結果
thenApply 方法
當一個線程依賴另一個線程時,可以使用 thenApply 方法來把這兩個線程串行化。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Function<? super T,? extends U>
- T:上一個任務傳回結果的類型
- U:目前任務的傳回值類型
thenApply
和
thenApplyAsync
的差別:
-
:使用目前線程來執行任務thenApply
-
:如果設定了thenApplyAsync
則使用設定的線程池執行任務,如果沒有設定則使用executor
線程池執行ForkJoinPool.commonPool
示例
@Test
public void testThenApply() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
Integer integer = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync");
return "有結果";
}, executor).thenApplyAsync(s -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 thenApply");
if (!StringUtils.isEmpty(s)) {
return s.length();
}
return 0;
}, executor).get();
System.out.println(Thread.currentThread().getName() + " " + integer);
}
pool-1-thread-1 執行異步任務 supplyAsync
pool-1-thread-2 執行異步任務 thenApply
main 3
handle 方法
handle
是執行任務完成時對結果的處理。
handle
方法和
thenApply
方法處理方式基本一樣。不同的是
handle
是在任務完成後再執行,還可以處理異常的任務。
thenApply
隻可以執行正常的任務,任務出現異常則不執行
thenApply
方法。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
示例
@Test
public void testHandle() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
Integer integer = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync");
return 10/0;
}, executor).handleAsync((s, throwable) -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 handleAsync");
if (Objects.nonNull(throwable)) {
System.out.println(Thread.currentThread().getName() + " 異常資訊 " + throwable.getMessage());
return 1;
}
return s;
}, executor).get();
System.out.println(Thread.currentThread().getName() + " " + integer);
}
pool-1-thread-1 執行異步任務 supplyAsync
pool-1-thread-2 執行異步任務 handleAsync
pool-1-thread-2 異常資訊 java.lang.ArithmeticException: / by zero
main 1
從示例中可以看出,在
handle
中可以根據任務是否有異常來進行做相應的後續處理操作。而
thenApply
方法,如果上個任務出現錯誤,則不會執行
thenApply
方法。
thenAccept 消費處理結果
接收任務的處理結果,并消費處理,無傳回結果。
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
示例
@Test
@Test
public void testThenAccept() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync");
return 10;
}, executor).thenAcceptAsync(s -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 thenAccept");
System.out.println(Thread.currentThread().getName() + " " + s);
}, executor).get();
}
pool-1-thread-1 執行異步任務 supplyAsync
pool-1-thread-2 執行異步任務 thenAccept
pool-1-thread-2 10
從示例代碼中可以看出,該方法隻是消費執行完成的任務,并可以根據上面的任務傳回的結果進行處理。如果第一個任務發生異常,那麼
thenAccept
方法不會被執行。
thenRun 方法
跟
thenAccept
方法不一樣的是,
thenRun
不關心任務的處理結果。隻要上面的任務執行完成,就開始執行
thenRun
裡面的任務。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
示例
@Test
public void testThenRun() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync");
return 10;
}, executor).thenRunAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 thenRun");
}, executor).get();
}
pool-1-thread-1 執行異步任務 supplyAsync
pool-1-thread-2 執行異步任務 thenRun
thenCombine 合并任務
thenCombine
會把 兩個
CompletionStage
的任務都執行完成後,把兩個任務的結果一塊交給
thenCombine
來處理。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
示例
@Test
public void testThenCombine() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
Integer result = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync1");
return 10;
}, executor).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync2");
return 15;
}, executor), (result1, result2) -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 thenCombine");
return result1 + result2;
}).get();
System.out.println(Thread.currentThread().getName() + " " + result);
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync1");
return 10;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync2");
return 15;
}, executor);
result = future1.thenCombine(future2, (result1, result2) -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 thenCombine");
return result1 + result2;
}).get();
System.out.println(Thread.currentThread().getName() + " " + result);
}
pool-1-thread-1 執行異步任務 supplyAsync1
pool-1-thread-2 執行異步任務 supplyAsync2
pool-1-thread-2 執行異步任務 thenCombine
main 25
pool-1-thread-3 執行異步任務 supplyAsync1
pool-1-thread-4 執行異步任務 supplyAsync2
main 執行異步任務 thenCombine
main 25
thenAcceptBoth
當兩個
CompletionStage
都執行完成後,把結果一塊交給
thenAcceptBoth
來進行消耗。
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
示例
@Test
public void testThenAcceptBoth() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync1");
return 10;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync2");
return 15;
}, executor);
future1.thenAcceptBoth(future2, (result1, result2) -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 thenCombine");
System.out.println(Thread.currentThread().getName() + " " + (result1 + result2));
}).get();
}
pool-1-thread-1 執行異步任務 supplyAsync1
pool-1-thread-2 執行異步任務 supplyAsync2
pool-1-thread-2 執行異步任務 thenAcceptBoth
pool-1-thread-2 25
applyToEither 方法
兩個
CompletionStage
,誰執行傳回的結果快,我就用那個
CompletionStage
的結果進行下一步的轉化操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
示例
@Test
public void testApplyToEither() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync2 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
String result = future1.applyToEither(future2, integer -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 applyToEither");
return integer + "";
}).get();
System.out.println(Thread.currentThread().getName() + " 執行異步任務 " + result);
}
pool-1-thread-1 執行異步任務 supplyAsync1 301
pool-1-thread-2 執行異步任務 supplyAsync2 422
pool-1-thread-1 執行異步任務 applyToEither
main 執行異步任務 301
acceptEither 方法
兩個
CompletionStage
,誰執行傳回的結果快,我就用那個
CompletionStage
的結果進行下一步的消耗操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
示例
@Test
public void testAcceptEither() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync2 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
future1.acceptEither(future2, integer -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 acceptEither" + integer);
}).get();
}
pool-1-thread-1 執行異步任務 supplyAsync1 507
pool-1-thread-2 執行異步任務 supplyAsync2 167
pool-1-thread-2 執行異步任務 acceptEither 167
runAfterEither 方法
兩個
CompletionStage
,任何一個完成了都會執行下一步的操作(
Runnable
)
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例
@Test
public void testRunAfterEither() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync2 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
future1.runAfterEither(future2, () -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 runAfterEither ");
}).get();
System.out.println(Thread.currentThread().getName() + " 任務結束 耗時:" + (System.currentTimeMillis() - start));
}
pool-1-thread-1 執行異步任務 supplyAsync1 23
pool-1-thread-2 執行異步任務 supplyAsync2 704
pool-1-thread-1 執行異步任務 runAfterEither
main 任務結束 耗時:32
runAfterBoth 方法
兩個
CompletionStage
,都完成了計算才會執行下一步的操作(Runnable)
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例
@Test
public void testRunAfterBoth () throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync2 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
future1.runAfterBoth(future2, () -> {
System.out.println(Thread.currentThread().getName() + " 執行異步任務 runAfterBoth ");
}).get();
System.out.println(Thread.currentThread().getName() + " 任務結束 耗時:" + (System.currentTimeMillis() - start));
}
pool-1-thread-1 執行異步任務 supplyAsync1 576
pool-1-thread-2 執行異步任務 supplyAsync2 941
pool-1-thread-2 執行異步任務 runAfterBoth
main 任務結束 耗時:959
thenCompose 方法
thenCompose
方法允許你對兩個
CompletionStage
進行流水線操作,第一個操作完成時,将其結果作為參數傳遞給第二個操作。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
示例
@Test
public void testThenCompose() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
// 分解一下好看些
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return probe;
}, executor);
String result = future1.thenCompose(integer -> CompletableFuture.supplyAsync(() -> {
int probe = ThreadLocalRandom.current().nextInt(1000);
System.out.println(Thread.currentThread().getName() + " 執行異步任務 supplyAsync1 " + probe);
CompletableFutureTest.sleep(probe);
return integer + " " + probe;
}, executor)).get();
System.out.println(Thread.currentThread().getName() + " " + result);
System.out.println(Thread.currentThread().getName() + " 任務結束 耗時:" + (System.currentTimeMillis() - start));
}
pool-1-thread-1 執行異步任務 supplyAsync1 460
pool-1-thread-2 執行異步任務 supplyAsync1 320
main 460 320
main 任務結束 耗時:788
總結
- 如果沒有指定線程池
會使用
CompletableFuture
作為它的線程池執行異步代碼,如果指定線程池,則使用指定的線程池運作。
ForkJoinPool.commonPool()
參考
《Java 8實戰》
https://www.jianshu.com/p/6bac52527ca4
源碼
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases