天天看点

Java,JDK11,发布订阅模式,响应式流(Reactive Streams)及背压

作者:IT小奋斗

Reactive Streams(响应式流)

概念

响应式流是以非阻塞背压方式处理异步数据流的标准,提供一组最小化的接口、方法和协议来描述必要的操作和实体。

Reactive Streams是JVM中面向流的库标准和规范:1、处理可能无限数量的元素,2、按顺序处理,3、组件之间异步传递,4、强制性非阻塞背压(Backpressure)。

Backpressure(背压)

背压是一种常用策略,使得发布者拥有无限制的缓冲区存储元素,用于确保发布者发布元素太快时,不会去压制订阅者。

发布/订阅模式

系统之间高并发的大量数据流交互通常采用异步的发布/订阅模式,数据由发布者推送给订阅者的过程中,容易产生的一个问题是,当发布者(生产者)产生的数据速度远大于订阅者(消费者)的消费速度时,消费者会承受巨大的资源压力(pressure)而有可能崩溃。

为了解决这个问题,数据流的速度需要被控制,即为流量控制(flow control),以防止快速的数据流不会压垮目标,因此,需要反压即背压(back pressure),生产者和消费者之间需要通过实现一种背压机制来互操作,实现这种背压机制要求是异步非阻塞的,如果是同步阻塞的,消费者在处理数据时生产者必须等待,会产生性能问题。

响应式流(Reactive Streams)

通过定义一组实体,接口和互操作方法,给出了实现非阻塞背压的标准。第三方遵循这个标准来实现具体的解决方案,常见的有:Reactor,RxJava,Akka Streams,Ratpack等。

该标准定义了四个接口:

#发布者Publisher:

interface Publisher<T>

    void subscribe(Subscriber<? super T> subscriber);

}           

发布者只有一个方法,用来接受订阅者进行订阅(subscribe),T代表发布者和订阅者之间传输的数据类型。

#订阅者Subscriber:
interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}           

订阅者有四个事件方法,分别在开启订阅、接收数据、发生错误和数据传输结束时被调用。

#订阅对象Subscription:
interface Subscription {
    void request(long n);
    void cancel();
}           

订阅对象是发布者和订阅者之间交互的操作对象,在发布者(Publisher)通过subscribe方法加入订阅者时,会通过调用订阅者(Subscriber)的onSubscribe把订阅对象(Subscription)传给订阅者。

订阅者拿到订阅对象后,通过调用订阅对象的request方法,根据自身消费能力请求n条数据,或者调用cancel方法来停止接收数据。

订阅对象的request方法被调用时,会触发订阅者的onNext事件方法,把数据传输给订阅者,如果数据全部传输完成,则触发订阅者的onComplete事件方法,如果数据传输发生错误,则触发订阅者的onError事件方法。

#处理者Processor
interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}           

处理者既是发布者又是订阅者,用于在发布者和订阅者之间转换数据格式,把发布者的T类型数据转换为订阅者接受的R类型数据。处理者作为数据转换的中介不是必须的。

由以上的接口可以看出,核心在于订阅者可以通过request(long n)方法来控制接收的数据量,达到了实现背压的目的。

Java对响应式流的支持

环境说明:

JDK11

代码案例:

package com.what21.demo;

import java.util.Arrays;

import java.util.concurrent.Flow;
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

    public static void main(String[] args) {
        // 创建一个发布者(publisher)
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // 创建一个订阅者,并向发布者注册
        MySubscriber<String> subscriber = new MySubscriber<>();
        publisher.subscribe(subscriber);
        // 发布多个数据项
        String[] items = {
                "one", "two", "three", "four", "five", "six",
                "seven", "eight", "nine", "ten", "eleven", "twelve"
        };
        Arrays.asList(items).stream().forEach((i) -> publisher.submit(i));
        // 关闭发布服务器
        publisher.close();

        try {
            System.out.println("Wait");
            synchronized ("A") {
                "A".wait();
            }
            System.out.println("Finish");
        } catch (InterruptedException ie) {
        }
    }

    public static class MySubscriber<T> implements Flow.Subscriber<T> {

        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            System.out.println("Received: " + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
            synchronized ("A") {
                "A".notifyAll();
            }
        }

        @Override
        public void onComplete() {
            synchronized ("A") {
                "A".notifyAll();
            }
            System.out.println("Done");
        }
    }

}

           

继续阅读