天天看點

Mono和Flux的用法詳解

WebFlux的Flux和Mono用法

Flux

just

可以指定序列中包含的全部元素。建立出來的 Flux 序列在釋出這些元素之後會自動結束。

fromArray(),fromIterable()和 fromStream()

可以從一個數組、Iterable 對象或 Stream 對象中建立 Flux 對象。

empty()

建立一個不包含任何元素,隻釋出結束消息的序列,在響應式程式設計中,流的傳遞是基于元素的,empty表示沒有任何元素,是以不會進行後續傳遞,需要用switchIfEmpty等處理

error(Throwable error)

建立一個隻包含錯誤消息的序列。

never()

建立一個不包含任何消息通知的序列。使用示例:

Flux.range(1, 10)

    .timeout(Flux.never(), v -> Flux.never())

    .subscribe(System.out::println);

1

2

3

上面表示用不逾時

range(int start, int count)

建立包含從 start 起始的 count 個數量的 Integer 對象的序列。示例:

Flux.interval(Duration.ofSeconds(2)).doOnNext(System.out::println).blockLast();

1

intervalMillis(long period)和 intervalMillis(long delay, long period)

與 interval()方法的作用相同,隻不過該方法通過毫秒數來指定時間間隔和延遲時間。

generate()

generate()方法通過同步和逐一的方式來産生 Flux 序列。序列的産生是通過調用所提供的 SynchronousSink 對象的 next(),complete()和 error(Throwable)方法來完成的。逐一生成的含義是在具體的生成邏輯中,next()方法隻能最多被調用一次。在有些情況下,序列的生成可能是有狀态的,需要用到某些狀态對象。此時可以使用 generate()方法的另外一種形式 generate(Callable stateSupplier, BiFunction<S,SynchronousSink,S> generator),其中 stateSupplier 用來提供初始的狀态對象。在進行序列生成時,狀态對象會作為 generator 使用的第一個參數傳入,可以在對應的邏輯中對該狀态對象進行修改以供下一次生成時使用。

在代碼清單 2中,第一個序列的生成邏輯中通過 next()方法産生一個簡單的值,然後通過 complete()方法來結束該序列。如果不調用 complete()方法,所産生的是一個無限序列。第二個序列的生成邏輯中的狀态對象是一個 ArrayList 對象。實際産生的值是一個随機數。産生的随機數被添加到 ArrayList 中。當産生了 10 個數時,通過 complete()方法來結束序列

Flux.generate(sink -> {

    sink.next("Hello");

    sink.complete();

}).subscribe(System.out::println);

final Random random = new Random();

Flux.generate(ArrayList::new, (list, sink) -> {

    int value = random.nextInt(100);

    list.add(value);

    sink.next(value);

    if (list.size() == 10) {

        sink.complete();

    }

    return list;

}).subscribe(System.out::println);

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

create()方法

create()方法與 generate()方法的不同之處在于所使用的是 FluxSink 對象。FluxSink 支援同步和異步的消息産生,并且可以在一次調用中産生多個元素。在代碼清單 3 中,在一次調用中就産生了全部的 10 個元素。

Flux.create(sink -> {

    for (int i = 0; i < 10; i++) {

        sink.next(i);

    }

    sink.complete();

}).subscribe(System.out::println);

1

2

3

4

5

6

Mono

just

建立對象

empty

建立一個不包含任何元素,隻釋出結束消息的序列

error()

抛出異常,使用示例:

        Mono.defer(()->{

            return Mono.error(new RuntimeException());

        }).subscribe();

1

2

3

never()

empty裡面至少還有一個結束消息,而never則是真的啥都沒有

fromCallable()

使用示例:

Mono.fromCallable(() -> "9999").subscribe(System.out::println);

1

fromCompletionStage()

示例:

Mono.fromCompletionStage(future).block();

1

fromFuture()、fromRunnable()和 fromSupplier()

分别從 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中建立 Mono。

delay(Duration duration)和 delayMillis(long duration)

建立一個 Mono 序列,在指定的延遲時間之後,産生數字 0 作為唯一值。

Mono.delay(Duration.ofSeconds(3)).doOnNext(System.out::println).block();

1

justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data)

從一個 Optional 對象或可能為 null 的對象中建立 Mono。隻有 Optional 對象中包含值或對象不為 null 時,Mono 序列才産生對應的元素。

還可以通過 create()方法來使用 MonoSink 來建立 Mono

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);

Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);

Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

1

2

3

操作符

buffer 和 bufferTimeout

這兩個操作符的作用是把目前流中的元素收集到集合中,并把集合對象作為流中的新元素。在進行收集時可以指定不同的條件:所包含的元素的最大數量和收集的時間。方法 buffer()僅使用一個條件,而 bufferTimeout()可以同時指定兩個條件。指定時間間隔時可以使用 Duration 對象或毫秒數,即使用 bufferMillis()或 bufferTimeoutMillis()兩個方法。

除了元素數量和時間之外,還可以通過 bufferUntil 和 bufferWhile 操作符來進行收集。這兩個操作符的參數是表示每個集合中的元素所要滿足的條件的 Predicate 對象。bufferUntil 會一直收集直到 Predicate 傳回為 true。使得 Predicate 傳回 true 的那個元素可以選擇添加到目前集合或下一個集合中;bufferWhile 則隻有當 Predicate 傳回 true 時才會收集。一旦值為 false,會立即開始下一次收集

需要注意的是,在第二個案例中,首先通過 toStream()方法把 Flux 序列轉換成 Java 8 中的 Stream 對象,再通過 forEach()方法來進行輸出。這是因為序列的生成是異步的,而轉換成 Stream 對象可以保證主線程在序列生成完成之前不會退出,進而可以正确地輸出序列中的所有元素。

Flux.range(1, 100).buffer(20).subscribe(System.out::println);

Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);

Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);

Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

1

2

3

4

window

window 操作符的作用類似于 buffer,所不同的是 window 操作符是把目前流中的元素收集到另外的 Flux 序列中,是以傳回值類型是 Flux<Flux>。在代碼清單 7 中,兩行語句的輸出結果分别是 5 個和 2 個 UnicastProcessor 字元。這是因為 window 操作符所産生的流中包含的是 UnicastProcessor 類的對象,而 UnicastProcessor 類的 toString 方法輸出的就是 UnicastProcessor 字元。

Flux.range(1, 100).window(20).subscribe(System.out::println);

Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);

1

2

zipWith

zipWith 操作符把目前流中的元素與另外一個流中的元素按照一對一的方式進行合并。在合并時可以不做任何處理,由此得到的是一個元素類型為 Tuple2 的流;也可以通過一個 BiFunction 函數對合并的元素進行處理,所得到的流的元素類型為該函數的傳回值。

在代碼清單 8 中,兩個流中包含的元素分别是 a,b 和 c,d。第一個 zipWith 操作符沒有使用合并函數,是以結果流中的元素類型為 Tuple2;第二個 zipWith 操作通過合并函數把元素類型變為 String。

Flux.just("a", "b")

        .zipWith(Flux.just("c", "d"))

        .subscribe(System.out::println);

Flux.just("a", "b")

        .zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))

        .subscribe(System.out::println);

1

2

3

4

5

6

take

take 系列操作符用來從目前流中提取元素。提取的方式可以有很多種。

方法    注釋

take(long n),take(Duration timespan)和 takeMillis(long timespan)    按照指定的數量或時間間隔來提取

takeLast(long n)    提取流中的最後 N 個元素

takeUntil(Predicate<? super T> predicate)    提取元素直到 Predicate 傳回 true

takeWhile(Predicate<? super T> continuePredicate)    當 Predicate 傳回 true 時才進行提取

takeUntilOther(Publisher<?> other)    提取元素直到另外一個流開始産生元素

Flux.range(1, 1000).take(10).subscribe(System.out::println);

Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);

Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);

Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);

1

2

3

4

merge 和 mergeSequential

merge 和 mergeSequential 操作符用來把多個流合并成一個 Flux 序列。不同之處在于 merge 按照所有流中元素的實際産生順序來合并,而 mergeSequential 則按照所有流被訂閱的順序,以流為機關進行合并。

代碼清單 11 中分别使用了 merge 和 mergeSequential 操作符。進行合并的流都是每隔 100 毫秒産生一個元素,不過第二個流中的每個元素的産生都比第一個流要延遲 50 毫秒。在使用 merge 的結果流中,來自兩個流的元素是按照時間順序交織在一起;而使用 mergeSequential 的結果流則是首先産生第一個流中的全部元素,再産生第二個流中的全部元素。

Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

        .toStream()

        .forEach(System.out::println);

Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

        .toStream()

        .forEach(System.out::println);

1

2

3

4

5

6

flatMap 和 flatMapSequential

flatMap 和 flatMapSequential 操作符把流中的每個元素轉換成一個流,再把所有流中的元素進行合并。flatMapSequential 和 flatMap 之間的差別與 mergeSequential 和 merge 之間的差別是一樣的。

Flux.just(5, 10)

        .flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

        .toStream()

        .forEach(System.out::println);

1

2

3

4

測試

在對使用 Reactor 的代碼進行測試時,需要用到 io.projectreactor:reactor-test 庫。

使用 StepVerifier

進行測試時的一個典型的場景是對于一個序列,驗證其中所包含的元素是否符合預期。StepVerifier 的作用是可以對序列中包含的元素進行逐一驗證。在代碼清單 21 中,需要驗證的流中包含 a 和 b 兩個元素。通過 StepVerifier.create()方法對一個流進行包裝之後再進行驗證。expectNext()方法用來聲明測試時所期待的流中的下一個元素的值,而 verifyComplete()方法則驗證流是否正常結束。類似的方法還有 verifyError()來驗證流由于錯誤而終止。

StepVerifier.create(Flux.just("a", "b"))

        .expectNext("a")

        .expectNext("b")

        .verifyComplete();

1

2

3

4

調試

由于反應式程式設計範式與傳統程式設計範式的差異性,使用 Reactor 編寫的代碼在出現問題時比較難進行調試。為了更好的幫助開發人員進行調試,Reactor 提供了相應的輔助功能。

啟用調試模式

Hooks類中有許多方法可以進行調試

使用檢查點

另外一種做法是通過 checkpoint 操作符來對特定的流處理鍊來啟用調試模式。代碼清單 25 中,在 map 操作符之後添加了一個名為 test 的檢查點。當出現錯誤時,檢查點名稱會出現在異常堆棧資訊中。對于程式中重要或者複雜的流處理鍊,可以在關鍵的位置上啟用檢查點來幫助定位可能存在的問題。

Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);

1

“冷”與“熱”序列

之前的代碼清單中所建立的都是冷序列。冷序列的含義是不論訂閱者在何時訂閱該序列,總是能收到序列中産生的全部消息。而與之對應的熱序列,則是在持續不斷地産生消息,訂閱者隻能擷取到在其訂閱之後産生的消息。