天天看點

Java,JDK11,釋出訂閱模式,響應式流(Reactive Streams)及背壓

作者:IT小奮鬥

Reactive Streams(響應式流)

概念

響應式流是以非阻塞背壓方式處理異步資料流的标準,提供一組最小化的接口、方法和協定來描述必要的操作和實體。

Reactive Streams是JVM中面向流的庫标準和規範:1、處理可能無限數量的元素,2、按順序處理,3、元件之間異步傳遞,4、強制性非阻塞背壓(Backpressure)。

Backpressure(背壓)

背壓是一種常用政策,使得釋出者擁有無限制的緩沖區存儲元素,用于確定釋出者釋出元素太快時,不會去壓制訂閱者。

釋出/訂閱模式

系統之間高并發的大量資料流互動通常采用異步的釋出/訂閱模式,資料由釋出者推送給訂閱者的過程中,容易産生的一個問題是,當釋出者(生産者)産生的資料速度遠大于訂閱者(消費者)的消費速度時,消費者會承受巨大的資源壓力(pressure)而有可能崩潰。

為了解決這個問題,資料流的速度需要被控制,即為流量控制(flow control),以防止快速的資料流不會壓垮目标,是以,需要反壓即背壓(back pressure),生産者和消費者之間需要通過實作一種背壓機制來互操作,實作這種背壓機制要求是異步非阻塞的,如果是同步阻塞的,消費者在處理資料時生産者必須等待,會産生性能問題。

響應式流(Reactive Streams)

通過定義一組實體,接口和互操作方法,給出了實作非阻塞背壓的标準。第三方遵循這個标準來實作具體的解決方案,常見的有:Reactor,RxJava,Akka Streams,Ratpack等。

該标準定義了四個接口:

#釋出者Publisher:

interface Publisher<T>

    void subscribe(Subscriber<? super T> subscriber);

}           

釋出者隻有一個方法,用來接受訂閱者進行訂閱(subscribe),T代表釋出者和訂閱者之間傳輸的資料類型。

#訂閱者Subscriber:
interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}           

訂閱者有四個事件方法,分别在開啟訂閱、接收資料、發生錯誤和資料傳輸結束時被調用。

#訂閱對象Subscription:
interface Subscription {
    void request(long n);
    void cancel();
}           

訂閱對象是釋出者和訂閱者之間互動的操作對象,在釋出者(Publisher)通過subscribe方法加入訂閱者時,會通過調用訂閱者(Subscriber)的onSubscribe把訂閱對象(Subscription)傳給訂閱者。

訂閱者拿到訂閱對象後,通過調用訂閱對象的request方法,根據自身消費能力請求n條資料,或者調用cancel方法來停止接收資料。

訂閱對象的request方法被調用時,會觸發訂閱者的onNext事件方法,把資料傳輸給訂閱者,如果資料全部傳輸完成,則觸發訂閱者的onComplete事件方法,如果資料傳輸發生錯誤,則觸發訂閱者的onError事件方法。

#處理者Processor
interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}           

處理者既是釋出者又是訂閱者,用于在釋出者和訂閱者之間轉換資料格式,把釋出者的T類型資料轉換為訂閱者接受的R類型資料。處理者作為資料轉換的中介不是必須的。

由以上的接口可以看出,核心在于訂閱者可以通過request(long n)方法來控制接收的資料量,達到了實作背壓的目的。

Java對響應式流的支援

環境說明:

JDK11

代碼案例:

package com.what21.demo;

import java.util.Arrays;

import java.util.concurrent.Flow;
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

    public static void main(String[] args) {
        // 建立一個釋出者(publisher)
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // 建立一個訂閱者,并向釋出者注冊
        MySubscriber<String> subscriber = new MySubscriber<>();
        publisher.subscribe(subscriber);
        // 釋出多個資料項
        String[] items = {
                "one", "two", "three", "four", "five", "six",
                "seven", "eight", "nine", "ten", "eleven", "twelve"
        };
        Arrays.asList(items).stream().forEach((i) -> publisher.submit(i));
        // 關閉釋出伺服器
        publisher.close();

        try {
            System.out.println("Wait");
            synchronized ("A") {
                "A".wait();
            }
            System.out.println("Finish");
        } catch (InterruptedException ie) {
        }
    }

    public static class MySubscriber<T> implements Flow.Subscriber<T> {

        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            System.out.println("Received: " + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
            synchronized ("A") {
                "A".notifyAll();
            }
        }

        @Override
        public void onComplete() {
            synchronized ("A") {
                "A".notifyAll();
            }
            System.out.println("Done");
        }
    }

}

           

繼續閱讀