1、概述
Reactor 是Reactive Programming規範的一個具體實作(rxjava也是規範的一個實作),可以概括為:響應式程式設計是一種涉及資料流和變化傳播的異步程式設計範例。這意味着可以通過所采用的程式設計語言輕松地表達靜态(例如陣列)或動态(例如事件發射器)資料流。
2、JDK8 Stream
2.1 jdk8 Stream
jdk stream總綱:

Java 8 Stream 旨在有效地處理資料流(包括原始類型),這些資料流可以在沒有延遲或很少延遲的情況下通路。它是基于拉的,隻能使用一次,缺少與時間相關的操作,并且可以執行并行計算,但無法指定要使用的線程池。但是它還沒有設計用于處理延遲操作,例如I / O操作。其所不支援的特性就是Reactor或RxJava等Reactive API的用武之地。
2.2 異步程式設計模型
Reactor 或 Rxjava等反應性API也提供Java 8 Stream等運算符,但它們更适用于任何流序列(不僅僅是集合),并允許定義一個轉換操作的管道,該管道将應用于通過它的資料,這要歸功于友善的流暢API和使用lambdas。它們旨在處理同步或異步操作,并允許您緩沖,合并,連接配接或對資料應用各種轉換。
Java提供了兩種異步程式設計模型:
- CallBacks:異步方法沒有傳回值,但需要額外的回調參數(lambda或匿名類),在結果可用時調用它們。
- Futures:異步方法立即傳回Future 。異步線程計算任務結果,但Future對象包裝對它的通路。該值不會立即可用,并且可以輪詢對象,直到該值可用。例如,運作Callable 任務的ExecutorService使用Future對象。
但是上面兩種方法都有局限性。首先多個callback難以組合在一起,很快導緻代碼難以閱讀以及難以維護。
2.3 CallBacks異步模型
我們可以看下一個例子:在使用者的UI上展示使用者喜歡的top 5個商品的詳細資訊,如果不存在的話則調用推薦服務擷取5個;這個功能的實作需要三個服務支援:一個是擷取使用者喜歡的商品的ID的接口(userService.getFavorites),第二個是擷取商品詳情資訊接口(favoriteService.getDetails),第三個是推薦商品與商品詳情的服務(suggestionService.getSuggestions),基于callback模式實作上面功能代碼如下:
userService.getFavorites(userId, new Callback<List<String>>() { // 1
public void onSuccess(List<String> list) { // 2
if (list.isEmpty()) { // 3
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) { // 4
UiUtils.submitOnUiThread(() -> { // 5
list.stream()
.limit(5)
.forEach(uiList::show); // 6
});
}
public void onError(Throwable error) { // 7
UiUtils.errorPopup(error);
}
});
} else {
list.stream() // 8
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId, // 9
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
- 以上三個服務接口都是基于callback的,當異步任務執行完畢後,如果結果正常則會調用callback的onSuccess方法,如果結果異常則會調用onError方法。
- 代碼1中我們調用了userService.getFavorites接口來擷取使用者userId的推薦商品id清單,如果擷取結果正常則會調用代碼2,如果失敗則會調用代碼7,通知使用者UI錯誤資訊。
- 如果正常則會執行代碼3判斷推薦商品id清單是否為空,如果是的話則執行代碼4調用推薦商品與商品詳情的服務(suggestionService.getSuggestions),如果擷取商品詳情失敗則執行代碼7callback的OnError把錯誤資訊顯示到使用者UI,否則如果成功則執行代碼5切換線程到UI線程,在擷取的商品詳情清單上施加jdk8 stream運算使用limit擷取5個元素,然後顯示到UI上。
- 代碼3如果判斷使用者推薦商品id清單不為空則執行代碼8,在商品id清單上使用JDK8 stream擷取流,然後使用limit擷取5個元素,然後執行代碼9調用favoriteService.getDetails服務擷取具體商品的詳情,這裡多個id擷取詳情是并發進行的,當擷取到詳情成功後會執行代碼10在UI線程上繪制出商品詳情資訊,如果失敗則執行代碼11顯示錯誤。
2.4 Reactor改造callback模型
如上為了實作該功能,我們寫了很多代碼,使用了大量callback,這些代碼比較晦澀難懂,并且存在代碼重複,下面我們使用Reactor來實作等價的功能:
userService.getFavorites(userId) // 1
.flatMap(favoriteService::getDetails) // 2
.switchIfEmpty(suggestionService.getSuggestions()) // 3
.take(5) // 4
.publishOn(UiUtils.uiThreadScheduler()) // 5
.subscribe(uiList::show, UiUtils::errorPopup); // 6
- 代碼1調用getFavorites服務擷取userId對應的商品清單,該方法會馬上傳回一個流對象,然後代碼2在流上施加flatMap運算把每個商品id轉換為商品Id對應的商品詳情資訊(通過調用服務favoriteService::getDetails),由于方法getDetails是異步的,是以flatmap實際上實作了同步轉異步,然後把所有商品詳情資訊組成新的流傳回。
- 代碼3判斷如果傳回的流中沒有元素則調用suggestionService.getSuggestions()服務擷取推薦的商品詳情清單,代碼4則從代碼2或者代碼3傳回的流中擷取5個元素(5個商品詳細資訊),然後執行代碼5 publishOn把目前線程切換到UI排程器來執行,代碼6則通過subscribe方法激活整個流處理鍊,然後在UI線程上繪制商品詳情清單或者顯示錯誤。
另外如果你想在少于800毫秒的時間内檢索到喜歡的ID,或者如果花費更長的時間從緩存中擷取它們,在Reactor中,可以添加具有逾時和回退的Reactor代碼例子:
userService.getFavorites(userId)
.timeout(Duration.ofMillis(800)) //如果沒有在800ms内響應,就抛出錯誤。
.onErrorResume(cacheService.cachedFavoritesFor(userId)) //如果發生錯誤,退回到cacheService
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
如上代碼可知基于reactor編寫的代碼邏輯屬于聲明式程式設計,比較通俗易懂,代碼量也比較少,不含有重複的代碼。
2.5 Futures異步模型
future模型相比callback模型要好一些,但盡管CompletableFuture在Java 8上進行了改進,但它們仍然表現不佳。Future通過調用get()方法很容易導緻對象的另一種阻塞情況,另外Future不支援惰性計算,并且缺乏對多個值和進階錯誤處理的支援。
例如下面例子:首先我們擷取一個id清單,然後根據id分别擷取對應的name和統計資料,然後組合每個id對應的name和統計資料為一個新的資料,最後輸出所有組合對的值,下面我們使用CompletableFuture來實作這個功能,以便保證整個過程是異步的,并且每個id對應的處理是并發的:
CompletableFuture<List<String>> ids = ifhIds(); // 1
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { // 2
Stream<CompletableFuture<String>> zip =
l.stream().map(i -> { // 3
CompletableFuture<String> nameTask = ifhName(i); // 3.1
CompletableFuture<Integer> statTask = ifhStat(i); // 3.2
return nameTask.
thenCombineAsync(statTask,
(name, stat) -> "Name " + name + " has stats " + stat); // 3.3
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); // 4
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]); //5
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); // 6
return allDone.thenApply(v -> combinationList.stream() // 7
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
List<String> results = result.join(); // 8
assertThat(results).contains(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
- 如上代碼1我們調用ifhIds方法異步傳回了一個CompletableFuture對象,其内部儲存了id清單。
- 代碼2調用ids的thenComposeAsync方法傳回一個新的CompletableFuture對象,新CompletableFuture對象的資料是代碼2中的lambda表達式執行結果,表達式内代碼3是擷取id清單的流對象,然後使用map操作把id元素轉換為name與統計資訊拼接的字元串,這裡是通過代碼3.1根據id擷取name對應的CompletableFuture對象,代碼3.2擷取統計資訊對應的CompletableFuture,然後使用代碼3.3把兩個CompletableFuture對象進行合并做到的。
- 代碼3會傳回一個流對象,其中元素是所有id對應的name與統計資訊組合後的結果,然後代碼4把流中元素收集儲存到了combinationList清單裡面。代碼5把清單轉換為了數組,這是因為代碼2的allOf操作符的參數必須為數組。
- 代碼6把combinationList清單中的所有CompletableFuture對象轉換為了一個allDone(等所有CompletableFuture對象的任務執行完畢),到這裡我們調用allDone的get()方法就可以等待所有異步處理執行完畢,但是我們目的是想擷取到所有異步任務的執行結果,是以代碼7在allDone上施加了thenApply運算,意在等所有任務處理完畢後調用所有CompletableFuture的join方法擷取每個任務的執行結果,然後收集為清單後傳回一個新的CompletableFuture對象,然後代碼8在新的CompletableFuture上調用join方法擷取所有執行結果清單。
2.6 Reactor改造future模型
由于Reactor提供了更多的組合運算符,是以可以簡化此過程,如下所示:
Flux<String> ids = ifhrIds(); // 1
Flux<String> combinations =
ids.flatMap(id -> { // 2
Mono<String> nameTask = ifhrName(id); // 2.1
Mono<Integer> statTask = ifhrStat(id); // 2.2
return nameTask.zipWith(statTask,
(name, stat) -> "Name " + name + " has stats " + stat); // 3
});
Mono<List<String>> result = combinations.collectList(); // 4
List<String> results = result.block(); // 5
assertThat(results).containsExactly(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
- 代碼1 我們調用ifhIds方法異步傳回了一個Flux對象,其内部儲存了id清單
- 代碼2 調用ids的flatMap方法對其中元素進行轉換,代碼2.1根據id擷取name資訊(傳回流對象Mono),代碼2.2 根據id擷取統計資訊(傳回流對象Mono),代碼3 結合兩個流為新的流元素。
- 代碼4 調用新流的collectList方法把所有的流對象轉換為清單,然後傳回一個新的Mono流對象。
- 代碼5 則調用新的Mono流對象的block方法阻塞擷取所有執行結果。
如上代碼使用reactor方式編寫的代碼相比使用CompletableFuture實作相同功能來說,更簡潔,更通俗易懂。
3、Reactor響應式程式設計總結
諸如Reactor之類的stream庫旨在解決JVM上“經典”異步方法的這些缺點,同時還關注一些其他方面:
- 可組合性和可讀性
- 資料作為一個用豐富的運算符詞彙表操縱的流程
- 在您訂閱之前沒有任何事情發生
- 背壓或消費者向生産者發出信号回報發出信号過快的能力
- 進階但高價值的抽象,與并發無關
可組合性,指的是編排多個異步任務的能力,使用先前任務的結果作為後續任務的輸入或以fork-join方式執行多個任務。
編排任務的能力與代碼的可讀性和可維護性緊密相關。随着異步過程層數量和複雜性的增加,能夠編寫和讀取代碼變得越來越困難。正如我們所看到的,callback模型很簡單,但其主要缺點之一是,對于複雜的處理,您需要從回調執行回調,本身嵌套在另一個回調中,依此類推。那個混亂被稱為Callback Hell,正如你可以猜到的(或者從經驗中得知),這樣的代碼很難維護;
引用: https://projectreactor.io/docs/core/release/reference/index.html#intro-reactive