Observable
在RxJava1.x中,最熟悉的莫過于Observable這個類了,筆者剛使用RxJava2.x時,建立一個Observable後,頓時是懵逼的。因為我們熟悉的Subscriber居然沒影了,取而代之的是ObservableEmitter,俗稱發射器。此外,由于沒有了Subscriber的蹤影,我們建立觀察者時需使用Observer。而Observer也不是我們熟悉的那個Observer,其回調的Disposable參數更是讓人摸不到頭腦。
廢話不多說,從會用開始,還記得使用RxJava的三部曲嗎?
第一步:初始化一個Observable
1 Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
2
3 @Override
4 public void subscribe(ObservableEmitter<Integer> e) throws Exception {
5 e.onNext(1);
6 e.onNext(2);
7 e.onComplete();
8 }
9 });
第二步:初始化一個Observer
Observer<Integer> observer= new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
第三部:建立訂閱關系
observable.subscribe(observer); //建立訂閱關系
不難看出,與RxJava1.x還是存在着一些差別的。首先,建立Observable時,回調的是ObservableEmitter,字面意思即發射器,用于發射資料(onNext)和通知(onError/onComplete)。其次,建立的Observer中多了一個回調方法onSubscribe,傳遞參數為Disposable ,Disposable相當于RxJava1.x中的Subscription,用于解除訂閱。你可能納悶為什麼不像RxJava1.x中訂閱時傳回Disposable,而是選擇回調出來呢。官方說是為了設計成Reactive-Streams架構。不過仔細想想這麼一個場景還是很有用的,假設Observer需要在接收到異常資料項時解除訂閱,在RxJava2.x中則非常簡便,如下操作即可。
Observer<Integer> observer = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(Integer value) {
Log.d("JG", value.toString());
if (value > 3) { // >3 時為異常資料,解除訂閱
disposable.dispose();
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
此外,RxJava2.x中仍然保留了其他簡化訂閱方法,我們可以根據需求,選擇相應的簡化訂閱。隻不過傳入的對象改為了Consumer。
Disposable disposable = observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//這裡接收資料項
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//這裡接收onError
}
}, new Action() {
@Override
public void run() throws Exception {
//這裡接收onComplete。
}
});
不同于RxJava1.x,RxJava2.x中沒有了一系列的Action/Func接口,取而代之的是與Java8命名類似的函數式接口,如下圖:
其中Action類似于RxJava1.x中的Action0,差別在于Action允許抛出異常。
public interface Action {
/**
* Runs the action and optionally throws a checked exception
* @throws Exception if the implementation wishes to throw a checked exception
*/
void run() throws Exception;
}
而Consumer即消費者,用于接收單個值,BiConsumer則是接收兩個值,Function用于變換對象,Predicate用于判斷。這些接口命名大多參照了Java8,熟悉Java8新特性的應該都知道意思,這裡也就不再贅述了。
線程排程
關于線程切換這點,RxJava1.x和RxJava2.x的實作思路是一樣的。這裡就簡單看下相關源碼。
subscribeOn
同RxJava1.x一樣,subscribeOn用于指定subscribe()時所發生的線程,從源碼角度可以看出,内部線程排程是通過ObservableSubscribeOn來實作的。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
ObservableSubscribeOn的核心源碼在subscribeActual方法中,通過代理的方式使用SubscribeOnObserver包裝Observer後,設定Disposable來将subscribe切換到Scheduler線程中
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent); //回調Disposable
parent.setDisposable(scheduler.scheduleDirect(new Runnable() { //設定`Disposable`
@Override
public void run() {
source.subscribe(parent); //使Observable的subscribe發生在Scheduler線程中
}
}));
}
observeOn
observeOn方法用于指定下遊Observer回調發生的線程。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//..
//驗證安全
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
主要實作在ObservableObserveOn中的subscribeActual,可以看出,不同于subscribeOn,沒有将suncribe操作全部切換到Scheduler中,而是通過ObserveOnSubscriber與Scheduler配合,通過schedule()達到切換下遊Observer回調發生的線程,這一點與RxJava1.x實作幾乎相同。關于ObserveOnSubscriber的源碼這裡不再重複描述了,有興趣的可以檢視本人RxJava源碼解讀這篇文章
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnSubscriber<T>(observer, w, delayError, bufferSize));
}
}
Flowable
Flowable是RxJava2.x中新增的類,專門用于應對背壓(Backpressure)問題,但這并不是RxJava2.x中新引入的概念。所謂背壓,即生産者的速度大于消費者的速度帶來的問題,比如在Android中常見的點選事件,點選過快則會造成點選兩次的效果。
我們知道,在RxJava1.x中背壓控制是由Observable完成的,使用如下:
Observable.range(1,10000)
.onBackpressureDrop()
.subscribe(integer -> Log.d("JG",integer.toString()));
而在RxJava2.x中将其獨立了出來,取名為Flowable。是以,原先的Observable已經不具備背壓處理能力。
通過Flowable我們可以自定義背壓處理政策。
測試Flowable例子如下:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int i=0;i<10000;i++){
e.onNext(i);
}
e.onComplete();
}
}, FlowableEmitter.BackpressureMode.ERROR) //指定背壓處理政策,抛出異常
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("JG", integer.toString());
Thread.sleep(1000);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d("JG",throwable.toString());
}
});
其中還需要注意的一點在于,Flowable并不是訂閱就開始發送資料,而是需等到執行Subscription#request才能開始發送資料。當然,使用簡化subscribe訂閱方法會預設指定Long.MAX_VALUE。手動指定的例子如下:
Flowable.range(1,10).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);//設定請求數
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
Single
不同于RxJava1.x中的SingleSubscriber,RxJava2中的SingleObserver多了一個回調方法onSubscribe。
interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
}
Completable
同Single,Completable也被重新設計為Reactive-Streams架構,RxJava1.x的CompletableSubscriber改為CompletableObserver,源碼如下:
interface CompletableObserver<T> {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
}
Subject/Processor
Processor和Subject的作用是相同的。關于Subject部分,RxJava1.x與RxJava2.x在用法上沒有顯著差別,這裡就不介紹了。其中Processor是RxJava2.x新增的,繼承自Flowable,是以支援背壓控制。而Subject則不支援背壓控制。使用如下:
//Subject
AsyncSubject<String> subject = AsyncSubject.create();
subject.subscribe(o -> Log.d("JG",o));//three
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onComplete();
//Processor
AsyncProcessor<String> processor = AsyncProcessor.create();
processor.subscribe(o -> Log.d("JG",o)); //three
processor.onNext("one");
processor.onNext("two");
processor.onNext("three");
processor.onComplete();
關于操作符,RxJava1.x與RxJava2.x在命名和行為上大多數保持了一緻,部分操作符請查閱文檔。