天天看點

completable java_java8 之CompletableFuture -- 如何建構異步應用

什麼是Future 接口

很多場景下,我們想去擷取線程運作的結果,而通常使用execute方法去送出任務是無法獲得結果的,這時候我們常常會改用submit方法去送出,以便獲得線程運作的結果。

而submit方法傳回的就是Future,一個未來對象。 使用future.get() 方法去擷取線程執行結果,包括如果出現異常,也會随get方法抛出。

Future 接口的缺陷

當我們使用future.get()方法去取得線程執行結果時,要知道get方法是阻塞的,也就是說為了拿到結果,當主線程執行到get()方法,目前線程會去等待異步任務執行完成,

換言之,異步的效果在我們使用get()拿結果時,會變得無效。示例如下

public static void main(String[] args) throwsException{

ExecutorService executorService=Executors.newSingleThreadExecutor();

Future future= executorService.submit(()->{try{

Thread.sleep(3000);

}catch(InterruptedException e) {

e.printStackTrace();

}

System.out.println("異步任務執行了");

});

future.get();

System.out.println("主線任務執行了");

}

列印結果是:異步任務執行了過後主線任務才執行。  就是因為get()在一直等待。

那麼如何解決我想要拿到結果,可以對結果進行處理,又不想被阻塞呢?

CompletableFuture 使一切變得可能

JDK1.8才新加入的一個實作類CompletableFuture,實作了Future, CompletionStage兩個接口。

實際開發中,我們常常面對如下的幾種場景:

1.  針對Future的完成事件,不想簡單的阻塞等待,在這段時間内,我們希望可以正常繼續往下執行,是以在它完成時,我們可以收到回調即可。

2. 面對Future集合來講,這其中的每個Future結果其實很難去描述它們之間的依賴關系,而往往我們希望等待所有的Future集合都完成,然後做一些事情。

3. 在異步計算中,兩個計算任務互相獨立,但是任務二又依賴于任務一的結果。

如上的幾種場景,單靠Future是解決不了的,而CompletableFuture則可以幫我們實作。

CompletableFuture 常見api 介紹

1、 runAsync 和 supplyAsync方法

它提供了四個方法來建立一個異步任務

public static CompletableFuturerunAsync(Runnable runnable)public static CompletableFuturerunAsync(Runnable runnable, Executor executor)public static CompletableFuture supplyAsync(Suppliersupplier)public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

runAsync類似于execute方法,不支援傳回值,而supplyAsync方法類似submit方法,支援傳回值。也是我們的重點方法。

沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執行異步代碼。

示例

//無傳回值

CompletableFuture future1 = CompletableFuture.runAsync(() ->{

System.out.println("runAsync無傳回值");

});

future1.get();//有傳回值

CompletableFuture future2 = CompletableFuture.supplyAsync(() ->{

System.out.println("supplyAsync有傳回值");return "111";

});

String s= future2.get();

2、 異步任務執行完時的回調方法  whenComplete 和 exceptionally

當CompletableFuture的計算結果完成,或者抛出異常的時候,可以執行特定的任務

public CompletableFuture whenComplete(BiConsumer super T,? super Throwable>action)public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable>action)public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable>action, Executor executor)public CompletableFuture exceptionally(Function fn)

這些方法都是上述建立的異步任務完成後 (也可能是抛出異常後結束) 所執行的方法。

whenComplete和whenCompleteAsync方法的差別在于:前者是由上面的線程繼續執行,而後者是将whenCompleteAsync的任務繼續交給線程池去做決定。

exceptionally則是上面的任務執行抛出異常後,所要執行的方法。

示例

CompletableFuture.supplyAsync(()->{int a = 10/0;return 1;

}).whenComplete((r, e)->{

System.out.println(r);

}).exceptionally(e->{

System.out.println(e);return 2;

});

值得注意的是:哪怕supplyAsync抛出了異常,whenComplete也會執行,意思就是,隻要supplyAsync執行結束,它就會執行,不管是不是正常執行完。exceptionally隻有在異常的時候才會執行。

其實,在whenComplete的參數内 e就代表異常了,判斷它是否為null,就可以判斷是否有異常,隻不過這樣的做法,我們不提倡。

whenComplete和exceptionally這兩個,誰在前,誰先執行。

此類的回調方法,哪怕主線程已經執行結束,已經跳出外圍的方法體,然後回調方法依然可以繼續等待異步任務執行完成再觸發,絲毫不受外部影響。

3、 thenApply 和 handle 方法

如果兩個任務之間有依賴關系,比如B任務依賴于A任務的執行結果,那麼就可以使用這兩個方法

public CompletableFuture thenApply(Function super T,? extends U>fn)public CompletableFuture thenApplyAsync(Function super T,? extends U>fn)public CompletableFuture thenApplyAsync(Function super T,? extends U>fn, Executor executor)public CompletionStage handle(BiFunction super T, Throwable, ? extends U>fn);public CompletionStage handleAsync(BiFunction super T, Throwable, ? extends U>fn);public CompletionStage handleAsync(BiFunction super T, Throwable, ? extends U> fn,Executor executor);

這兩個方法,效果是一樣的,差別在于,當A任務執行出現異常時,thenApply方法不會執行,而handle 方法一樣會去執行,因為在handle方法裡,我們可以處理異常,而前者不行。

示例

CompletableFuture.supplyAsync(()->{return 5;

}).thenApply((r)->{

r= r + 1;returnr;

});//出現了異常,handle方法可以拿到異常 e

CompletableFuture.supplyAsync(()->{int i = 10/0;return 5;

}).handle((r, e)->{

System.out.println(e);

r= r + 1;returnr;

});

這裡延伸兩個方法  thenAccept 和 thenRun。其實 和上面兩個方法差不多,都是等待前面一個任務執行完 再執行。差別就在于thenAccept接收前面任務的結果,且無需return。而thenRun隻要前面的任務執行完成,它就執行,不關心前面的執行結果如何

如果前面的任務抛了異常,非正常結束,這兩個方法是不會執行的,是以處理不了異常情況。

4、 合并操作方法  thenCombine 和 thenAcceptBoth

我們常常需要合并兩個任務的結果,在對其進行統一處理,簡言之,這裡的回調任務需要等待兩個任務都完成後再會觸發。

public CompletionStage thenCombine(CompletionStage extends U> other,BiFunction super T,? super U,? extends V>fn);public CompletionStage thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V>fn);public CompletionStage thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V>fn,Executor executor);public CompletionStage thenAcceptBoth(CompletionStage extends U> other,BiConsumer super T, ? super U>action);public CompletionStage thenAcceptBothAsync(CompletionStage extends U> other,BiConsumer super T, ? super U>action);public CompletionStage thenAcceptBothAsync(CompletionStage extends U> other,BiConsumer super T, ? super U> action, Executor executor);

這兩者的差別 在于 前者是有傳回值的,後者沒有(就是個消耗工作)

示例

private static void thenCombine() throwsException {

CompletableFuture future1 = CompletableFuture.supplyAsync(()->{try{

Thread.sleep(4000);

}catch(InterruptedException e) {

e.printStackTrace();

}return "future1";

});

CompletableFuture future2 = CompletableFuture.supplyAsync(()->{return "future2";

});

CompletableFuture result = future1.thenCombine(future2, (r1, r2)->{return r1 +r2;

});//這裡的get是阻塞的,需要等上面兩個任務都完成

System.out.println(result.get());

}

private static void thenAcceptBoth() throwsException {

CompletableFuture future1 = CompletableFuture.supplyAsync(()->{try{

Thread.sleep(4000);

}catch(InterruptedException e) {

e.printStackTrace();

}return "future1";

});

CompletableFuture future2 = CompletableFuture.supplyAsync(()->{return "future2";

});//值得注意的是,這裡是不阻塞的

future1.thenAcceptBoth(future2, (r1, r2)->{

System.out.println(r1+r2);

});

System.out.println("繼續往下執行");

}

這兩個方法 都不會形成阻塞。就是個回調方法。隻有get()才會阻塞。

4、 allOf (重點,個人覺得用的場景很多)

很多時候,不止存在兩個異步任務,可能有幾十上百個。我們需要等這些任務都完成後,再來執行相應的操作。那怎麼集中監聽所有任務執行結束與否呢? allOf方法可以幫我們完成。

public static CompletableFuture allOf(CompletableFuture>... cfs);

它接收一個可變入參,既可以接收CompletableFuture單個對象,可以接收其數組對象。

結合例子說明其作用。

public static void main(String[] args) throwsException{long start =System.currentTimeMillis();

CompletableFutureTest test= newCompletableFutureTest();//結果集

List list = new ArrayList<>();

List taskList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);//全流式處理轉換成CompletableFuture[]

CompletableFuture[] cfs =taskList.stream()

.map(integer-> CompletableFuture.supplyAsync(() ->test.calc(integer))

.thenApply(h->Integer.toString(h))

.whenComplete((s, e)->{

System.out.println("任務"+s+"完成!result="+s+",異常 e="+e+","+newDate());

list.add(s);

})

).toArray(CompletableFuture[]::new);

CompletableFuture.allOf(cfs).join();

System.out.println("list="+list+",耗時="+(System.currentTimeMillis()-start));

}public intcalc(Integer i) {try{if (i == 1) {

Thread.sleep(3000);//任務1耗時3秒

} else if (i == 5) {

Thread.sleep(5000);//任務5耗時5秒

} else{

Thread.sleep(1000);//其它任務耗時1秒

}

}catch(InterruptedException e) {

e.printStackTrace();

}returni;

}

全流式寫法,綜合了以上的一些方法,使用allOf集中阻塞,等待所有任務執行完成,取得結果集list。   這裡有些CountDownLatch的感覺。

CompletableFuture 總結

completable java_java8 之CompletableFuture -- 如何建構異步應用

圖檔出自

本文隻是簡述了CompletableFuture的常用用法。日常開發基本夠用,但是針對一些特殊場景,例如異常場景,取消場景,仍待研究。