天天看點

java9 響應式程式設計支援概述響應式程式設計接口demo

概述

java9開始,官方支援了響應式程式設計規範,提供了頂級的響應式程式設計接口。

java11開始,官方提供了支援http2的、友好的http用戶端

java.net.http

,該用戶端就是jdk内部第一個基于響應式程式設計規範的實作。

響應式程式設計接口

package java.util.concurrent;
//這是Flow API的主要類。該類封裝了Flow API的所有重要接口。這是一個final類,我們不能擴充它。
public final class Flow {

    private Flow() {} // uninstantiable
    //釋出者
    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }
    //訂閱者
    public static interface Subscriber<T> {
        //這是訂閱者訂閱了釋出者後接收消息時調用的第一個方法
        public void onSubscribe(Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }
    
    public static interface Subscription {
        public void request(long n);
        public void cancel();
    }
    //此接口同時擴充了Publisher和Subscriber接口,用于在釋出者和訂閱者之間轉換消息。
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }
    static final int DEFAULT_BUFFER_SIZE = 256;

    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;
    }
}           

該響應式接口規範與reactor中的基本一緻,每個接口及方法含義請參考

reactor3 源碼分析

demo

這裡我們隻是簡單示範一下。借用

SubmissionPublisher

作為釋出者。

import java.util.Arrays;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class Java9FlowTest {
    public static class MySubscriber<T> implements Flow.Subscriber<T> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); //這裡要使用Long.MAX_VALUE就會被認為擷取無窮的資料。
        }
        @Override
        public void onNext(T item) {
            System.out.println("Got : " + item);
            subscription.request(1);
        }
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }
        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        //注冊訂閱者
        MySubscriber<Integer> subscriber = new MySubscriber<>();
        publisher.subscribe(subscriber);

        //釋出資訊
        for(int i = 0 ; i < 10 ; i ++){
            publisher.submit(i);
            TimeUnit.SECONDS.sleep(1);
        }
        publisher.close();

        TimeUnit.SECONDS.sleep(100);//為了等待結束
    }
}           

輸出結果

Got : 0
Got : 1
Got : 2
Got : 3
Got : 4
Done