CompletableFuture由Java 8提供,是實作異步化的工具類,上手難度較低,且功能強大,支援通過函數式程式設計的方式對各類操作進行組合編排。 CompletableFuture實作了CompletionStage接口和Future接口,前者是對後者的一個擴充,增加了異步回調、流式處理、多個Future組合處理的能力,使Java在處理多任務的協同工作時更加順暢便利。
1.背景
随着業務項目數量的增大,系統服務面臨的壓力也越來越大,這時候系統吞吐量會下降,但是一些核心功能的接口必須保證高吞吐量,低延遲。這時候我們就需要對接口進行優化,提升性能,進而保證高吞吐量。這時候CompletableFuture就用很大的用武之地了,我們一些核心接口處理資料都是串行執行的,但是其實接口的某些資料擷取、處理封裝并沒有前後依賴關系,我們大可并行處理,這樣就可以充分利用cpu資源。
一般我們的接口調用執行分為同步或者異步:
1.1 同步執行
通常我們的接口資料查詢多次資料庫擷取資料然後進行處理,封裝傳回,或者是多次rpc調用其他服務擷取資料,但是無論什麼擷取資料的操作,都是串行執行的,也就是操作2必須要等操作1完成之後在執行,即使操作1和操作2之間沒有任何聯系
在同步調用的場景下,接口耗時長、性能差,接口響應時長T = T1+T2+T3+……+Tn,這時為了縮短接口的響應時間,一般會使用線程池的方式并行擷取資料
1.2 異步執行
使用并行擷取資料,大大降低了接口對資料擷取,處理的時間
2.CompletableFuture使用
下面我們通過一個例子來講解CompletableFuture如何使用,商品詳情接口傳回資料使用CompletableFuture進行資料封裝任務進行異步編排:
ini複制代碼 private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("product-pool-%d").build();
private static ExecutorService fixedThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 40,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors() * 20),
namedThreadFactory);
/**
* 使用completableFuture執行多線程任務安排,提高速度,completableFuture可以讓某些異步線程任務串行化順序執行
* 如果不要求某些異步任務串行化順序執行,那麼也可以JUC裡面另一個countDownLatch實作
*
* @param skuId
* @return
*/
@Override
public SkuInfo getSkuDetail(Long skuId) {
SkuInfo skuInfo = new SkuInfo();
// 擷取sku資訊
CompletableFuture<ProductSku> skuFuture = CompletableFuture.supplyAsync(() -> {
ProductSku sku = productSkuDAO.selectById(skuId);
skuInfo.setSku(sku);
return sku;
}, fixedThreadPool);
// 異步擷取spu資訊
CompletableFuture<ProductSpu> spuFuture = skuFuture.thenApplyAsync(sku -> {
ProductSpu spu = productSpuDAO.selectById(sku.getSpuId());
skuInfo.setSpu(spu);
return spu;
}, fixedThreadPool);
// 異步擷取品牌資訊
CompletableFuture<BrandDTO> brandFuture = skuFuture.thenApplyAsync(sku -> {
BrandDTO brandDTO = brandService.getBrandDetail(sku.getBrandId());
skuInfo.setBrand(brandDTO);
return brandDTO;
}, fixedThreadPool);
// 異步擷取分類資訊
CompletableFuture<CategoryDTO> categoryFuture = skuFuture.thenApplyAsync(sku -> {
CategoryDTO categoryDTO = categoryService.getCategoryDetail(sku.getCategoryId());
skuInfo.setCategory(categoryDTO);
return categoryDTO;
}, fixedThreadPool);
try {
// 最後等待所有異步任務執行完成傳回封裝結果
CompletableFuture.allOf(skuFuture, spuFuture, brandFuture, categoryFuture).get();
} catch (Exception e) {
log.error("<=======等候所有任務執行過程報錯:======>", e);
}
return skuInfo;
}
2.1 supplyAsync / runAsync
supplyAsync表示建立帶傳回值的異步任務的,相當于ExecutorService submit(Callable task) 方法,runAsync表示建立無傳回值的異步任務,相當于ExecutorService submit(Runnable task)方法,這兩方法的效果跟submit是一樣的,測試用例如下:
csharp複制代碼 /**
* 測試方法CompletableFuture.runAsync:無傳回值,
*/
private static void testRunAsync() {
CompletableFuture.runAsync(() ->{
System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
System.out.println("supplyAsync 是否為守護線程 " + Thread.currentThread().isDaemon());
int result = 10/2;
System.out.println("計算結果為:"+ result);
}, fixedThreadPool);
}
/**
* 測試方法CompletableFuture.supplyAsync:有傳回值
* @throws ExecutionException
* @throws InterruptedException
*/
private static void testSupplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
int result = 10 / 2;
return result;
}, fixedThreadPool);
Integer res = future.get();
System.out.println("傳回結果值為:"+res);
}
這兩方法各有一個重載版本,可以指定執行異步任務的Executor實作,如果不指定,預設使用ForkJoinPool.commonPool(),如果機器是單核的,則預設使用ThreadPerTaskExecutor,該類是一個内部類,每次執行execute都會建立一個新線程,具體可以看CompletableFuture源碼。
2.2 thenApply / thenApplyAsync
thenApply 表示某個任務執行完成後執行的動作,即回調方法,會将該任務的執行結果即方法傳回值作為入參傳遞到回調方法中,
csharp複制代碼 /**
* 線程串行化
* 1、thenRun:不能擷取上一步的執行結果
* 2、thenAcceptAsync:能接受上一步結果,但是無傳回值
* 3、thenApplyAsync:能接受上一步結果,有傳回值
*
*/
private static void testThenApplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運作結果:" + i);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}, executor);
CompletableFuture<String> future2 = future1.thenApplyAsync(res -> {
System.out.println("======任務2啟動了..." + res*20);
return "Hello" + res;
}, executor);
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
System.out.println("======任務3執行了");
}, executor);
CompletableFuture.allOf(future1, future2, future3).get();
System.out.println("=======測試結束");
}
thenApplyAsync與thenApply的差別在于,前者是将job2送出到線程池中異步執行,實際執行future2的線程可能是另外一個線程,後者是由執行future1的線程立即執行future2,即兩個future都是同一個線程執行的
2.3 exceptionally/whenComplete/handle
exceptionally方法指定某個任務執行異常時執行的回調方法,會将抛出異常作為參數傳遞到回調方法中,如果該任務正常執行則會exceptionally方法傳回的CompletionStage的result就是該任務正常執行的結果;whenComplete是當某個任務執行完成後執行的回調方法,會将執行結果或者執行期間抛出的異常傳遞給回調方法,如果是正常執行則異常為null,回調方法對應的CompletableFuture的result和該任務一緻,如果該任務正常執行,則get方法傳回執行結果,如果是執行異常,則get方法抛出異常
csharp複制代碼 /**
* 測試whenComplete和exceptionally: 異步方法執行完的處理
*/
private static void testWhenCompleteAndExceptionally() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
Integer num = 10;
int i = num / 2;
String s = String.valueOf(null);
System.out.println("運作結果:" + i);
return i;
}, executor).whenComplete((res,exception) -> {
//雖然能得到異常資訊,但是沒法修改傳回資料
System.out.println("<=====異步任務成功完成了=====結果是:" + res + "=======異常是:" + exception);
}).exceptionally(throwable -> {
//可以感覺異常,同時傳回預設值
System.out.println("<=====異步任務成功發生異常了======"+throwable);
return 10;
});
Integer result = future.get();
System.out.println("<=====最終傳回結果result=" + result + "======>");
}
/**
* 測試handle方法:它是whenComplete和exceptionally的結合
*/
private static void testHandle() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運作結果:" + i);
return i;
}, executor).handle((result,thr) -> {
if (result != null) {
return result * 2;
}
if (thr != null) {
System.out.println("異步任務成功完成了...結果是:" + result + "異常是:" + thr);
return 0;
}
return 0;
});
}
2.4 組合處理 thenCombine / thenAcceptBoth / runAfterBoth
這三個方法都是将兩個CompletableFuture組合起來,隻有這兩個都正常執行完了才會執行某個任務,差別在于,thenCombine會将兩個任務的執行結果作為方法入參傳遞到指定方法中,且該方法有傳回值;thenAcceptBoth同樣将兩個任務的執行結果作為方法入參,但是無傳回值;runAfterBoth沒有入參,也沒有傳回值。注意兩個任務中隻要有一個執行異常,則将該異常資訊作為指定任務的執行結果
csharp複制代碼 private static void thenCombine() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello1", fixedThreadPool);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello2", fixedThreadPool);
CompletableFuture<String> result = future1.thenCombine(future2, (t, u) -> t+" "+u);
System.out.println(result.get());
}
private static void thenAcceptBoth() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
},fixedThreadPool);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
},fixedThreadPool);
}
2.5 applyToEither / acceptEither / runAfterEither
這三個方法都是将兩個CompletableFuture組合起來,隻要其中一個執行完了就會執行某個任務,其差別在于applyToEither會将已經執行完成的任務的執行結果作為方法入參,并有傳回值;acceptEither同樣将已經執行完成的任務的執行結果作為方法入參,但是沒有傳回值;runAfterEither沒有方法入參,也沒有傳回值。注意兩個任務中隻要有一個執行異常,則将該異常資訊作為指定任務的執行結果
csharp複制代碼 private static void applyToEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
},fixedThreadPool);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
},fixedThreadPool);
CompletableFuture<Integer> result = f1.applyToEither(f2, t -> {
System.out.println("applyEither:"+t);
return t * 2;
});
}
private static void acceptEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
},fixedThreadPool);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
},fixedThreadPool);
CompletableFuture<Void> result = f1.acceptEither(f2, t -> {
System.out.println("acceptEither:"+t);
});
}
2.6 allOf / anyOf
allOf傳回的CompletableFuture是多個任務都執行完成後才會執行,隻有有一個任務執行異常,則傳回的CompletableFuture執行get方法時會抛出異常,如果都是正常執行,則get傳回null。
csharp複制代碼 private static void testThenApplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運作結果:" + i);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}, executor);
CompletableFuture<String> future2 = future1.thenApplyAsync(res -> {
System.out.println("======任務2啟動了..." + res*20);
return "Hello" + res;
}, executor);
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
System.out.println("======任務3執行了");
}, executor);
CompletableFuture.allOf(future1, future2, future3).get();
System.out.println("=======測試結束");
}
注意,使用CompletableFuture可能有某些異步任務不執行,示例如下:
csharp複制代碼 private static void testNotExecute() {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("<======目前線程:" + Thread.currentThread().getName() + "=====線程id: " + Thread.currentThread().getId());
System.out.println("supplyAsync 是否為守護線程 " + Thread.currentThread().isDaemon());
int i = 10 / 2;
System.out.println("運作結果:" + i);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 下面不列印
System.out.println("return之前的列印");
return i;
});
}
造成這個原因是因為Daemon。因為completableFuture這套使用異步任務的操作都是建立成了守護線程。那麼我們沒有調用get方法不阻塞這個主線程的時候。主線程執行完畢。所有線程執行完畢就會導緻一個問題,就是守護線程退出。那麼我們沒有執行的代碼就是因為主線程不再跑任務而關閉導緻的。
3.CompletableFuture的實作原理
從CompletableFuture源碼可知,CompletableFuture中包含兩個字段:result和stack。result用于存儲目前CF的結果,stack(Completion)表示目前CF完成後需要觸發的依賴動作(Dependency Actions),去觸發依賴它的CF的計算,依賴動作可以有多個(表示有多個依賴它的CF),以棧(Treiber stack)的形式存儲,stack表示棧頂元素。
作者:shepherd111
連結:https://juejin.cn/post/7242113868553306172