天天看點

Project Reactor 響應式程式設計

目錄

  • 一. 什麼是響應式程式設計?
  • 二. Project Reactor介紹
  • 三. Reactor核心概念
    • Flux
      • 1. just()
      • 2. fromArray(),fromIterable()和 fromStream()
      • 3. empty()
      • 4. error(Throwable error)
      • 5. never():建立一個不包含任何消息通知的序列。
      • 6. range(int start, int count)
      • 7. interval(Duration period)和 interval(Duration delay, Duration period)
      • 8. intervalMillis(long period)和 intervalMillis(long delay, long period)
    • Mono
      • 1. fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier()
      • 2. delay(Duration duration)和 delayMillis(long duration)
      • 3. ignoreElements(Publisher source)
      • 4. justOrEmpty(Optional data)和 justOrEmpty(T data)
    • 操作符
      • 1. buffer 和 bufferTimeout
      • 2. filter
      • 3. window
      • 4. zipWith
      • 5. take
      • 6. reduce 和 reduceWith
      • 7. merge 和 mergeSequential
      • 8. flatMap 和 flatMapSequential
      • 9. concatMap 和 combineLatest
  • 四. 結束
在計算中,響應式程式設計或反應式程式設計是一種面向資料流和變化傳播的聲明式程式設計範式。這意味着可以在程式設計語言中很友善地表達靜态或動态的資料流,而相關的計算模型會自動将變化的值通過資料流進行傳播。

上面一段話來自維基百科。

響應式程式設計顧名思義就是在于

響應

二字,我們需要在某個事件發生時做出響應。

我們現實生活就是對響應式很好的解釋,我們人類的舉動大多都是基于事件驅動模式,當有人呼喊你的名字,你會根據這個事件來判斷要不要進行應答,這個過程其實就是産生事件,然後我們作為消費者對事件進行處理,而我們的處理結果也會繼續向下傳遞。

在響應式程式設計中,通常是采用異步回調的方式,回調方法的調用和控制則會由響應式架構來完成,對于應用開發來說隻需要關注回調方法的實作就可以了。

這裡提一個著名的設計原則:好萊塢原則(Hollywood principle)

Don't call us, we will call you.

演員送出履歷之後,回家等着就好,演藝公司會主動打電話給你。

Java中最早的Reactor庫RxJava借鑒于.Net的Reactor Extensions,後來Jdk在Java9提供了标準化的響應式庫實作

java.util.concurrent.Flow

,再後來,Project Reactor作為第四代響應式程式設計架構出現,它是一個完全非阻塞響應式程式設計的基石,直接內建了Java函數式API,特别是CompletableFuture,Stream和Duration。Reactor Netty實作了非阻塞跨程序通信,提升了服務間通信效率。

我們在平常開發中,異步程式設計無非是使用JUC包下的工具類或者一些Java同步語義。

  • 阻塞等待:如 Future.get()
  • 不安全的資料通路:如 ReentrantLock.lock()
  • 異常冒泡:如 try…​catch…​finally
  • 同步阻塞:如 synchronized{ }
  • Wrapper配置設定(GC 壓力):如 new Wrapper(event)

或者自定義線程池,但也會遇到諸如一下的問題。

  • Callable 配置設定 -- 可能導緻 GC 壓力。
  • 同步過程強制每個線程執行停-檢查操作。
  • 消息的消費可能比生産慢。
  • 使用線程池(ThreadPool)将任務傳遞給目标線程 -- 通過 FutureTask 方式肯定會産生 GC 壓力。
  • 阻塞直至IO回調。

上面等等問題都會造成的系統性能瓶頸或者安全問題,在Future.get時我們無法避免阻塞等待,最差情況下程式運作其實還是同步的,使用Reactor不但可以很有效的解決上述問題,還能讓我們寫出更加簡潔明了的代碼。

代碼: https://github.com/CasterWx/reactor-ppt

Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中可以包含三種不同類型的消息通知:正常的包含元素的消息、序列結束的消息和序列出錯的消息。當消息通知産生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。

可以指定序列中包含的全部元素。建立出來的 Flux 序列在釋出這些元素之後會自動結束。

Flux.just("hello", "world")
	.doOnNext((i) -> {
            System.out.println("[doOnNext] " + i);
        })
	.doOnComplete(() -> System.out.println("[doOnComplete]"))
	.subscribe(i -> System.out.println("[subscribe] " + i));

// 執行結果
[doOnNext] hello
[subscribe] hello
[doOnNext] world
[subscribe] world
[doOnComplete]
           

可以從一個數組、Iterable 對象或 Stream 對象中建立 Flux 對象。

List<String> arr = Arrays.asList("flux", "mono", "reactor", "core");
Flux.fromIterable(arr)
	.doOnNext((i) -> {
            System.out.println("[doOnNext] " + i);
        })
	.subscribe((i) -> {
            System.out.println("[subscribe] " + i);
        });
//執行結果
[doOnNext] flux
[subscribe] flux
[doOnNext] mono
[subscribe] mono
[doOnNext] reactor
[subscribe] reactor
[doOnNext] core
[subscribe] core
           

建立一個不包含任何元素,隻釋出結束消息的序列。

Flux.empty()
	.doOnNext(i -> {
            System.out.println("[doOnNext] " + i);
        }).doOnComplete(() -> {
            System.out.println("[DoOnComplete] ");
        }).subscribe(i -> {
            System.out.println("[subscribe] " + i);
        });
//執行結果
[DoOnComplete]
           

建立一個隻包含錯誤消息的序列。

try {
	int []arr = new int[5];
	arr[10] = 2;
} catch (Exception e) {
	Flux.error(e).subscribe(i -> {
	System.out.println("error subscribe");
	});
}
//執行結果
           

Flux.never()
	.doOnNext(i -> {
            System.out.println("doOnNext " + i);
        }).doOnComplete(() -> {
            System.out.println("doOnComplete");
        }).subscribe((i) -> {
            System.out.println("subscribe " + i);
        });
//執行結果
空
           

建立包含從 start 起始的 count 個數量的 Integer 對象的序列。

Flux.range(5, 10)
	.doOnNext(i -> {
            System.out.println("doOnNext " + i);
        }).doOnComplete(() -> {
            System.out.println("doOnComplete");
        }).subscribe((i) -> {
            System.out.println("subscribe " + i);
        });
//執行結果
doOnNext 5
subscribe 5
doOnNext 6
subscribe 6
doOnNext 7
subscribe 7
doOnNext 8
subscribe 8
doOnNext 9
subscribe 9
doOnNext 10
subscribe 10
doOnNext 11
subscribe 11
doOnNext 12
subscribe 12
doOnNext 13
subscribe 13
doOnNext 14
subscribe 14
doOnComplete
           

建立一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間隔來釋出。除了間隔時間之外,還可以指定起始元素釋出之前的延遲時間。

Flux.interval(Duration.ofSeconds(4), Duration.ofSeconds(2))
	.doOnNext(i -> {
            System.out.println("doOnNext " + i);
        }).doOnComplete(() -> {
            System.out.println("doOnComplete " + new Date());
        }).subscribe((i) -> {
            System.out.println("subscribe " + i + ", date: " + new Date());
        });
try {
	Thread.sleep(10000);
} catch (InterruptedException e) {
	e.printStackTrace();
}
//執行結果
doOnNext 0
subscribe 0, date: Fri Jun 25 10:17:56 CST 2021
doOnNext 1
subscribe 1, date: Fri Jun 25 10:17:58 CST 2021
doOnNext 2
subscribe 2, date: Fri Jun 25 10:18:00 CST 2021
doOnNext 3
subscribe 3, date: Fri Jun 25 10:18:02 CST 2021
           

上面執行個體為什麼沒有輸出doOnComplete, 從第四秒開始,每兩秒生産一個元素,等到最後complete時已經到了sleep的十秒時間,主線程main已經推出。

與 interval()方法的作用相同,隻不過該方法通過毫秒數來指定時間間隔和延遲時間。

Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。

分别從 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中建立 Mono。

Mono.fromCallable(() -> {
            System.out.println("begin callable");
            return "Hello";
        })
	.subscribeOn(Schedulers.elastic())
	.doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
	.subscribe(System.out::println);
Thread.sleep(10000);
//執行結果
begin callable
doOnNext Hello, thread :elastic-2
Hello
           
Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
            System.out.println("begin");
            return "hello";
        }))
	.subscribeOn(Schedulers.elastic())
	.doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
	.subscribe(System.out::println);
Thread.sleep(10000);
//執行結果
begin
doOnNext hello, thread :elastic-2
hello
           

建立一個 Mono 序列,在指定的延遲時間之後,産生數字 0 作為唯一值。

Mono.delay(Duration.ofSeconds(1)).subscribe(System.out::println);
Thread.sleep(3000);
//執行結果, 延遲一秒後列印
0
           

建立一個 Mono 序列,忽略作為源的 Publisher 中的所有元素,隻産生結束消息。

Mono.ignoreElements((i) -> {
            System.out.println("ignoreElements");
        })
	.doOnNext((i) -> System.out.println("doOnNext " + i))
        .subscribe(System.out::println);
//執行結果
ignoreElements
           

4. justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data)

從一個 Optional 對象或可能為 null 的對象中建立 Mono。隻有 Optional 對象中包含值或對象不為 null 時,Mono 序列才産生對應的元素。

Optional<Integer> optional = Optional.empty();
Mono.justOrEmpty(optional)
	.doOnNext((i) -> System.out.println("doOnNext " + i))
	.subscribe(System.out::println);

System.out.println("========");

optional = Optional.of(100);
Mono.justOrEmpty(optional)
	.doOnNext((i) -> System.out.println("doOnNext " + i))
	.subscribe(System.out::println);
//執行結果
========
doOnNext 100
100
           

這兩個操作符的作用是把目前流中的元素收集到集合中,并把集合對象作為流中的新元素。在進行收集時可以指定不同的條件:所包含的元素的最大數量或收集的時間間隔。方法 buffer()僅使用一個條件,而 bufferTimeout()可以同時指定兩個條件。指定時間間隔時可以使用 Duration 對象或毫秒數,即使用 bufferMillis()或 bufferTimeoutMillis()兩個方法。

除了元素數量和時間間隔之外,還可以通過 bufferUntil 和 bufferWhile 操作符來進行收集。這兩個操作符的參數是表示每個集合中的元素所要滿足的條件的 Predicate 對象。bufferUntil 會一直收集直到 Predicate 傳回為 true。使得 Predicate 傳回 true 的那個元素可以選擇添加到目前集合或下一個集合中;bufferWhile 則隻有當 Predicate 傳回 true 時才會收集。一旦值為 false,會立即開始下一次收集。

Flux.range(1, 100).buffer(20).subscribe(System.out::println);

//執行結果
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
           

對流中包含的元素進行過濾,隻留下滿足 Predicate 指定條件的元素。

Flux.range(1, 10)
	.filter(i -> i%2==0)
        .doOnNext(i -> {
		System.out.println("[doOnNext] " + i);
	})
	.subscribe(i -> {
		System.out.println("subscribe " + i);
	});
//執行結果
[doOnNext] 2
subscribe 2
[doOnNext] 4
subscribe 4
[doOnNext] 6
subscribe 6
[doOnNext] 8
subscribe 8
[doOnNext] 10
subscribe 10
           

window 操作符的作用類似于 buffer,所不同的是 window 操作符是把目前流中的元素收集到另外的 Flux 序列中,是以傳回值類型是 Flux。

Flux.range(1, 15).window(5)
	.doOnNext((flux -> {}))
	.subscribe(flux -> {
		flux.doOnNext((item) -> {
			System.out.println("[window] flux: " + item);
		})
	.doOnComplete(() -> System.out.println("flux item complete"))
	.subscribe();
});
// 執行結果
[window] flux: 1
[window] flux: 2
[window] flux: 3
[window] flux: 4
[window] flux: 5
flux item complete
[window] flux: 6
[window] flux: 7
[window] flux: 8
[window] flux: 9
[window] flux: 10
flux item complete
[window] flux: 11
[window] flux: 12
[window] flux: 13
[window] flux: 14
[window] flux: 15
flux item complete
           

zipWith 操作符把目前流中的元素與另外一個流中的元素按照一對一的方式進行合并。在合并時可以不做任何處理,由此得到的是一個元素類型為 Tuple2 的流;也可以通過一個 BiFunction 函數對合并的元素進行處理,所得到的流的元素類型為該函數的傳回值。

Flux.just("Hello", "Project")
	.zipWith(Flux.just("World", "Reactor"))
	.subscribe(System.out::println);

System.out.println("======");

Flux.just("Hello", "Project")
	.zipWith(Flux.just("World", "Reactor"), (s1, s2) -> String.format("%s!%s!", s1, s2))
	.subscribe(System.out::println);
// 執行結果
Hello,World
Project,Reactor
======
Hello!World!
Project!Reactor!
           

take 系列操作符用來從目前流中提取元素。提取的方式可以有很多種。

take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的數量或時間間隔來提取。

Flux.range(1, 10).take(2).subscribe(System.out::println);
// 執行結果
1
2
           
  1. takeLast(long n):提取流中的最後 N 個元素。
Flux.range(1, 10).takeLast(2).subscribe(System.out::println);
// 執行結果
9
10
           
  1. takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 傳回 true。
Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);
// 執行結果
1
2
3
4
5
6
           
  1. takeWhile(Predicate<? super T> continuePredicate): 當 Predicate 傳回 true 時才進行提取。
Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println);
// 執行結果
1
2
3
4
           
  1. takeUntilOther(Publisher<?> other):提取元素直到另外一個流開始産生元素。
Flux.range(1, 5).takeUntilOther((i) -> {
	try {
		Thread.sleep(1000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}).subscribe(System.out::println);
// 執行結果,暫停1000ms後開始輸出
1
2
3
4
5
           

reduce 和 reduceWith 操作符對流中包含的所有元素進行累積操作,得到一個包含計算結果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的。在操作時可以指定一個初始值。如果沒有初始值,則序列的第一個元素作為初始值。

Flux.range(1, 10)
	.reduce((x, y) -> {
		System.out.println("x:" + x + ", y:" + y);
		return x+y;
	})
	.subscribe(System.out::println);
// 執行結果
x:1, y:2
x:3, y:3
x:6, y:4
x:10, y:5
x:15, y:6
x:21, y:7
x:28, y:8
x:36, y:9
x:45, y:10
55
           
Flux.range(1, 10)
	.reduceWith(() -> 100, (x, y) -> {
	 System.out.println("x:" + x + ", y:" + y);
                    return x+y;
                })
                .subscribe(System.out::println);
// 執行結果
x:100, y:1
x:101, y:2
x:103, y:3
x:106, y:4
x:110, y:5
x:115, y:6
x:121, y:7
x:128, y:8
x:136, y:9
x:145, y:10
155
           

merge 和 mergeSequential 操作符用來把多個流合并成一個 Flux 序列。不同之處在于 merge 按照所有流中元素的實際産生順序來合并,而 mergeSequential 則按照所有流被訂閱的順序,以流為機關進行合并。

Flux.merge(Flux.interval(
	Duration.of(0, ChronoUnit.MILLIS),
	Duration.of(100, ChronoUnit.MILLIS)).take(2),
	Flux.interval(
		Duration.of(50, ChronoUnit.MILLIS),
		Duration.of(100, ChronoUnit.MILLIS)).take(2))
	.toStream()
	.forEach(System.out::println);
System.out.println("==============");
Flux.mergeSequential(Flux.interval(
	Duration.of(0, ChronoUnit.MILLIS),
	Duration.of(100, ChronoUnit.MILLIS)).take(2),
	Flux.interval(
		Duration.of(50, ChronoUnit.MILLIS),
		Duration.of(100, ChronoUnit.MILLIS)).take(2))
	.toStream()
                .forEach(System.out::println);
// 執行結果
0
0
1
1
==============
0
1
0
1
           

Flux.just(1, 2)
	.flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS), Duration.of(10, ChronoUnit.MILLIS)).take(x))
	.toStream()
	.forEach(System.out::println);
// 執行結果
0
0
1
           

Flux.just(5, 10)
	.concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
	.toStream()
	.forEach(System.out::println);

Flux.combineLatest(
	Arrays::toString,
	Flux.intervalMillis(100).take(5),
	Flux.intervalMillis(50, 100).take(5)
).toStream().forEach(System.out::println);
           

繼續閱讀