天天看點

java響應式程式設計實踐與原了解析

背景

在傳統的指令式程式設計模式下,程式都是按照人工編寫的指令一條條順序的同步執行,也就是說,隻有目前指令運作完畢,下一條指令才開始執行。那麼傳統的指令式程式設計有有些線程處理模型呢?

首先是同步阻塞式,在這種模型下,隻有阻塞操作完成後,程式才能夠繼續執行。而且阻塞會浪費資源,比如等待網絡連接配接(資料庫請求,其他服務請求),就會導緻執行線程處于空閑狀态。

第二種就是異步阻塞式,在這種方式下一般會通過線程池,建立很多線程,然後針對請求,配置設定空閑的線程來處理。每個處理線程當遇到阻塞操作時,還是會中斷等待操作完成,不過相對于同步阻塞的模式,減少了任務的響應時間。通過增加并行度,提升了資源使用率

第三種是異步非阻塞,通過回調方法來摒棄阻塞操作帶來的資源浪費。不過回調函數會層層嵌套,導緻回調噩夢(callback hell),讓可讀性變得很差。

為了利用第三種模型的優勢,同時又讓代碼維護性更高,spring社群推出了spring flux響應式非阻塞程式設計。它預設的實作叫projectreactor。projectreactor是JVM的完全非阻塞響應式程式設計基礎,具有高效的需求管理(以管理“背壓”的形式).它提供了可組合的異步序列API Flux(用于[0…N]元素)和 Mono(用于[0 | 1]元素),廣泛地實作了Reactive Extensions規範。

特點

響應式程式設計的特點包括以下幾點。待會會通過例子給大家詳細展示下。

· 可組合性&可讀性

· 直到訂閱才會發生任何事情

· 采用背壓或通過其他方式消費者向生産者發出排放率過高的信号的能力

· 具體豐富的資料流運算符

· 高水準但高價值的抽象是并發性不可知的

實作

projectreactor引入了可組合的反應類型,它們實作Publisher同時也提供了豐富的操作符,尤其是 Flux 和Mono 。

Flux 表示一個0..N項的反應序列,可以有 完成信号、錯誤資訊來結束整個流程。是以傳輸的資料為一個普通值、一個完成信号、一個錯誤信号。對應的方法為onNext()、onComplete()、onError()。

而一個Mono對象表示一個單值或空的(0..1)結果,可以認為是一種特殊的 Flux,最多可以發出一個普通值,同樣包含onComplete()、onError()。

示例

  • 靜态資料建立

直接調用just()方法進行建立,也可以通過一個Stream或者一個Iterable對象(比如List)。還有通過Flux靜态方法來生成,range方法(這個方法生成的是一個 Integer 序列,第一個參數表示起始數字,第二個參數表示,生成的個數,這裡生成的資料就為1、2、3),empty() 方法就是生成一個空的序列。

Flux<String> flux1 = Flux.just("one", "two", "three");
Flux<String> flux2 = Flux.fromStream(Stream.of("one", "two", "three"));
List<String> iterable = Arrays.asList("one", "two", "three");
Flux<String> flux3 = Flux.fromIterable(iterable);
Flux<Integer> flux4 = Flux.range(1, 3);
//或者通過 #empty() 生成空資料
Flux<String> fluxEmpty = Flux.empty();           

Mono 也有類似的建立方法,隻是對于的 just() 方法是對應隻是一個參數。而 justOrEmpty() 方法會對空值進行校驗,選擇調用 just() 或者 empty()

//Mono 也是類型
Mono<String> monoEmpty = Mono.empty();

Mono<String> mono1 = Mono.just("one");

//justOrEmpty 可以保證傳入參數為空時也不會報錯
Mono<String> mono2 = Mono.justOrEmpty(null);           
  • 動态資料建立

動态資料建立方法主要有generate與create兩種方法。

對于generate 方法,在Flux中有3個重載方法,不管是哪個方法都是會包含一個循環構造函數。在每個循環中,sink.next()方法最多被調用一次。比如在 flux_generate1 這個執行個體對應的方法。循環生成1~10的序列,當atomicInteger大于10的時候就調用complete()方法,發出資訊通知訂閱者。flux_generate2 執行個體對應的方法則将atomicInteger作為一個對象,在方法中進行傳遞,并且在最後列印在控制台上。

// generate 生成,調用 next 即生成資料,complete 則是完成了整個流程
// 一個循環中隻允許調用 next 方式一次
AtomicInteger atomicInteger = new AtomicInteger();
Flux<Integer> flux_generate1 = Flux.generate(sink -> {
if(atomicInteger.incrementAndGet() > 10){
sink.complete();
}
sink.next(atomicInteger.get());
});

Flux<Integer> flux_generate2 = Flux.generate(() -> 0, (integer, sink) -> {
if (++integer > 10) {
sink.complete();
}
sink.next(integer);
return integer;
}, integer -> {
System.out.println("last integer value is " + integer);
});           

執行過程解析

為了更好的了解flux的底層實作邏輯和程式設計思想,我們下面會給大家詳細的示範下flux.create方法的執行。尤其是前面提到的直到訂閱才會發生任何事情,這句話的真實含義。

flux.create((t) -> {
 t.next("create");
 t.next("create1");

}).subscribe(st->{
 System.out.println(st);
});           

上面是我們要執行的一段代碼。通過debug我們可以看到如下的執行過程.

  • Flux.create方法接受一個函數式接口Consumer作為輸入參數,在我們這個例子中就是
(t) -> {
 t.next("create");
 t.next("create1");
},           

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {

return create(emitter, OverflowStrategy.BUFFER);

}

  • 我們一路追蹤下去,發現它把我們的函數式接口指派給了Fluxcreate對象的一個屬性source,然後就傳回了。并沒有執行這個函數式接口的邏輯
FluxCreate(Consumer<? super FluxSink<T>> source, OverflowStrategy backpressure, FluxCreate.CreateMode createMode) {
 this.source = (Consumer)Objects.requireNonNull(source, "source");
 this.backpressure = (OverflowStrategy)Objects.requireNonNull(backpressure, "backpressure");
 this.createMode = createMode;
}           
  • 那麼什麼時候執行我們的代碼邏輯呢,接着向下看。subscribe方法也是接收了一個函數式接口
(st->{
 System.out.println(st);
})           

public final Disposable subscribe(Consumer<? super T> consumer) {

Objects.requireNonNull(consumer, "consumer");

return this.subscribe(consumer, (Consumer)null, (Runnable)null);

  • 下面我們看看調用subscribe後發生了什麼
java響應式程式設計實踐與原了解析

調用subscribe函數之1

java響應式程式設計實踐與原了解析

調用subscribe函數之2

java響應式程式設計實踐與原了解析

調用subscribe函數之3

沒錯就是通過subscribe出發了Flux.create裡面的執行代碼,而這個裡面的每次next調用,又觸發了後面的subscriber的執行,最終将結果列印出來。

Connected to the target VM, address: '127.0.0.1:53984', transport: 'socket'
create
create1
Disconnected from the target VM, address: '127.0.0.1:53984', transport: 'socket'
Process finished with exit code 0           

繼續閱讀