天天看點

Reactor — 基于異步資料流實作Reactive響應式程式設計

1、概述

Reactor 是Reactive Programming規範的一個具體實作(rxjava也是規範的一個實作),可以概括為:響應式程式設計是一種涉及資料流和變化傳播的異步程式設計範例。這意味着可以通過所采用的程式設計語言輕松地表達靜态(例如陣列)或動态(例如事件發射器)資料流。

2、JDK8 Stream

2.1 jdk8 Stream

jdk stream總綱:

Reactor — 基于異步資料流實作Reactive響應式程式設計

Java 8 Stream 旨在有效地處理資料流(包括原始類型),這些資料流可以在沒有延遲或很少延遲的情況下通路。它是基于拉的,隻能使用一次,缺少與時間相關的操作,并且可以執行并行計算,但無法指定要使用的線程池。但是它還沒有設計用于處理延遲操作,例如I / O操作。其所不支援的特性就是Reactor或RxJava等Reactive API的用武之地。

2.2 異步程式設計模型

Reactor 或 Rxjava等反應性API也提供Java 8 Stream等運算符,但它們更适用于任何流序列(不僅僅是集合),并允許定義一個轉換操作的管道,該管道将應用于通過它的資料,這要歸功于友善的流暢API和使用lambdas。它們旨在處理同步或異步操作,并允許您緩沖,合并,連接配接或對資料應用各種轉換。

Java提供了兩種異步程式設計模型:

  • CallBacks:異步方法沒有傳回值,但需要額外的回調參數(lambda或匿名類),在結果可用時調用它們。
  • Futures:異步方法立即傳回Future 。異步線程計算任務結果,但Future對象包裝對它的通路。該值不會立即可用,并且可以輪詢對象,直到該值可用。例如,運作Callable 任務的ExecutorService使用Future對象。

但是上面兩種方法都有局限性。首先多個callback難以組合在一起,很快導緻代碼難以閱讀以及難以維護。

2.3 CallBacks異步模型

我們可以看下一個例子:在使用者的UI上展示使用者喜歡的top 5個商品的詳細資訊,如果不存在的話則調用推薦服務擷取5個;這個功能的實作需要三個服務支援:一個是擷取使用者喜歡的商品的ID的接口(userService.getFavorites),第二個是擷取商品詳情資訊接口(favoriteService.getDetails),第三個是推薦商品與商品詳情的服務(suggestionService.getSuggestions),基于callback模式實作上面功能代碼如下:

userService.getFavorites(userId, new Callback<List<String>>() { // 1 
  public void onSuccess(List<String> list) { // 2
    if (list.isEmpty()) { // 3 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) {  // 4 
          UiUtils.submitOnUiThread(() -> {  // 5 
            list.stream()
                .limit(5)
                .forEach(uiList::show);   // 6
            });
        }
        public void onError(Throwable error) {   // 7 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream()    // 8 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId,   // 9
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }
              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }
  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
           
  • 以上三個服務接口都是基于callback的,當異步任務執行完畢後,如果結果正常則會調用callback的onSuccess方法,如果結果異常則會調用onError方法。
  • 代碼1中我們調用了userService.getFavorites接口來擷取使用者userId的推薦商品id清單,如果擷取結果正常則會調用代碼2,如果失敗則會調用代碼7,通知使用者UI錯誤資訊。
  • 如果正常則會執行代碼3判斷推薦商品id清單是否為空,如果是的話則執行代碼4調用推薦商品與商品詳情的服務(suggestionService.getSuggestions),如果擷取商品詳情失敗則執行代碼7callback的OnError把錯誤資訊顯示到使用者UI,否則如果成功則執行代碼5切換線程到UI線程,在擷取的商品詳情清單上施加jdk8 stream運算使用limit擷取5個元素,然後顯示到UI上。
  • 代碼3如果判斷使用者推薦商品id清單不為空則執行代碼8,在商品id清單上使用JDK8 stream擷取流,然後使用limit擷取5個元素,然後執行代碼9調用favoriteService.getDetails服務擷取具體商品的詳情,這裡多個id擷取詳情是并發進行的,當擷取到詳情成功後會執行代碼10在UI線程上繪制出商品詳情資訊,如果失敗則執行代碼11顯示錯誤。
2.4 Reactor改造callback模型

如上為了實作該功能,我們寫了很多代碼,使用了大量callback,這些代碼比較晦澀難懂,并且存在代碼重複,下面我們使用Reactor來實作等價的功能:

userService.getFavorites(userId)   // 1
           .flatMap(favoriteService::getDetails)  // 2
           .switchIfEmpty(suggestionService.getSuggestions())  // 3 
           .take(5)  // 4 
           .publishOn(UiUtils.uiThreadScheduler())   // 5
           .subscribe(uiList::show, UiUtils::errorPopup);   // 6
           
  • 代碼1調用getFavorites服務擷取userId對應的商品清單,該方法會馬上傳回一個流對象,然後代碼2在流上施加flatMap運算把每個商品id轉換為商品Id對應的商品詳情資訊(通過調用服務favoriteService::getDetails),由于方法getDetails是異步的,是以flatmap實際上實作了同步轉異步,然後把所有商品詳情資訊組成新的流傳回。
  • 代碼3判斷如果傳回的流中沒有元素則調用suggestionService.getSuggestions()服務擷取推薦的商品詳情清單,代碼4則從代碼2或者代碼3傳回的流中擷取5個元素(5個商品詳細資訊),然後執行代碼5 publishOn把目前線程切換到UI排程器來執行,代碼6則通過subscribe方法激活整個流處理鍊,然後在UI線程上繪制商品詳情清單或者顯示錯誤。

另外如果你想在少于800毫秒的時間内檢索到喜歡的ID,或者如果花費更長的時間從緩存中擷取它們,在Reactor中,可以添加具有逾時和回退的Reactor代碼例子:

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) //如果沒有在800ms内響應,就抛出錯誤。
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) //如果發生錯誤,退回到cacheService
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
           

如上代碼可知基于reactor編寫的代碼邏輯屬于聲明式程式設計,比較通俗易懂,代碼量也比較少,不含有重複的代碼。

2.5 Futures異步模型

future模型相比callback模型要好一些,但盡管CompletableFuture在Java 8上進行了改進,但它們仍然表現不佳。Future通過調用get()方法很容易導緻對象的另一種阻塞情況,另外Future不支援惰性計算,并且缺乏對多個值和進階錯誤處理的支援。

例如下面例子:首先我們擷取一個id清單,然後根據id分别擷取對應的name和統計資料,然後組合每個id對應的name和統計資料為一個新的資料,最後輸出所有組合對的值,下面我們使用CompletableFuture來實作這個功能,以便保證整個過程是異步的,并且每個id對應的處理是并發的:

CompletableFuture<List<String>> ids = ifhIds(); // 1

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {  // 2 
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> {   // 3
				CompletableFuture<String> nameTask = ifhName(i);  // 3.1 
				CompletableFuture<Integer> statTask = ifhStat(i);  //  3.2
				return nameTask.
					thenCombineAsync(statTask, 
						(name, stat) -> "Name " + name + " has stats " + stat);  // 3.3
			});
	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());  // 4
	CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]); //5

	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); // 6
	return allDone.thenApply(v -> combinationList.stream() // 7
			.map(CompletableFuture::join)  
			.collect(Collectors.toList()));
});

List<String> results = result.join();  // 8
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
           
  • 如上代碼1我們調用ifhIds方法異步傳回了一個CompletableFuture對象,其内部儲存了id清單。
  • 代碼2調用ids的thenComposeAsync方法傳回一個新的CompletableFuture對象,新CompletableFuture對象的資料是代碼2中的lambda表達式執行結果,表達式内代碼3是擷取id清單的流對象,然後使用map操作把id元素轉換為name與統計資訊拼接的字元串,這裡是通過代碼3.1根據id擷取name對應的CompletableFuture對象,代碼3.2擷取統計資訊對應的CompletableFuture,然後使用代碼3.3把兩個CompletableFuture對象進行合并做到的。
  • 代碼3會傳回一個流對象,其中元素是所有id對應的name與統計資訊組合後的結果,然後代碼4把流中元素收集儲存到了combinationList清單裡面。代碼5把清單轉換為了數組,這是因為代碼2的allOf操作符的參數必須為數組。
  • 代碼6把combinationList清單中的所有CompletableFuture對象轉換為了一個allDone(等所有CompletableFuture對象的任務執行完畢),到這裡我們調用allDone的get()方法就可以等待所有異步處理執行完畢,但是我們目的是想擷取到所有異步任務的執行結果,是以代碼7在allDone上施加了thenApply運算,意在等所有任務處理完畢後調用所有CompletableFuture的join方法擷取每個任務的執行結果,然後收集為清單後傳回一個新的CompletableFuture對象,然後代碼8在新的CompletableFuture上調用join方法擷取所有執行結果清單。
2.6 Reactor改造future模型

由于Reactor提供了更多的組合運算符,是以可以簡化此過程,如下所示:

Flux<String> ids = ifhrIds();  // 1

Flux<String> combinations =
		ids.flatMap(id -> {   // 2
			Mono<String> nameTask = ifhrName(id);   // 2.1
			Mono<Integer> statTask = ifhrStat(id);   // 2.2 

			return nameTask.zipWith(statTask, 
					(name, stat) -> "Name " + name + " has stats " + stat);  // 3
		});

Mono<List<String>> result = combinations.collectList();   // 4

List<String> results = result.block();  //  5
assertThat(results).containsExactly( 
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
           
  • 代碼1 我們調用ifhIds方法異步傳回了一個Flux對象,其内部儲存了id清單
  • 代碼2 調用ids的flatMap方法對其中元素進行轉換,代碼2.1根據id擷取name資訊(傳回流對象Mono),代碼2.2 根據id擷取統計資訊(傳回流對象Mono),代碼3 結合兩個流為新的流元素。
  • 代碼4 調用新流的collectList方法把所有的流對象轉換為清單,然後傳回一個新的Mono流對象。
  • 代碼5 則調用新的Mono流對象的block方法阻塞擷取所有執行結果。

如上代碼使用reactor方式編寫的代碼相比使用CompletableFuture實作相同功能來說,更簡潔,更通俗易懂。

3、Reactor響應式程式設計總結

諸如Reactor之類的stream庫旨在解決JVM上“經典”異步方法的這些缺點,同時還關注一些其他方面:

  • 可組合性和可讀性
  • 資料作為一個用豐富的運算符詞彙表操縱的流程
  • 在您訂閱之前沒有任何事情發生
  • 背壓或消費者向生産者發出信号回報發出信号過快的能力
  • 進階但高價值的抽象,與并發無關

可組合性,指的是編排多個異步任務的能力,使用先前任務的結果作為後續任務的輸入或以fork-join方式執行多個任務。

編排任務的能力與代碼的可讀性和可維護性緊密相關。随着異步過程層數量和複雜性的增加,能夠編寫和讀取代碼變得越來越困難。正如我們所看到的,callback模型很簡單,但其主要缺點之一是,對于複雜的處理,您需要從回調執行回調,本身嵌套在另一個回調中,依此類推。那個混亂被稱為Callback Hell,正如你可以猜到的(或者從經驗中得知),這樣的代碼很難維護;

引用: https://projectreactor.io/docs/core/release/reference/index.html#intro-reactive