本系列其他文章見:《響應式Spring的道法術器》。
前情提要: 什麼是響應式程式設計
1.2 響應式流
上一節留了一個坑——為啥不用Java Stream來進行資料流的操作? 原因在于,若将其用于響應式程式設計中,是有局限性的。比如如下兩個需要面對的問題:
- Web 應用具有I/O密集的特點,I/O阻塞會帶來比較大的性能損失或資源浪費,我們需要一種異步非阻塞的響應式的庫,而Java Stream是一種同步API。
- 假設我們要搭建從資料層到前端的一個變化傳遞管道,可能會遇到資料層每秒上千次的資料更新,而顯然不需要向前端傳遞每一次更新,這時候就需要一種流量控制能力,就像我們家裡的水龍頭,可以控制開關流速,而Java Stream不具備完善的對資料流的流量控制的能力。
具備“異步非阻塞”特性和“流量控制”能力的資料流,我們稱之為響應式流(Reactive Stream)。
目前有幾個實作了響應式流規範的Java庫,這裡簡單介紹兩個:RxJava和Reactor。
要介紹RxJava,就不得不提ReactiveX(Reactive Extensions,Rx),它最初是LINQ的一個擴充,由微軟的架構師Erik Meijer上司的團隊開發,在2012年11月開源,Rx是一個程式設計模型,目标是提供一緻的程式設計接口,幫助開發者更友善的處理異步資料流,Rx庫支援.NET、JavaScript和C++,Rx近幾年越來越流行了,現在已經支援幾乎全部的流行程式設計語言了,包括RxJS、RxJava等。
後來,Java社群的一些大牛湊到一起制定了一個響應式流規範。RxJava團隊随後對1版本進行了重構,形成了相容該響應流規範的RxJava 2。
Reactor是Pivotal旗下的項目,與大名鼎鼎的Spring是兄弟關系,是以是Spring近期推出的響應式子產品WebFlux的“禦用”響應式流。Reactor支援響應式流規範,與RxJava相比,它沒有任何曆史包袱,專注于Server端的響應式開發,而RxJava更多傾向于Android端的響應式開發。
在Java 9版本中,響應式流的規範被納入到了JDK中,相應的API接口是java.util.concurrent.Flow。
Spring WebFlux也是本系列文章後邊的重點内容。由于WebFlux首選Reactor作為其響應式技術棧的一部分,我們下邊也主要以Reactor為主,目前的版本是Reactor3。
我們繼續回到主線,讨論“異步非阻塞”和“流量控制”。注意,本節請不必關注Reactor的代碼細節,僅體會使用響應式流的“感覺”就好。
1.2.1 異步非阻塞
在如今網際網路時代的大背景下,Web應用通常要面對高并發、海量資料的挑戰,性能從來都是必須要考量的核心因素。
阻塞便是性能殺手之一。
從調用者和服務提供者的角度來看,阻塞、非阻塞以及同步、異步可以這麼了解:舉個例子,老劉買了個洗衣機,當他啟動了洗衣機後如果一直在等待洗衣機工作結束好晾衣服,那他就是阻塞的;如果他啟動洗衣機之後就去看電視了,估摸快洗完了就時不時來看看,那他就是非阻塞的,因為老劉可以去做另一件事。但老劉不能知道這洗衣機啥時候洗完/是否洗完,那麼這台洗衣機就是同步方式工作的;老劉後來換了一台可以在洗完衣服播放音樂的洗衣機,這樣就不用時不時來看了,雖然啟動之後洗衣機不能立刻傳回給老劉幹淨的衣服,但是可以在工作完成之後通知在看電視的老劉,是以新的洗衣機就是異步工作的。
- 阻塞和非阻塞反映的是調用者的狀态,當調用者調用了服務提供者的方法後,如果一直在等待結果傳回,否則無法執行後續的操作,那就是阻塞狀态;如果調用之後直接傳回,進而可以繼續執行後續的操作,那可以了解為非阻塞的。
- 同步和異步反映的是服務提供者的能力,當調用者調用了服務提供者的方法後,如果服務提供者能夠立馬傳回,并在處理完成後通過某種方式通知到調用者,那可以了解為異步的;否則,如果隻是在處理完成後才傳回,或者需要調用者再去主動查詢處理是否完成,就可以了解為是同步的。
Http服務本質上是對資源的操作,尤其是RESTful興起之後,更是如此。所謂的資源,對應在伺服器端就是檔案和資料。
- 檔案方面,随着網際網路基礎設施的提升,Web 應用正在接納、處理和傳遞越來越多的包括圖檔和音視訊等形式在内的檔案。檔案的存取可能造成阻塞。
- 資料方面,随着大資料技術的推進,網際網路公司越來越熱衷于收集來自使用者的操作、位置、社會關系等等各種各樣的資訊。資料越來越具有流動性,資料量也顯著增長。資料的存取也可能造成阻塞。
- 此外,随着微服務架構的日趨火熱,各個微服務之間的通信不再像“巨石型”應用中通過對象的引用和方法的調用,而是經由網絡傳輸序列化的資料來實作,網絡的延遲也可能造成阻塞。
- 除了I/O方面的阻塞之外,一些複雜的業務邏輯由于處理時間比較長,也會造成調用者的阻塞。
多數人不認為阻塞是一個比較大的問題,至少覺得除了網絡I/O之外,讀寫檔案和資料庫還是很快的,許多的開發者也一直在寫阻塞的代碼。那麼我們就先來關注一下I/O的阻塞問題,對其嚴重性有一個直覺感性的認識。
1.2.1.1 I/O到底有多慢?
很多情況下,在大的空間和時間次元上,數量級經常會大到超出我們的認知範疇,我們的直覺總是不可靠的。

一、舉兩個空間次元的例子:
在大的尺度上,印象中,銀河系的中心位置群星閃耀,就像趕集一樣:
但實際上,如果把恒星們縮小到沙粒的大小,那麼密度也就相當于一個體育場有一兩粒沙子。曾經看科幻片,總是擔心光速飛行的飛船來不及拐彎或刹車撞到星球上,實際上,想要撞上才是相當不容易的。
而在小的尺度上,原子核具有原子絕大部分的品質,印象中,應該是這樣的吧:
但實際上,如果把一個原子也放大到體育場那麼大,原子核才僅僅相當于一個乒乓球那麼大,空曠的很!
二、從時間次元上:
往大了說,如果地球45億年曆史縮短為一年,那麼人類有記錄的浩瀚文明史不過相當于幾秒鐘而已。
往小了說,“一瞬間”與“一瞬間”可能會差好幾個數量級。我們就從微觀時間次元來了解一下“CPU眼中的時間”,你會發現,發生在計算機中的阻塞也許比你直覺印象中誇張的多。
CPU眼中的時間——
CPU絕對稱得上是“閃電俠”,因為它們做事都有自己的一套時鐘。我們故事的主人公是一個主頻為2.5GHz的CPU,如果它的世界也有“秒”的概念,并且它的時鐘跳一下為一秒,那麼在CPU(CPU的一個核心)眼中的時間概念是啥樣的呢?
CPU先生所在的組是硬體部計算組。對它來說,與其一起緊密合作的幾個小夥伴還能跟的上它的節奏:
- CPU先生很利索,隻需要一秒就可以完成一個指令,複雜的動作可能需要多個指令。
- 好在“貼身秘書”一級緩存反應比較快,能夠秒懂CPU先生的意思。
- 來自“秘書組”的二級緩存雖然要十幾秒才能“get”到CPU先生的點,但也不算太遲鈍。
- 和記憶體組的合作已經習以為常了,跟記憶體請求的資料通常要4-5分鐘才能找到(記憶體尋址),不過也還好啦,畢竟一級緩存那裡能拿到80%想要的資料,其餘的二級緩存也能搞定一大部分,不怎麼耽誤事兒。
CPU先生是典型的工作狂,任務多的時候,通宵達旦也毫無怨言,但是有什麼事情讓它等,那簡直要他命了。恰恰一起共事的其他組(尤其是I/O組的磁盤和網卡)相對來說那效率是低的離譜啊:
- 關于I/O組的同僚,CPU先生已經抱怨很久了,每次找SSD要東西,都要花費4-5天才能找到(尋址),等到資料傳送過來,幾周都過去了。機械磁盤更是過分地離譜,跟他要個資料,竟然要平均花費10個月才能找到,如果要讀取1M的資料,竟然要20個月!這種員工怎麼還不下崗?!
- 關于網卡,CPU先生知道它們也盡力了,畢竟萬兆網絡成本頗高。與機房内的其他小夥伴們用千兆網絡互相溝通也算順暢,給另一台機器的CPU朋友發送1K的書信,最快七八個小時就可以送過去了。但是1K的書信經過層層包裹,實際也寫不了多少話。更要命的是,網卡們的溝通手續繁雜,每次網絡溝通前的 “你好能聽到我嗎?——我能聽到,你那邊能聽到我嗎?——我也能聽到你,那我們開始吧!” 這樣的握手确認都要花掉很長的時間,不過不能當面溝通,也隻能這樣了。這還好,最恐怖的是與其他城市的小夥伴溝通,有時候傳遞消息要花費好幾年呢!
由此可見,對于CPU先生來說,想要讓工作充實起來實在不容易,不過多虧了記憶體組的小夥伴幫忙分批緩存往返于I/O組的資料,沖突才有所緩解。
這個圖隻能明顯看出涉及I/O的時間條,我們轉換成對數刻度的圖看一下:
這個圖并不是直覺的比例,橫軸上每個刻度是一個數量級,可見I/O的速度與CPU和記憶體相比是要差幾個數量級的。由此可見,對于大型高并發場景下的Web應用,緩存有多重要,更高的緩存命中率就意味着性能。
(以上時間資料來自http://cizixs.com/2017/01/03/how-slow-is-disk-and-network)
對于阻塞造成的性能損失,我們通常有兩種思路來解決:
- 并行化:使用更多的線程和硬體資源;
- 異步化:基于現有的資源來提高執行效率。
1.2.1.2 解決方案之一:多線程
由于I/O組的同僚實在太墨迹,有時候CPU先生眯上一覺再回來工作都不耽誤事兒。
如上圖,藍色的是CPU執行指令的時間,灰色的是等待I/O回報結果的時間。請不要糾結這張圖中時間的比例,這麼畫已經是給了I/O組面子的了。
作業系統部大牛很多,為了讓CPU先生的工作飽和起來,設計了多線程的工作方式。
但“多線程并非銀彈”,存在一些固有的弊端,并且有時候難以駕馭(具體見“附1”):
- 高并發環境下,多線程的切換會消耗CPU資源(上圖中CPU的時間條中深褐色的為上下文切換的時間,可以想見,高并發情況下,線程數會非常多,那麼上下文切換對資源的消耗也會變得明顯起來。況且在切換過程中,CPU并未執行任何業務上的或有意義的計算邏輯);
- 應對高并發環境的多線程開發相對比較難(需要掌握線程同步的原理與工具、ExecutorService、Fork/Join架構、并發集合和原子類等的使用),并且有些問題難以發現或重制(比如指令重排);
- 高并發環境下,更多的線程意味着更多的記憶體占用(JVM預設為每個線程配置設定1M的線程棧空間)。
這裡并非是否定多線程的功勞,相反,多線程在高并發方面發揮了重要作用。況且,多線程仍然是目前主流的高并發方案,在Servlet 3.1以前,Servlet 容器會為每一個接收到的請求配置設定一個單獨的線程來進行處理和響應。
随着Java版本的疊代,其對并發程式設計的支援越來越給力。大家仿佛覺得多線程就是最自然而然地處理高并發的方式,況且Http協定是無狀态的,在把session單獨放到分布式緩存中之後,Web伺服器的橫向擴充易如反掌,當使用者數迅速攀升的時候,橫向增加伺服器數量即可。尤其是随着雲計算技術和DevOps的普及,擴容與縮容已經可以自動化解決。
一切都還過得去,直到Node.js的出現,為Java Web開發帶來了新的啟示。因為畢竟在大家的印象中,JavaScript是一門隻活躍在浏覽器端的解釋型的性能并不高的語言,跟“伺服器端、高并發”這兩個詞仿佛八竿子打不着啊。更令人驚奇的是,Node.js隻需要單線程(引擎内部多線程)就可以應對高并發的請求,這是何等的骨骼驚奇!
Java可不可以這麼搞呢?答案是可以!秘訣同Node.js一樣——“異步非阻塞”。
1.2.1.3 解決方案之二:非阻塞
就像Node.js,使用“異步非阻塞”的代碼可以在不改變執行線程的情況下切換要執行的任務,基于現在Java語言的特性和SDK,我們通常有兩種方案:
- 回調。
-
。CompletableFuture
1)非阻塞的回調
我們知道,前端JavaScript代碼運作在浏覽器上的時候被限制為單線程的,是以JavaScript早早就練就了非阻塞的能力,對于需要較長時間才能傳回結果的調用通常采用異步方式,要不怎麼說“窮人的孩子早當家”呢。
我們最常見的異步調用的例子就是Ajax,如基于Jquery的Ajax調用的代碼:
$.ajax({
type: "POST",
url: "/url/path",
data: "name=John&location=Boston",
success: function(msg){
alert( "Data Saved: " + msg );
}
});
... // 後邊的代碼
這裡我們發出了一個
POST
請求出去,然後注冊了一個回調方法給
success
響應事件,然後就可以繼續執行後邊的代碼了,響應成功傳回的話會回調注冊的方法。OK,完美,沒有阻塞。
在Java開發過程中,我們也會時不時用到回調,但是對于複雜的邏輯,會導緻“callback hell”。什麼是callback hell呢,如圖:
圖裡這個還算比較好閱讀的,再比如下邊這個(本示例來自Reactor 3 Reference Guide)。需求是找到針對某個使用者的TOP5的Favorite,如果沒有傳回針對該使用者的任何Favorite,就預設給出5個建議。
userService.getFavorites(userId, new Callback<List<String>>() { // <1>
public void onSuccess(List<String> list) { // <2>
if (list.isEmpty()) { // <3>
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) { // <4>
UiUtils.submitOnUiThread(() -> { // <5>
list.stream()
.limit(5)
.forEach(uiList::show); // <6>
});
}
public void onError(Throwable error) { // <7>
UiUtils.errorPopup(error);
}
});
} else {
list.stream() // <8>
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId, // <9>
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
這确實是一段比較複雜的邏輯,有多處回調,難以閱讀。即使使用了lambda,代碼行數仍然不少。
- 基于回調的服務使用一個匿名
Callback
作為參數。後者的兩個方法分别在異步執行成功
或異常時被調用。
- 擷取到Favorite ID的list後調用第一個服務的回調方法
。onSuccess
- 如果 list 為空, 調用
。suggestionService
- 服務
傳遞suggestionService
給第二個回調。List<Favorite>
- 既然是處理 UI,我們需要確定消費代碼運作在 UI 線程。
- 使用 Java 8
來限制建議數量為5,然後在 UI 中顯示。Stream
- 在每一層,我們都以同樣的方式處理錯誤:在一個 popup 中顯示錯誤資訊。
- 回到Favorite ID這一層,如果傳回 list,我們需要使用
來擷取favoriteService
對象。由于隻想要5個,是以使用 stream 。Favorite
- 再一次回調。這次對每個ID,擷取
對象在 UI 線程中推送到前端顯示。Favorite
如果用響應式流怎麼編寫呢?下邊用Reactor3的庫來表達:
userService.getFavorites(userId) // <1>
.flatMap(favoriteService::getDetails) // <2>
.switchIfEmpty(suggestionService.getSuggestions()) // <3>
.take(5) // <4>
.publishOn(UiUtils.uiThreadScheduler()) // <5>
.subscribe(uiList::show, UiUtils::errorPopup); // <6>
- 我們擷取到Favorite ID的流。
- 我們 異步地轉換 它們(ID) 為
對象(使用Favorite
),現在我們有了flatMap
流。Favorite
- 一旦
為空,切換到Favorite
。suggestionService
- 我們隻關注流中的最多5個元素。
- 最後,我們希望在 UI 線程中進行處理。
- 通過描述對資料的最終處理(在 UI 中顯示)和對錯誤的處理(顯示在 popup 中)來觸發(
)。subscribe
如果你想確定“Favorite ID”的資料在800ms内獲得(如果逾時,從緩存中擷取)呢?在基于回調的代碼中,
想想就覺得複雜。但 Reactor3 中就很簡單,在處理鍊中增加一個
timeout
的操作符即可。
Reactor3 中增加逾時控制的例子
userService.getFavorites(userId)
.timeout(Duration.ofMillis(800)) // <1>
.onErrorResume(cacheService.cachedFavoritesFor(userId)) // <2>
.flatMap(favoriteService::getDetails) // <3>
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
- 如果流在逾時時限沒有發出(emit)任何值,則發出錯誤(error)信号。
- 一旦收到錯誤信号,交由
處理。cacheService
- 處理鍊後邊的内容與上例類似。
可見,響應式流的程式設計方式,不僅有效減少了代碼量,還大大提高了代碼的可閱讀性。
2)異步的CompletableFuture
CompletableFuture
也是在Java 8中新增的,相對于原來的
Future
,它有兩方面的亮點:
- 異步回調,它提供了五十多種方法,可以異步的調用而不會導緻阻塞;
- 聲明式,對于
的非靜态方法方法,多多少少可以看到類似上邊Reactor代碼的“聲明式程式設計”的感覺,比如CompletableFuture
。completableFuture.thenApplyAsync(...).thenApplyAsync(...).thenAcceptAsync(...)
比如我們在咖啡店買咖啡,點餐之後我們首先會拿到一張發票,這個發票就是
Future
,代表你憑此票在咖啡做好之後就可以去拿了。但是
Future.get()
方法仍然是同步和阻塞的,意味着你拿着票可以去找朋友聊會天,但是并不知道自己的咖啡什麼時候做好,可能去櫃台拿的時候還是要等一會兒。而提供
CompletableFuture
服務的咖啡廳,不僅有發票,還有一個号牌,我們點餐之後找個桌坐下就好,這個訂單的咖啡一旦做好就會送到我們手中。
相對于回調和
Future
來說,
CompletableFuture
的功能強大了不少,我們來嘗試使用它來實作這樣一個需求(本示例來自Reactor 3 Reference Guide):我們首先得到 ID 的清單,然後對每一個ID進一步擷取到“ID對應的name和statistics”這樣一對屬性的組合為元素的清單,整個過程用異步方式來實作。
CompletableFuture<List<String>> ids = ifhIds(); // <1>
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { // <2>
Stream<CompletableFuture<String>> zip =
l.stream().map(i -> { // <3>
CompletableFuture<String> nameTask = ifhName(i); // <4>
CompletableFuture<Integer> statTask = ifhStat(i); // <5>
return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); // <6>
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); // <7>
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); // <8>
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join) // <9>
.collect(Collectors.toList()));
});
List<String> results = result.join(); // <10>
assertThat(results).contains(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
- 以一個 Future 開始,其中封裝了後續将擷取和處理的 ID 的 list。
- 擷取到 list 後邊進一步對其啟動異步處理任務。
- 對于 list 中的每一個元素:
- 異步地得到相應的 name。
- 異步地得到相應的 statistics。
- 将兩個結果一一組合。
- 我們現在有了一個 list,元素是 Future(表示組合的任務,類型是
CompletableFuture
),為了執行這些任務,
我們需要将這個 list(元素構成的流) 轉換為數組(
)。List
- 将這個數組傳遞給
,傳回一個CompletableFuture.allOf
,當是以任務都完成了,那麼這個Future
也就完成了。Future
- 有點麻煩的地方在于
傳回的是allOf
,是以我們周遊這個 Future 的CompletableFuture<Void>
List
,
,然後使用
來手機它們的結果(不會導緻阻塞,因為join()
確定這些 Future 全部完成)AllOf
- 一旦整個異步流水線被觸發,我們等它完成處理,然後傳回結果清單。
可以看到
CompletableFuture
也盡力了,雖然使出渾身解數,但對于集合的操作還略顯吃力。由于 Reactor 内置許多組合操作,是以以上例子可以簡單地實作為:
Flux<String> ids = ifhrIds(); // <1>
Flux<String> combinations =
ids.flatMap(id -> { // <2>
Mono<String> nameTask = ifhrName(id); // <3>
Mono<Integer> statTask = ifhrStat(id); // <4>
return nameTask.zipWith(statTask, // <5>
(name, stat) -> "Name " + name + " has stats " + stat);
});
Mono<List<String>> result = combinations.collectList(); // <6>
List<String> results = result.block(); // <7>
assertThat(results).containsExactly( // <8>
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
- 這一次,我們從一個異步方式提供的
序列(ids
)開始。Flux<String>
- 對于序列中的每一個元素,我們異步地處理它(
方法内)兩次。flatMap
- 擷取相應的 name。
- 擷取相應的 statistic.
- 異步地組合兩個值。
- 随着序列中的元素值“到位”,它們收集一個
中。List
- 在生成流的環節,我們可以繼續異步地操作
Flux
流,對其進行組合和訂閱(subscribe)。
最終我們很可能得到一個
。由于是測試,我們阻塞住(Mono
block()
),等待流處理過程結束,
然後直接傳回集合。
- Assert 結果。
這種非阻塞資料流的感覺,讓我想起來了《讓子彈飛》裡邊最經典的一段:姜文飾演的張麻子朝新來縣長那“馬拉的火車啪啪啪連續打了N槍,旁邊兄弟問“打中沒有”,張麻子說“讓子彈飛一會兒~”,稍後就見拉火車的馬缰繩全都被子彈打斷,馬匹四散,非常6+1!如果張麻子每打一槍都看看前一槍有沒有射中的話,還怎麼裝X呢?
通過上邊的例子可見,回調或 CompletableFuture在處理複雜邏輯時會遇到的相似的窘境,反觀Reactor3提供的API,卻可以顯著減少代碼量,提高代碼可閱讀性,尤其是還可以提供一些不錯的功能。
1.2.2 流量控制——回壓
在響應式流中,資料流的發出者叫做
Publisher
,監聽者叫做
Subscriber
。我們後續就統一直譯叫做“釋出者”和“訂閱者”吧。
問題來了,假如釋出者發出資料的速度和訂閱者處理資料的速度不同的時候,怎麼辦呢?訂閱者處理速度快的話,那還好說,但是如果處理速度跟不上資料發出的速度,就像這樣:
如果沒有流量控制,那麼訂閱者會被釋出者快速産生的資料流淹沒。就像在一個流水線上,如果某個工位處理比較慢,而上遊下料比較快的話,這個工位的勞工師傅就吃不消了,這個時候他需要一種途徑來告訴上遊下料慢一些。
同樣的,訂閱者也需要有一種能夠向上遊回報流量需求的機制:
這種能夠向上遊回報流量請求的機制就叫做回壓(backpressure,也有翻譯為“背壓”的)。
在具體的使用過程中,回壓的處理會涉及不同的政策。舉兩個例子以便于了解:
舉例:緩存的政策
如圖,訂閱者處理完一個元素的時候通過
request(1)
跟釋出者再請求一個元素。由于釋出者的資料不能很快被訂閱者處理掉,那麼釋出者會将未處理的資料元素緩存起來。
這種處理方式與消息隊列有些相似之處,釋出者需要維護一個隊列用來緩存還沒有被處理的元素。通常用于對資料準确性要求比較高的場景,比如釋出者這兒是突然到來的資料高峰,都是要儲存到資料庫的,作為訂閱者的資料持久層沒有那麼快的處理速度,那麼釋出者就需要将資料暫時緩存起來。
舉例:丢棄的政策
如圖,釋出者不需要緩存來不及處理的資料,而是直接丢棄,當訂閱者請求資料的時候,會拿到釋出者那裡最近的一個資料元素。比如我們在做一個監控系統,背景的監控資料以每秒10個的速度産生,而前端界面隻需要每秒鐘更新一下監控資料即可,那作為釋出者的背景就不用緩存資料了,因為這種時效性強的場景,用不到的資料直接丢掉即可。
在後續的實戰階段,我們還會再深入了解回壓的作用原理。
1.2.3 總結
以上就是響應式流的兩個核心特點:異步非阻塞,以及基于“回壓”機制的流量控制。
這樣我們有了基于響應式流的“更新版”的響應式程式設計:
Reactor3和RxJava2都是具有以上特點的響應式流的具體實作庫。
響應式程式設計通常作為面向對象程式設計中的“觀察者模式”(Observer design pattern)的一種擴充。 響應式流(reactive streams)與“疊代子模式”(Iterator design pattern)也有相通之處, 因為其中也有 Iterable-Iterator 這樣的對應關系。主要的差別在于,Iterator 是基于 “拉取”(pull)方式的,而響應式流是基于“推送”(push)方式的。
使用 iterator 是一種“指令式”(imperative)程式設計範式,因為什麼時候擷取下一個元素取決于開發者。在響應式流中,相對應的角色是“釋出者 - 訂閱者”(Publisher-Subscriber),當有新的值到來的時候,反過來由釋出者(Publisher) 通知訂閱者(Subscriber),這種“推送”模式是響應式的關鍵。此外,對推送來的資料的操作 是通過一種聲明式(declaratively)而不是指令式(imperatively)的方式表達的:開發者通過 描述“處理流程”來定義對資料流的處理邏輯。
非常抱歉,前兩節羅裡吧嗦了這麼多卻沒有實戰,想必你也早就看煩了,那麼我們就一起來coding一下熱熱身吧。