boot最新為:2.4.2
裡面:spring-boot-starter-webflux 引用的即是,最新的 reactor 3.4.2
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.2</version>
<scope>compile</scope>
</dependency>
找的路徑為:
<artifactId>spring-boot-starter-webflux</artifactId>
<artifactId>spring-boot-starsster-reactor-netty</artifactId>
<artifactId>reactor-netty-http</artifactId>
<artifactId>reactor-core</artifactId>
(4)Reactor 3快速上手——響應式Spring的道法術器
1.3.2 Project Reactor
Project Reactor(以下簡稱“Reactor”)與Spring是兄弟項目,側重于Server端的響應式程式設計,
- 主要 artifact 是 reactor-core,
- 這是一個基于 Java 8 的實作了響應式流規範 (Reactive Streams specification)的響應式庫。
本文對Reactor的介紹以基本的概念和簡單的使用為主,深度以能夠滿足基本的Spring WebFlux使用為準。
- 在下一章,我會結合Reactor的設計模式、并發排程模型 等原理層面的内容系統介紹Reactor的使用。
光說不練假把式,我們先把練習用的項目搭起來。先建立一個maven項目,然後添加依賴:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.4.RELEASE</version>
</dependency>
最新版本可到 http://search.maven.org 查詢,複制過來即可。另外出于測試的需要,添加如下依賴:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.1.4.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
好了,我們開始Coding吧。
1.3.2.1 Flux與Mono
Reactor中的釋出者(Publisher)
- 由
和Flux
兩個類定義,它們都提供了豐富的操作符(operator)。Mono
- 一個Flux對象代表一個包含 0…N 個元素的 響應式序列,
- 而一個Mono對象代表一個 包含 零/一個(0…1)元素的結果。
既然是“資料流”的釋出者,Flux和Mono都可以發出三種“資料信号”:
- 元素值、
- 錯誤信号、
- 完成信号,
- 錯誤信号和完成信号都是終止信号,
- 完成信号用于告知 下遊訂閱者 該資料流 正常結束,
- 錯誤信号 終止資料流的同時将 錯誤傳遞給 下遊訂閱者。
下圖所示就是一個Flux類型的資料流,黑色箭頭是時間軸。
- 它連續發出“1” - “6”共6個元素值,
- 以及一個完成信号(圖中⑥後邊的加粗豎線來表示),
- 完成信号告知訂閱者資料流已經結束。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-h2rZSlG0-1612524630318)(https://leanote.com/api/file/getImage?fileId=5a90f960ab64414490000a22)]
下圖所示是一個Mono類型的資料流,它發出一個元素值後,又發出一個完成信号。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-GmLP9fE1-1612524630320)(https://leanote.com/api/file/getImage?fileId=5a90f9e2ab64414699000ad6)]
既然Flux具有釋出一個資料元素的能力,為什麼還要專門定義一個Mono類呢?
- 舉個例子,一個HTTP請求産生一個響應,是以對其進行“count”操作是沒有多大意義的。
- 表示這樣一個結果的話,應該用
而不是Mono<HttpResponse>
,Flux<HttpResponse>
- 對于的操作通常隻用于處理 0/1 個元素。
- 它們從語義上就原生包含着元素個數的資訊,進而避免了對Mono對象進行多元素場景下的處理。
有些操作可以改變基數,進而需要切換類型。比如,count操作用于Flux,
但是操作傳回的結果是
。
Mono<Long>
我們可以用如下代碼聲明上邊兩幅圖所示的Flux和Mono:
Flux.just(1, 2, 3, 4, 5, 6);
Mono.just(1);
Flux和Mono提供了多種建立資料流的方法,
-
就是一種比較直接的 聲明資料流 的方式,其參數就是 資料元素。just
對于圖中的Flux,還可以通過如下方式聲明(分别基于數組、集合和Stream生成):
```java
Integer[] array = new Integer[]{1,2,3,4,5,6};
Flux.fromArray(array);//基于數組
List list = Arrays.asList(array);
Flux.fromIterable(list);//基于集合
Stream stream = list.stream();
Flux.fromStream(stream);//基于Stream
不過,這三種信号都不是一定要具備的:
- 首先,錯誤信号和 完成信号都是終止信号,二者不可能同時共存;
- 如果沒有 發出任何一個元素值,而是 直接發出完成/錯誤信号,表示這是一個空資料流;
- 如果沒有 錯誤信号和 完成信号,那麼就是一個無限資料流。
比如,對于隻有完成/錯誤信号的資料流:
```java
// 隻有完成信号的空資料流
Flux.just();
Flux.empty();
Mono.empty();
Mono.justOrEmpty(Optional.empty());
// 隻有錯誤信号的資料流
Flux.error(new Exception("some error"));
Mono.error(new Exception("some error"));
你可能會納悶,空的資料流有什麼用?舉個例子,當我們從響應式的 DB 中擷取結果的時候(假設DAO層是
ReactiveRepository<User>
),就有可能為空:
Mono<User> findById(long id);
Flux<User> findAll();
無論是空還是發生異常,都需要通過完成/錯誤信号告知訂閱者,已經查詢完畢,但是抱歉沒有得到值,禮貌問題嘛~
1.3.2.2 訂閱前什麼都不會發生
資料流有了,假設我們想把每個資料元素原封不動地列印出來:
Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
System.out.println();
Mono.just(1).subscribe(System.out::println);
輸出如下:
123456
1
可見,
subscribe
方法中的lambda表達式作用在了每一個資料元素上。此外,Flux和Mono還提供了多個
subscribe
方法的變體:
可見,
subscribe
方法中的lambda表達式作用在了每一個資料元素上。此外,Flux和Mono還提供了多個
subscribe
方法的變體:
// 訂閱并觸發資料流
subscribe();
// 訂閱并指定對正常資料元素如何處理
subscribe(Consumer<? super T> consumer);
// 訂閱并定義對正常資料元素和 錯誤信号的處理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 訂閱并定義對正常資料元素、錯誤信号和 完成信号的處理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 訂閱并定義對正常資料元素、錯誤信号和 完成信号的處理,以及訂閱發生時的處理邏輯
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
1)如果是訂閱上邊聲明的Flux:
Flux.just(1, 2, 3, 4, 5, 6).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Completed!"));
輸出如下:
1
2
3
4
5
6
Completed!
2)再舉一個有錯誤信号的例子:
Mono.error(new Exception("some error")).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Completed!")
);
輸出如下:
java.lang.Exception: some error //紅字
列印出了錯誤信号,沒有輸出
Completed!
表明沒有發出完成信号。
這裡需要注意的一點是,
Flux.just(1, 2, 3, 4, 5, 6)
僅僅聲明了這個資料流,此時資料元素并未發出,隻有
subscribe()
方法調用的時候才會觸發資料流。是以,訂閱前什麼都不會發生。
1.3.2.3 測試與調試
從指令式 和 同步式程式設計切換到 響應式和 異步式程式設計有時候是令人生畏的。學習曲線中最陡峭的地方就是出錯時如何分析和調試。
在指令式世界,調試 通常都是非常直覺的:
- 直接看 stack trace 就可以找到問題出現的位置, 以及其他資訊:是否問題責任全部出在你自己的代碼?問題是不是發生在某些庫代碼?
- 如果是, 那你的哪部分代碼調用了庫,是不是傳參不合适導緻的問題?等等。
當你切換到響應式的異步代碼,事情就變得複雜的多了。不過我們先不接觸過于複雜的内容,
- 先了解一個基本的單元測試工具——
。Step Ve ri fi er
最常見的測試 Reactor 序列的場景就是定義一個 Flux 或 Mono,然後在訂閱它的時候測試它的行為。
當你的測試關注于每一個資料元素的時候,就非常貼近使用 StepVerifier 的測試場景:
- 下一個期望的資料或信号是什麼?
- 你是否期望使用 Flux 來發出某一個特别的值?
- 或者是否接下來 300ms 什麼都不做?——所有這些都可以使用 StepVerifier API 來表示。
還是以那個1-6的Flux以及會發出錯誤信号的Mono為例:
private Flux<Integer> generateFluxFrom1To6() {
//傳回一個 正常的 Flux 流
return Flux.just(1, 2, 3, 4, 5, 6);
}
private Mono<Integer> generateMonoWithError() {
//傳回一個錯誤的 Mono流
return Mono.error(new Exception("some error"));
}
@Test
public void testViaStepVerifier() {
//建立 一個流,内容是否是 1-6,執行完畢 核實
StepVerifier.create(generateFluxFrom1To6())
.expectNext(1, 2, 3, 4, 5, 6)
.expectComplete()
.verify();
// 建立一個流 是否是 錯誤的,錯誤消息是否是 some error
StepVerifier.create(generateMonoWithError())
.expectErrorMessage("some error")
.verify();
}//執行結果為 測試成功
- 其中,
用于測試下一個期望的資料元素,expectNext
-
用于校驗下一個元素是否為錯誤信号,expectErrorMessage
用于測試下一個元素是否為完成信号。expectComplete
StepVerifier
還提供了其他豐富的測試方法,我們會在後續的介紹中陸續接觸到。
1.3.2.4 操作符(Operator)
- 通常情況下,我們需要對源釋出者發出的原始資料流進行多個階段的處理,并最終得到我們需要的資料。
- 這種感覺就像是一條流水線,從流水線的源頭進入傳送帶的是原料,經過流水線上各個工位的處理,逐漸由原料變成半成品、零件、元件、成品,最終成為消費者需要的包裝品。
- 這其中,流水線源頭的下料機就相當于源釋出者,消費者就相當于訂閱者,流水線上的一道道工序就相當于一個一個的操作符(Operator)。
下面介紹一些我們常用的操作符。
1)map - 元素映射為新元素
map
操作可以将資料元素進行轉換/映射,得到一個新元素。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-uAg3XcFc-1612524630322)(https://leanote.com/api/file/getImage?fileId=5a91310eab6441449000118c)]
public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper)
上圖是Flux的map操作示意圖,上方的箭頭是原始序列的時間軸,下方的箭頭是經過map處理後的資料序列時間軸。
map
接受一個
Function
的 函數式接口為參數,這個函數式的作用是 定義轉換操作 的政策。舉例說明:
Flux.range(1, 6).map(i -> i * i).subscribe(System.out::println);
StepVerifier.create(Flux.range(1, 6) // 1
.map(i -> i * i)) // 2
.expectNext(1, 4, 9, 16, 25, 36) //3
.expectComplete(); // 4
//有bug,如果把36 改成 37,還是不會報錯
-
用于生成從“1”開始的,自增為1的“6”個 整型資料;Flux.range(1, 6)
-
接受lambdamap
為參數,表示對每個資料進行平方;i -> i * i
- 驗證新的序列的資料;
-
相當于verifyComplete()
。expectComplete().verify()
2)flatMap - 元素映射為流
flatMap
操作可以将每個資料元素轉換/映射為一個流,然後将這些流合并為一個大的資料流。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-b6Vv7gi5-1612524630324)(https://leanote.com/api/file/getImage?fileId=5a913829ab64414699001334)]
注意到,流的合并是異步的,先來先到,并非是嚴格按照原始序列的順序(如圖藍色和紅色方塊是交叉的)。
public final <R> Flux<R> flatMap
(Function<? super T, ? extends Publisher<? extends R>> mapper)
public final <R> Mono<R> flatMap
(Function<? super T, ? extends Mono<? extends R>> transformer)
flatMap
也是接收一個
Function
的函數式接口為參數,
- 這個函數式的輸入為一個T類型資料值,對于Flux來說輸出可以是Flux和Mono,
- 對于Mono來說輸出隻能是Mono。舉例說明:
StepVerifier.create(
Flux.just("flux", "mono")
.flatMap(s -> Flux.fromArray(s.split("\\s*")) // 1
.delayElements(Duration.ofMillis(100))) // 2
.doOnNext(System.out::print)) // 3
.expectNextCount(8) // 4
.verifyComplete();
//輸出的内容為:fmlounox
//如果 s 換成 a,依然是:mfolunxo,這是正則,現在不懂
- 對于每一個字元串
,将其拆分為包含一個字元的字元串流;s
- 對每個元素延遲100ms;
- 對每個元素進行列印(注
方法是“偷窺式”的方法,不會消費資料流);doOnNext
- 驗證是否發出了8個元素。
列印結果為
mfolnuox
,原因在于各個拆分後的小字元串都是間隔100ms發出的,是以會交叉。
flatMap
通常用于每個元素又會引入資料流的情況,
- 比如我們有一串url資料流,需要請求每個url并收集response資料。
- 假設響應式的請求方法如下:
而url資料流為一個
Flux<String> urlFlux
,那麼為了得到所有的HttpResponse,就需要用到flatMap:
urlFlux.flatMap(url -> requestUrl(url));
其傳回内容為
Flux<HttpResponse>
類型的HttpResponse流。
3)filter - 過濾
filter
操作可以對資料元素進行篩選。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-EpnpEbkG-1612524630325)(https://leanote.com/api/file/getImage?fileId=5a913515ab644146990012ea)]
public final Flux<T> filter(Predicate<? super T> tester)
public final Mono<T> filter(Predicate<? super T> tester)
filter
接受一個
Predicate
的函數式接口為參數,這個函數式的作用是進行判斷并傳回boolean。舉例說明:
StepVerifier.create(Flux.range(1, 6)
.filter(i -> i % 2 == 1) // 1
.map(i -> i * i))
.expectNext(1, 9, 25) // 2
.verifyComplete();
//過濾過後 為 1 3 5
-
的lambda參數表示過濾操作将保留奇數;filter
- 驗證僅得到奇數的平方。
4)zip - 一對一合并
看到
zip
這個詞可能會聯想到拉鍊,它能夠将多個流一對一的合并起來。
- zip有多個方法變體,我們介紹一個最常見的二合一的。
zip
英 /zɪp/ 美 /zɪp/ 全球(美國)
簡明 牛津 新牛津 韋氏 柯林斯 例句 百科
n. 拉鍊;活力,精力;尖嘯聲,撕裂聲;一種程式壓縮的檔案檔案格式
n. (Zip)茲普(人名)
vi. 拉開或拉上;以尖嘯聲行進
vt. 給...以速度;拉上或拉開拉鍊
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-eRnDyQb4-1612524630327)(https://leanote.com/api/file/getImage?fileId=5a914413ab64414699001476)]
它對兩個Flux/Mono流每次各取一個元素,合并為一個二進制組(
Tuple2
):
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2)
public static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)
-
的Flux
方法接受Flux或Mono為參數,zip
-
的Mono
方法隻能接受Mono類型的參數。zip
舉個例子,假設我們有一個關于
zip
方法的說明:“Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.”,我們希望将這句話拆分為一個一個的單詞并以每200ms一個的速度發出,除了前面flatMap的例子中用到的
delayElements
,可以如下操作:
private Flux<String> getZipDescFlux() {
//定義字元串
String desc = "Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.";
//字元串 以空格 剪切
return Flux.fromArray(desc.split("\\s+")); // 1
}
@Test
public void testSimpleOperators() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1); // 2
Flux.zip(
getZipDescFlux(),
Flux.interval(Duration.ofMillis(200))
) // 3等 200毫秒
.subscribe(
t -> System.out.println(t.getT1()),
null,
countDownLatch::countDown); // 4
countDownLatch.await(10, TimeUnit.SECONDS); // 5
}
- 将英文說明用空格拆分為字元串流;
- 定義一個
,初始為1,CountDownLatch
- 則會等待執行1次
方法後結束,countDown
- 不使用它的話,測試方法所在的線程會直接傳回 而不會等待資料流發出完畢;
- 則會等待執行1次
- 使用
聲明一個每200ms發出一個元素的long資料流;因為zip操作是一對一的,故而将其與字元串流zip之後,字元串流也将具有同樣的速度;Flux.interval
- zip之後的流中元素類型為
,使用Tuple2
方法拿到字元串流的元素;定義完成信号的處理為getT1
;countDown
-
會等待countDownLatch.await(10, TimeUnit.SECONDS)
倒數至0,最多等待10秒鐘。countDown
除了
zip
靜态方法之外,還有
zipWith
等非靜态方法,效果與之類似:
getZipDescFlux().zipWith(Flux.interval(Duration.ofMillis(200)))
在異步條件下,資料流的流速不同,使用zip能夠一對一地将兩個或多個資料流的元素對齊發出。
5)更多
Reactor中提供了非常豐富的操作符,除了以上幾個常見的,還有:
- 用于程式設計方式自定義生成資料流的
和create
等及其變體方法;generate
- 用于“無副作用的peek”場景的
、doOnNext
、doOnError
、doOncomplete
、doOnSubscribe
等及其變體方法;doOnCancel
- 用于資料流轉換的
、when
、and/or
、merge
、concat
、collect
、count
等及其變體方法;repeat
- 用于過濾/揀選的
、take
、first
、last
、sample
、skip
等及其變體方法;limitRequest
- 用于錯誤處理的
、timeout
、onErrorReturn
、onErrorResume
、doFinally
等及其變體方法;retryWhen
- 用于分批的
、window
、buffer
等及其變體方法;group
- 用于線程排程的
和publishOn
方法。subscribeOn
使用這些操作符,你幾乎可以搭建出能夠進行任何業務需求的資料處理管道/流水線。
抱歉以上這些暫時不能一一介紹,更多詳情請參考JavaDoc,在下一章我們還會回頭對Reactor從更深層次進行系統的分析。
此外,也可閱讀我翻譯的Reactor參考文檔,我會盡量及時更新翻譯的内容。文檔源碼位于github,如有翻譯不當,歡迎送出Pull-Request。
學不動了。
https://blog.51cto.com/liukang/2090191
1.3.2.8 總結
以上關于Reactor的介紹主要是概念層面和使用層面的介紹,不過應該也足以應對常見的業務環境了。
從指令式程式設計到響應式程式設計的切換并不是一件容易的事,需要一個适應的過程。不過相信你通過本節的了解和實操,已經可以體會到使用Reactor程式設計的一些特點:
- 相對于傳統的基于回調和Future的異步開發方式,響應式程式設計更加具有可編排性和可讀性,配合lambda表達式,代碼更加簡潔,處理邏輯的表達就像裝配“流水線”,适用于對資料流的處理;
- 在訂閱(subscribe)時才觸發資料流,這種資料流叫做“冷”資料流,就像插座插上電器才會有電流一樣,還有一種資料流不管是否有訂閱者訂閱它都會一直發出資料,稱之為“熱”資料流,Reactor中幾乎都是“冷”資料流;
- 排程器對線程管理進行更高層次的抽象,使得我們可以非常容易地切換線程執行環境;
- 靈活的錯誤處理機制有利于編寫健壯的程式;
- “回壓”機制使得訂閱者可以無限接受資料并讓它的源頭“滿負荷”推送所有的資料,也可以通過使用
方法來告知源頭它一次最多能夠處理 n 個元素,進而将“推送”模式轉換為“推送+拉取”混合的模式。request
後續随着對Reactor的了解我們還會逐漸了解它更多的好玩又好用的特性。
Reactor的開發者中也有來自RxJava的大牛,是以Reactor中甚至許多方法名都是來自RxJava的API的,學習了Reactor之後,很輕松就可以上手Rx家族的庫了。