概述
java9開始,官方支援了響應式程式設計規範,提供了頂級的響應式程式設計接口。
java11開始,官方提供了支援http2的、友好的http用戶端
java.net.http
,該用戶端就是jdk内部第一個基于響應式程式設計規範的實作。
響應式程式設計接口
package java.util.concurrent;
//這是Flow API的主要類。該類封裝了Flow API的所有重要接口。這是一個final類,我們不能擴充它。
public final class Flow {
private Flow() {} // uninstantiable
//釋出者
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
//訂閱者
public static interface Subscriber<T> {
//這是訂閱者訂閱了釋出者後接收消息時調用的第一個方法
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
//此接口同時擴充了Publisher和Subscriber接口,用于在釋出者和訂閱者之間轉換消息。
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
static final int DEFAULT_BUFFER_SIZE = 256;
public static int defaultBufferSize() {
return DEFAULT_BUFFER_SIZE;
}
}
該響應式接口規範與reactor中的基本一緻,每個接口及方法含義請參考
reactor3 源碼分析demo
這裡我們隻是簡單示範一下。借用
SubmissionPublisher
作為釋出者。
import java.util.Arrays;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class Java9FlowTest {
public static class MySubscriber<T> implements Flow.Subscriber<T> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); //這裡要使用Long.MAX_VALUE就會被認為擷取無窮的資料。
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
//注冊訂閱者
MySubscriber<Integer> subscriber = new MySubscriber<>();
publisher.subscribe(subscriber);
//釋出資訊
for(int i = 0 ; i < 10 ; i ++){
publisher.submit(i);
TimeUnit.SECONDS.sleep(1);
}
publisher.close();
TimeUnit.SECONDS.sleep(100);//為了等待結束
}
}
輸出結果
Got : 0
Got : 1
Got : 2
Got : 3
Got : 4
Done