天天看點

(4)Reactor 3快速上手——響應式Spring的道法術器(4)Reactor 3快速上手——響應式Spring的道法術器

boot最新為:2.4.2

裡面:spring-boot-starter-webflux 引用的即是,最新的 reactor 3.4.2

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
  <version>3.4.2</version>
  <scope>compile</scope>
</dependency>
           

找的路徑為:

<artifactId>spring-boot-starter-webflux</artifactId>
  <artifactId>spring-boot-starsster-reactor-netty</artifactId>
  <artifactId>reactor-netty-http</artifactId>
  <artifactId>reactor-core</artifactId>
           

(4)Reactor 3快速上手——響應式Spring的道法術器

1.3.2 Project Reactor

Project Reactor(以下簡稱“Reactor”)與Spring是兄弟項目,側重于Server端的響應式程式設計,

  • 主要 artifact 是 reactor-core,
  • 這是一個基于 Java 8 的實作了響應式流規範 (Reactive Streams specification)的響應式庫。

本文對Reactor的介紹以基本的概念和簡單的使用為主,深度以能夠滿足基本的Spring WebFlux使用為準。

  • 在下一章,我會結合Reactor的設計模式、并發排程模型 等原理層面的内容系統介紹Reactor的使用。

光說不練假把式,我們先把練習用的項目搭起來。先建立一個maven項目,然後添加依賴:

<dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.1.4.RELEASE</version>
    </dependency>
           

最新版本可到 http://search.maven.org 查詢,複制過來即可。另外出于測試的需要,添加如下依賴:

<dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <version>3.1.4.RELEASE</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
           

好了,我們開始Coding吧。

1.3.2.1 Flux與Mono

Reactor中的釋出者(Publisher)

  • Flux

    Mono

    兩個類定義,它們都提供了豐富的操作符(operator)。
  • 一個Flux對象代表一個包含 0…N 個元素的 響應式序列,
  • 而一個Mono對象代表一個 包含 零/一個(0…1)元素的結果。

既然是“資料流”的釋出者,Flux和Mono都可以發出三種“資料信号”:

  • 元素值、
  • 錯誤信号、
  • 完成信号,
  • 錯誤信号和完成信号都是終止信号,
  • 完成信号用于告知 下遊訂閱者 該資料流 正常結束,
  • 錯誤信号 終止資料流的同時将 錯誤傳遞給 下遊訂閱者。

下圖所示就是一個Flux類型的資料流,黑色箭頭是時間軸。

  • 它連續發出“1” - “6”共6個元素值,
  • 以及一個完成信号(圖中⑥後邊的加粗豎線來表示),
  • 完成信号告知訂閱者資料流已經結束。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-h2rZSlG0-1612524630318)(https://leanote.com/api/file/getImage?fileId=5a90f960ab64414490000a22)]

下圖所示是一個Mono類型的資料流,它發出一個元素值後,又發出一個完成信号。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-GmLP9fE1-1612524630320)(https://leanote.com/api/file/getImage?fileId=5a90f9e2ab64414699000ad6)]

既然Flux具有釋出一個資料元素的能力,為什麼還要專門定義一個Mono類呢?

  • 舉個例子,一個HTTP請求産生一個響應,是以對其進行“count”操作是沒有多大意義的。
  • 表示這樣一個結果的話,應該用

    Mono<HttpResponse>

    而不是

    Flux<HttpResponse>

  • 對于的操作通常隻用于處理 0/1 個元素。
  • 它們從語義上就原生包含着元素個數的資訊,進而避免了對Mono對象進行多元素場景下的處理。

有些操作可以改變基數,進而需要切換類型。比如,count操作用于Flux,

但是操作傳回的結果是

Mono<Long>

我們可以用如下代碼聲明上邊兩幅圖所示的Flux和Mono:

Flux.just(1, 2, 3, 4, 5, 6);
Mono.just(1);
           

Flux和Mono提供了多種建立資料流的方法,

  • just

    就是一種比較直接的 聲明資料流 的方式,其參數就是 資料元素。

對于圖中的Flux,還可以通過如下方式聲明(分别基于數組、集合和Stream生成):

​```java

Integer[] array = new Integer[]{1,2,3,4,5,6};

Flux.fromArray(array);//基于數組

List list = Arrays.asList(array);

Flux.fromIterable(list);//基于集合

Stream stream = list.stream();

Flux.fromStream(stream);//基于Stream

不過,這三種信号都不是一定要具備的:

- 首先,錯誤信号和 完成信号都是終止信号,二者不可能同時共存;
- 如果沒有 發出任何一個元素值,而是 直接發出完成/錯誤信号,表示這是一個空資料流;
- 如果沒有 錯誤信号和 完成信号,那麼就是一個無限資料流。

比如,對于隻有完成/錯誤信号的資料流:



```java
// 隻有完成信号的空資料流
Flux.just();
Flux.empty();
Mono.empty();
Mono.justOrEmpty(Optional.empty());
// 隻有錯誤信号的資料流
Flux.error(new Exception("some error"));
Mono.error(new Exception("some error"));
           

你可能會納悶,空的資料流有什麼用?舉個例子,當我們從響應式的 DB 中擷取結果的時候(假設DAO層是

ReactiveRepository<User>

),就有可能為空:

Mono<User> findById(long id);
 Flux<User> findAll();
           

無論是空還是發生異常,都需要通過完成/錯誤信号告知訂閱者,已經查詢完畢,但是抱歉沒有得到值,禮貌問題嘛~

1.3.2.2 訂閱前什麼都不會發生

資料流有了,假設我們想把每個資料元素原封不動地列印出來:

Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
System.out.println();
Mono.just(1).subscribe(System.out::println);
           

輸出如下:

123456
1
           

可見,

subscribe

方法中的lambda表達式作用在了每一個資料元素上。此外,Flux和Mono還提供了多個

subscribe

方法的變體:

可見,

subscribe

方法中的lambda表達式作用在了每一個資料元素上。此外,Flux和Mono還提供了多個

subscribe

方法的變體:

// 訂閱并觸發資料流
subscribe(); 

// 訂閱并指定對正常資料元素如何處理
subscribe(Consumer<? super T> consumer); 

// 訂閱并定義對正常資料元素和 錯誤信号的處理
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 

// 訂閱并定義對正常資料元素、錯誤信号和 完成信号的處理
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 

// 訂閱并定義對正常資料元素、錯誤信号和 完成信号的處理,以及訂閱發生時的處理邏輯
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); 
           

1)如果是訂閱上邊聲明的Flux:

Flux.just(1, 2, 3, 4, 5, 6).subscribe(
    System.out::println,
    System.err::println,
    () -> System.out.println("Completed!"));
           

輸出如下:

1
2
3
4
5
6
Completed!
           

2)再舉一個有錯誤信号的例子:

Mono.error(new Exception("some error")).subscribe(
        System.out::println,
        System.err::println,
        () -> System.out.println("Completed!")
);
           

輸出如下:

java.lang.Exception: some error  //紅字
           

列印出了錯誤信号,沒有輸出

Completed!

表明沒有發出完成信号。

這裡需要注意的一點是,

Flux.just(1, 2, 3, 4, 5, 6)

僅僅聲明了這個資料流,此時資料元素并未發出,隻有

subscribe()

方法調用的時候才會觸發資料流。是以,訂閱前什麼都不會發生。

1.3.2.3 測試與調試

從指令式 和 同步式程式設計切換到 響應式和 異步式程式設計有時候是令人生畏的。學習曲線中最陡峭的地方就是出錯時如何分析和調試。

在指令式世界,調試 通常都是非常直覺的:

  • 直接看 stack trace 就可以找到問題出現的位置, 以及其他資訊:是否問題責任全部出在你自己的代碼?問題是不是發生在某些庫代碼?
  • 如果是, 那你的哪部分代碼調用了庫,是不是傳參不合适導緻的問題?等等。

當你切換到響應式的異步代碼,事情就變得複雜的多了。不過我們先不接觸過于複雜的内容,

  • 先了解一個基本的單元測試工具——

    Step Ve ri fi er

最常見的測試 Reactor 序列的場景就是定義一個 Flux 或 Mono,然後在訂閱它的時候測試它的行為。

當你的測試關注于每一個資料元素的時候,就非常貼近使用 StepVerifier 的測試場景:

  • 下一個期望的資料或信号是什麼?
  • 你是否期望使用 Flux 來發出某一個特别的值?
  • 或者是否接下來 300ms 什麼都不做?——所有這些都可以使用 StepVerifier API 來表示。

還是以那個1-6的Flux以及會發出錯誤信号的Mono為例:

private Flux<Integer> generateFluxFrom1To6() {
        //傳回一個 正常的 Flux 流
        return Flux.just(1, 2, 3, 4, 5, 6);
    }
    private Mono<Integer> generateMonoWithError() {
        //傳回一個錯誤的 Mono流
        return Mono.error(new Exception("some error"));
    }
    @Test
    public void testViaStepVerifier() {
        //建立 一個流,内容是否是 1-6,執行完畢 核實
        StepVerifier.create(generateFluxFrom1To6())
                .expectNext(1, 2, 3, 4, 5, 6)
                .expectComplete()
                .verify();
        // 建立一個流 是否是 錯誤的,錯誤消息是否是 some error
        StepVerifier.create(generateMonoWithError())
                .expectErrorMessage("some error")
                .verify();
    }//執行結果為 測試成功
           
  • 其中,

    expectNext

    用于測試下一個期望的資料元素,
  • expectErrorMessage

    用于校驗下一個元素是否為錯誤信号,

    expectComplete

    用于測試下一個元素是否為完成信号。

StepVerifier

還提供了其他豐富的測試方法,我們會在後續的介紹中陸續接觸到。

1.3.2.4 操作符(Operator)

  • 通常情況下,我們需要對源釋出者發出的原始資料流進行多個階段的處理,并最終得到我們需要的資料。
  • 這種感覺就像是一條流水線,從流水線的源頭進入傳送帶的是原料,經過流水線上各個工位的處理,逐漸由原料變成半成品、零件、元件、成品,最終成為消費者需要的包裝品。
  • 這其中,流水線源頭的下料機就相當于源釋出者,消費者就相當于訂閱者,流水線上的一道道工序就相當于一個一個的操作符(Operator)。

下面介紹一些我們常用的操作符。

1)map - 元素映射為新元素

map

操作可以将資料元素進行轉換/映射,得到一個新元素。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-uAg3XcFc-1612524630322)(https://leanote.com/api/file/getImage?fileId=5a91310eab6441449000118c)]

public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) 
           

上圖是Flux的map操作示意圖,上方的箭頭是原始序列的時間軸,下方的箭頭是經過map處理後的資料序列時間軸。

map

接受一個

Function

的 函數式接口為參數,這個函數式的作用是 定義轉換操作 的政策。舉例說明:

Flux.range(1, 6).map(i -> i * i).subscribe(System.out::println);

        StepVerifier.create(Flux.range(1, 6)    // 1
                .map(i -> i * i))   // 2
                .expectNext(1, 4, 9, 16, 25, 36)    //3
                .expectComplete();  // 4
//有bug,如果把36 改成 37,還是不會報錯
           
  1. Flux.range(1, 6)

    用于生成從“1”開始的,自增為1的“6”個 整型資料;
  2. map

    接受lambda

    i -> i * i

    為參數,表示對每個資料進行平方;
  3. 驗證新的序列的資料;
  4. verifyComplete()

    相當于

    expectComplete().verify()

2)flatMap - 元素映射為流

flatMap

操作可以将每個資料元素轉換/映射為一個流,然後将這些流合并為一個大的資料流。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-b6Vv7gi5-1612524630324)(https://leanote.com/api/file/getImage?fileId=5a913829ab64414699001334)]

注意到,流的合并是異步的,先來先到,并非是嚴格按照原始序列的順序(如圖藍色和紅色方塊是交叉的)。

public final <R> Flux<R> flatMap
    (Function<? super T, ? extends Publisher<? extends R>> mapper)
    
public final <R> Mono<R> flatMap
    (Function<? super T, ? extends Mono<? extends R>> transformer) 
           

flatMap

也是接收一個

Function

的函數式接口為參數,

  • 這個函數式的輸入為一個T類型資料值,對于Flux來說輸出可以是Flux和Mono,
  • 對于Mono來說輸出隻能是Mono。舉例說明:
StepVerifier.create(
        Flux.just("flux", "mono")
                .flatMap(s -> Flux.fromArray(s.split("\\s*"))   // 1
                        .delayElements(Duration.ofMillis(100))) // 2
                .doOnNext(System.out::print)) // 3
        .expectNextCount(8) // 4
        .verifyComplete();
//輸出的内容為:fmlounox
//如果 s 換成 a,依然是:mfolunxo,這是正則,現在不懂
           
  1. 對于每一個字元串

    s

    ,将其拆分為包含一個字元的字元串流;
  2. 對每個元素延遲100ms;
  3. 對每個元素進行列印(注

    doOnNext

    方法是“偷窺式”的方法,不會消費資料流);
  4. 驗證是否發出了8個元素。

列印結果為

mfolnuox

,原因在于各個拆分後的小字元串都是間隔100ms發出的,是以會交叉。

flatMap

通常用于每個元素又會引入資料流的情況,

  • 比如我們有一串url資料流,需要請求每個url并收集response資料。
  • 假設響應式的請求方法如下:

而url資料流為一個

Flux<String> urlFlux

,那麼為了得到所有的HttpResponse,就需要用到flatMap:

urlFlux.flatMap(url -> requestUrl(url));
           

其傳回内容為

Flux<HttpResponse>

類型的HttpResponse流。

3)filter - 過濾

filter

操作可以對資料元素進行篩選。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-EpnpEbkG-1612524630325)(https://leanote.com/api/file/getImage?fileId=5a913515ab644146990012ea)]

public final Flux<T> filter(Predicate<? super T> tester)
public final Mono<T> filter(Predicate<? super T> tester) 
           

filter

接受一個

Predicate

的函數式接口為參數,這個函數式的作用是進行判斷并傳回boolean。舉例說明:

StepVerifier.create(Flux.range(1, 6)
            .filter(i -> i % 2 == 1)    // 1
            .map(i -> i * i))
            .expectNext(1, 9, 25)   // 2
            .verifyComplete();
//過濾過後 為 1 3 5
           
  1. filter

    的lambda參數表示過濾操作将保留奇數;
  2. 驗證僅得到奇數的平方。

4)zip - 一對一合并

看到

zip

這個詞可能會聯想到拉鍊,它能夠将多個流一對一的合并起來。

  • zip有多個方法變體,我們介紹一個最常見的二合一的。
zip 
英 /zɪp/  美 /zɪp/  全球(美國)  
簡明 牛津 新牛津  韋氏  柯林斯 例句  百科
n. 拉鍊;活力,精力;尖嘯聲,撕裂聲;一種程式壓縮的檔案檔案格式
n. (Zip)茲普(人名)
vi. 拉開或拉上;以尖嘯聲行進
vt. 給...以速度;拉上或拉開拉鍊
           

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-eRnDyQb4-1612524630327)(https://leanote.com/api/file/getImage?fileId=5a914413ab64414699001476)]

它對兩個Flux/Mono流每次各取一個元素,合并為一個二進制組(

Tuple2

):

public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1,
                                          Publisher<? extends T2> source2)
public static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2) 
           
  • Flux

    zip

    方法接受Flux或Mono為參數,
  • Mono

    zip

    方法隻能接受Mono類型的參數。

舉個例子,假設我們有一個關于

zip

方法的說明:“Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.”,我們希望将這句話拆分為一個一個的單詞并以每200ms一個的速度發出,除了前面flatMap的例子中用到的

delayElements

,可以如下操作:

private Flux<String> getZipDescFlux() {
        //定義字元串
        String desc = "Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.";
        //字元串 以空格 剪切
        return Flux.fromArray(desc.split("\\s+"));  // 1
    }

    @Test
    public void testSimpleOperators() throws InterruptedException {
        
        CountDownLatch countDownLatch = new CountDownLatch(1);  // 2
        Flux.zip(
                getZipDescFlux(),
                Flux.interval(Duration.ofMillis(200))
                )  // 3等 200毫秒

                .subscribe(
                        t -> System.out.println(t.getT1()),
                        null,
                        countDownLatch::countDown);    // 4

        countDownLatch.await(10, TimeUnit.SECONDS);     // 5
    }
           
  1. 将英文說明用空格拆分為字元串流;
  2. 定義一個

    CountDownLatch

    ,初始為1,
    1. 則會等待執行1次

      countDown

      方法後結束,
    2. 不使用它的話,測試方法所在的線程會直接傳回 而不會等待資料流發出完畢;
  3. 使用

    Flux.interval

    聲明一個每200ms發出一個元素的long資料流;因為zip操作是一對一的,故而将其與字元串流zip之後,字元串流也将具有同樣的速度;
  4. zip之後的流中元素類型為

    Tuple2

    ,使用

    getT1

    方法拿到字元串流的元素;定義完成信号的處理為

    countDown

    ;
  5. countDownLatch.await(10, TimeUnit.SECONDS)

    會等待

    countDown

    倒數至0,最多等待10秒鐘。

除了

zip

靜态方法之外,還有

zipWith

等非靜态方法,效果與之類似:

getZipDescFlux().zipWith(Flux.interval(Duration.ofMillis(200)))
           

在異步條件下,資料流的流速不同,使用zip能夠一對一地将兩個或多個資料流的元素對齊發出。

5)更多

Reactor中提供了非常豐富的操作符,除了以上幾個常見的,還有:

  • 用于程式設計方式自定義生成資料流的

    create

    generate

    等及其變體方法;
  • 用于“無副作用的peek”場景的

    doOnNext

    doOnError

    doOncomplete

    doOnSubscribe

    doOnCancel

    等及其變體方法;
  • 用于資料流轉換的

    when

    and/or

    merge

    concat

    collect

    count

    repeat

    等及其變體方法;
  • 用于過濾/揀選的

    take

    first

    last

    sample

    skip

    limitRequest

    等及其變體方法;
  • 用于錯誤處理的

    timeout

    onErrorReturn

    onErrorResume

    doFinally

    retryWhen

    等及其變體方法;
  • 用于分批的

    window

    buffer

    group

    等及其變體方法;
  • 用于線程排程的

    publishOn

    subscribeOn

    方法。

使用這些操作符,你幾乎可以搭建出能夠進行任何業務需求的資料處理管道/流水線。

抱歉以上這些暫時不能一一介紹,更多詳情請參考JavaDoc,在下一章我們還會回頭對Reactor從更深層次進行系統的分析。

此外,也可閱讀我翻譯的Reactor參考文檔,我會盡量及時更新翻譯的内容。文檔源碼位于github,如有翻譯不當,歡迎送出Pull-Request。

學不動了。

https://blog.51cto.com/liukang/2090191

1.3.2.8 總結

以上關于Reactor的介紹主要是概念層面和使用層面的介紹,不過應該也足以應對常見的業務環境了。

從指令式程式設計到響應式程式設計的切換并不是一件容易的事,需要一個适應的過程。不過相信你通過本節的了解和實操,已經可以體會到使用Reactor程式設計的一些特點:

  • 相對于傳統的基于回調和Future的異步開發方式,響應式程式設計更加具有可編排性和可讀性,配合lambda表達式,代碼更加簡潔,處理邏輯的表達就像裝配“流水線”,适用于對資料流的處理;
  • 在訂閱(subscribe)時才觸發資料流,這種資料流叫做“冷”資料流,就像插座插上電器才會有電流一樣,還有一種資料流不管是否有訂閱者訂閱它都會一直發出資料,稱之為“熱”資料流,Reactor中幾乎都是“冷”資料流;
  • 排程器對線程管理進行更高層次的抽象,使得我們可以非常容易地切換線程執行環境;
  • 靈活的錯誤處理機制有利于編寫健壯的程式;
  • “回壓”機制使得訂閱者可以無限接受資料并讓它的源頭“滿負荷”推送所有的資料,也可以通過使用

    request

    方法來告知源頭它一次最多能夠處理 n 個元素,進而将“推送”模式轉換為“推送+拉取”混合的模式。

後續随着對Reactor的了解我們還會逐漸了解它更多的好玩又好用的特性。

Reactor的開發者中也有來自RxJava的大牛,是以Reactor中甚至許多方法名都是來自RxJava的API的,學習了Reactor之後,很輕松就可以上手Rx家族的庫了。