RxJava是什麼?根據RxJava在GitHub上給出的描述:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java
大緻意思是:
RxJava—一個可以在JVM上運作的,基于觀察者模式 實作異步操作的java庫。
RxJava的作用:
就是
異步
RxJava的使用,可以使“邏輯複雜的代碼”保持極強的閱讀性。 Rxjava github位址 RxAndorid的作用:
Android中RxAndorid與RxJava配合使用; RxAndorid 封裝了
AndroidSchedulers.mainThread()
,Android開發者使用過程中,可以輕松的将任務post Andorid主線程
中,執行頁面更新操作。 RxAndroid github位址 使用方式
1、Observable
- Observable:被觀察者
- Observer:觀察者,可接收Observable發送的資料
a、Rxjava 實作線程切換:
//
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//1、“異步線程” 執行耗時操作
//2、“執行完畢” 調用onNext觸發回調,通知觀察者
e.onNext("1");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// 訂閱線程 訂閱的那一刻在訂閱線程中執行
}
@Override
public void onNext(String value) {
// “主線程”執行的方法
}
@Override
public void onError(Throwable e) {
// "主線程"執行的方法
}
@Override
public void onComplete() {
// "主線程"執行的方法
}
});
b、Rxjava 使用操作符
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// IO 線程
// 請求網絡資料
e.onNext("123456");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
// IO 線程
// 網絡資料解析(資料轉化)
//
// throw new RequestFailException("擷取網絡請求失敗");
return 123;
}
}).doOnNext(new Consumer<Integer>() { //儲存登入結果UserInfo
@Override
public void accept(@NonNull Integer bean) throws Exception {
// IO 線程
// 儲存網絡資料
}
}).subscribeOn(Schedulers.io()) //IO線程
.observeOn(AndroidSchedulers.mainThread()) //主線程
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer bean) throws Exception {
// 更新UI
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
// 錯誤 顯示錯誤頁面
}
});
2、Flowable
Flowable是為了應對
Backpressure
産生的。
Flowable是一個
被觀察者
,與
Subscriber(觀察者)
配合使用
//
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
//1、“異步線程” 執行耗時操作
//2、“執行完畢” 調用onNext觸發回調,通知觀察者
emitter.onNext(0);
emitter.onComplete();
}
// 若消費者消費能力不足,則抛出MissingBackpressureException異常
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// 訂閱時執行,發生在“訂閱線程”
// 這個方法是用來向生産者申請可以消費的事件數量
// 這裡表明消費者擁有Long.MAX_VALUE的消費能力
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
// “主線程”執行的方法
}
@Override
public void onError(Throwable t) {
// "主線程"執行的方法
}
@Override
public void onComplete() {
// "主線程"執行的方法
}
});
a、 Backpressure(背壓)
Backpressure(背壓)
即
生産者的生産速度
大于
消費者的消費能力
引起的問題。
在RxJava中有一種情況就是
被觀察者發送消息十分迅速
以至于
觀察者不能及時的響應這些消息
。
例如:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// “異步線程”中 生産者有無限的生産能力
while (true){
e.onNext(1);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// “主線程”中 消費者消費能力不足,進而造成事件無限堆積,最後導緻OOM
Thread.sleep(2000);
System.out.println(integer);
}
});
異步線程中
生産者有無限的生産能力;
主線程
中 消費者消費能力不足,進而造成事件無限堆積,最後導緻OOM。
上述的現象,有個專有的名詞來來形容,即:
Backpressure(背壓)
b、Subscription.request(long n);
Subscription.request(long n)
方法是用來向
生産者申請可以消費的事件數量
- 當調用了
方法後,生産者便發送對應數量的事件供消費者消費;request(long n)
- 如果
就表示不顯示調用request
消費能力為0
在異步調用時,RxJava中有個緩存池,用來緩存消費者處理不了暫時緩存下來的資料,緩存池的預設大小為128,即隻能緩存128個事件。
無論request()中傳入的數字比128大或小,緩存池中在剛開始都會存入128個事件;當然如果本身并沒有這麼多事件需要發送,則不會存128個事件。
-
政策下,如果生産者生産的事件大于128個,緩存池便會溢出,進而抛出BackpressureStrategy.ERROR
異常;MissingBackpressureException
-
政策:将RxJava中預設的128個事件的緩存池換成一個更大的緩存池,這樣,消費者通過request()即使傳入一個很大的數字,生産者也會生産事件。但是這種方式比較消耗記憶體,除非是我們比較了解消費者的消費能力,能夠把握具體情況,不會産生OOM。總之BUFFER要慎用。BackpressureStrategy.BUFFER
-
政策:當消費者處理不了事件,則丢棄。消費者通過request()傳入其需求n,然後生産者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丢掉。BackpressureStrategy.DROP
-
政策: LATEST與DROP功能基本一緻。消費者通過request()傳入其需求n,然後生産者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丢掉。唯一的差別就是LATEST總能使消費者能夠接收到生産者産生的最後一個事件。BackpressureStrategy.LATEST
源碼閱讀——簡單例子 (一)
注:目前使用的源碼版本 rxjava:2.1.9
從這段不涉及操作符和線程切換的簡單例子開始:
// 建立觀察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String o) {
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError data is :" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
// 建立被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
// 訂閱
observable.subscribe(observer);
a、ObservableOnSubscribe.java
先看一下
ObservableOnSubscribe.java
這個類
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
由代碼可知
ObservableOnSubscribe
是一個回調接口,回調方法中參數為
ObservableEmitter
,下邊看一下
ObservableEmitter
這個類。
ObservableEmitter.java
ObservableEmitter字面意思是被觀察者發射器,看一下源碼:
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
@NonNull
ObservableEmitter<T> serialize();
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
ObservableEmitter
是對
Emitter
的擴充,而擴充的方法正是 RxJava2.0 之後引入的。提供了可中途取消等新能力,我們看
Emitter
源碼:
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
Emitter
字面意思是發射器,這裡邊的三個方法,大家都很熟悉了。其對應了以下這段代碼:
new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
}
回調說完,下邊我們來看
Observable.create(ObservableOnSubscribe<T> source)
這段代碼。
b、Observable.create(ObservableOnSubscribe source)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
- RxJavaPlugins 先忽略
- 我們看到傳入的
被用來建立ObservableOnSubscribe
,其實ObservableCreate
ObservableCreate
的一個實作類Observable
是以
Observable.create(ObservableOnSubscribe<T> source)
這段代碼,實際是:
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO線程中執行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
- 這裡我們知道:當
方法被執行時,使用者通過調用ObservableOnSubscribe.subscribe
方法,将資料發送出去(發送給觀察者)ObservableEmitter.onNext
下邊我們看一下
ObservableCreate
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// 省略部分代碼 ...
}
-
方法是在ObservableOnSubscribe.subscribe
方法中第四行中被執行了;ObservableCreate.subscribeActual
方法中,使用者通過調用subscribe
方法,将資料發送出去;ObservableEmitter.onNext
- 而
方法第二行,調用了subscribeActual
方法。 訂閱發生時,在訂閱線程主動執行了observer.onSubscribe(parent);
的observer
方法;onSubscribe
-
CreateEmitter
方法傳入的ObservableCreate.subscribeActual(Observer<? super T> observer)
的封裝;Observer
-
的作用是任務取消時,可以不再回調其封裝的觀察者;CreateEmitter
observer
方法,由onNext
方法調用;CreateEmitter.onNext
Observable.create(ObservableOnSubscribe<T> source);
方法最終傳回一個
ObservableCreate
對象。
下邊看
observable.subscribe(observer);
方法
c、observable.subscribe(observer);
-
即 訂閱發生的那一刻。observable.subscribe(observer);
- 這裡
實際是observable.subscribe(observer);
ObservableCreate.subscribe(observer);
下邊檢視
Observable
subscribe(observer)
Observable.subscribe(Observer observer)
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// Observable的subscribe方法,實際執行的是subscribeActual方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
//
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
- 調用
方法時,實際是調用了observable.subscribe(observer);
方法。observable.subscribeActual(observer)
-
為observable
的引用,是以這裡調用的是ObservableCreate
ObservableCreate.subscribeActual(observer)
我們又回到
ObservableCreate
這個類的
subscribeActual
ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
// subscribeActual 方法在 訂閱發生的那一刻被調用 既 observable.subscribe(observer);時被調用
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 若中途任務取消,通過CreateEmitter 可終止對observer中方法onNext 、onError 等的回調
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 訂閱發生時,執行 觀察者的onSubscribe(Disposable d) 方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// 省略部分代碼 ...
}
-
方法在 訂閱發生的那一刻被調用的;在subscribeActual
時被調用;observable.subscribe(observer);
-
訂閱發生時,在訂閱線程回調observer.onSubscribe(parent);
observer
onSubscribe
-
方法中,傳入的subscribeActual
會被包裝成一個Observer
;若中途任務取消,通過CreateEmitter
可終止對CreateEmitter
中方法observer
等的回調;onNext 、onError
subscribeActual 中第二行代碼 observer.onSubscribe(parent);
observer.onSubscribe(parent);
訂閱發生時,執行 觀察者的
onSubscribe(Disposable d)
方法,這裡回到了以下代碼
// 建立觀察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
// ... 省略onNext、onError、onComplete
};
- 這裡傳入的參數為
,其實作了new CreateEmitter<T>(observer)
接口,若任務取消,則不回調傳入的觀察者Disposable
對應的observer
等方法onNext 、onError、onComplete
subscribeActual 中第四行代碼 source.subscribe(parent);
source.subscribe(parent);
是
ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer));
代碼最終回到
ObservableOnSubscribe
subscribe
:
new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
}
- 在
中,調用到subscribe
類的CreateEmitter
方法,将資料發送onNext 、onComplete、onError
中的CreateEmitter
觀察者
到此,“這段不涉及操作符和線程切換的簡單例子” 的代碼跟蹤結束。
源碼閱讀——線程切換 (二)
從這段線程切換的簡單例子開始:
// 建立觀察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 訂閱線程 訂閱的那一刻在訂閱線程中執行
}
@Override
public void onNext(String o) {
// Android 主線程中執行
}
@Override
public void onError(@NonNull Throwable e) {
// Android 主線程中執行
}
@Override
public void onComplete() {
// Android 主線程中執行
}
};
// 建立被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO線程中執行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
// 被觀察者 IO 線程
observable = observable.subscribeOn(Schedulers.io());
// 觀察者 Android主線程
observable = observable.observeOn(AndroidSchedulers.mainThread());
// 訂閱
observable.subscribe(observer);
先來個我總結的RxJava2的整個代碼執行流程:

a、Observable.create(ObservableOnSubscribe source)
在 源碼閱讀——簡單例子 (一) 中我們了解到了
Observable.create(ObservableOnSubscribe<T> source)
實際是 如下代碼:
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO線程中執行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
-
中含有一個ObservableCreate
方法,用于執行傳入觀察者的subscribeActual(observer)
方法,和間接調用 觀察者的observer.onSubscribe
等方法;onNext、onComplete
ObservableCreate
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// 省略部分代碼 ...
}
-
方法第二行,調用了傳入的觀察者的subscribeActual
方法; 訂閱發生時,在訂閱線程主動執行了observer.onSubscribe(parent);
observer
onSubscribe
-
方法第四行,調用了傳入的觀察者的subscribeActual
observer.subscribe
subscribe
CreateEmitter.onNext
-
CreateEmitter
ObservableCreate.subscribeActual(Observer<? super T> observer)
Observer
-
CreateEmitter
observer
onNext
CreateEmitter.onNext
下邊檢視observable.subscribeOn(Schedulers.io())相關代碼
注:
ObservableEmitter
CreateEmitter
的引用,是對
Observer
的進一步封裝。
CreateEmitter
在執行
onNext
時,如果任務取消,則不再回調
Observer
onNext
b、observable.subscribeOn(Schedulers.io())
下邊我們檢視
Observable
subscribeOn(Scheduler scheduler)
Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 生成一個ObservableSubscribeOn對象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
- 繼續忽略
RxJavaPlugins
- 最終傳回一個
對象ObservableSubscribeOn
Observable observable = observableCreate.subscribeOn(Schedulers.io())
代碼實際是
ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
-
傳回的是一個observable.subscribeOn(Schedulers.io())
的引用ObservableSubscribeOn
下邊檢視ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// ... 省略部分代碼
}
看一下
ObservableSubscribeOn
subscribeActual
-
方法第二行代碼中,執行了傳入subscribeActual
Observer
onSubscribe
-
方法第三行: 在subscribeActual
scheduler
中,執行IO線程
observableCreate
方法,傳入參數為subscribe
,即:SubscribeOnObserver
執行IO線程中
observableCreate.subscribe(new SubscribeOnObserver(observer));
是以,無論
ObservableSubscribeOn.subscribeActual(observer)
在哪個線程中被調用
observableCreate.subscribe(new SubscribeOnObserver<T>(observer))
均在IO線程中執行,是以觀察者的
e.onNext("hello"); e.onComplete();
亦在IO線程中執行;
c、observable.observeOn(AndroidSchedulers.mainThread())
Observable
observeOn(Scheduler scheduler)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
//
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
這裡可以看到
Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())
實際是:
ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
是以 ,
observable.observeOn(AndroidSchedulers.mainThread())
傳回的是
ObservableObserveOn
的引用。
下邊檢視ObservableObserveOn
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ... 省略部分代碼
}
ObservableObserveOn
subscribeActual
-
方法第五行代碼,實際為subscribeActual
observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
-
的作用是在ObserveOnObserver
ObserveOnObserver
方法被實行時;将onNext
observer
方法post到onNext
中;Android主線程
d、observable.subscribe(observer)
- 我們知道
Observable
方法,實際調用到了subscribe(Observer<? super T> observer)
Observable
subscribeActual(Observer<? super T> observer)
- 而這裡的
observable
的引用;ObservableObserveOn
是以,
observable.subscribe(observer)
實際執行的是
observableObserveOn.subscribeActual(observer)
到這裡,我們 線程切換 (二) 的小例子變換為了以下代碼:
// 建立觀察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 訂閱線程 訂閱的那一刻在訂閱線程中執行
}
@Override
public void onNext(String o) {
// Android 主線程中執行
}
@Override
public void onError(@NonNull Throwable e) {
// Android 主線程中執行
}
@Override
public void onComplete() {
// Android 主線程中執行
}
};
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO線程中執行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
//
ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
//
ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
//
observableObserveOn.subscribeActual(observer);
observableObserveOn.subscribeActual(observer)
ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
// source 為 observableSubscribeOn
super(source);
// scheduler 為AndroidSchedulers.mainThread()
this.scheduler = scheduler;
// false
this.delayError = delayError;
// 128
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// AndroidSchedulers.mainThread() 為 HandlerScheduler,是以會走到else部分代碼
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
}
// 代碼會走到else 部分
else {
Scheduler.Worker w = scheduler.createWorker();
// source 為 observableSubscribeOn
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ... 省略部分代碼
}
-
方法中,subscribeActual
AndroidSchedulers.mainThread()
,是以 if 中的判斷語句直接忽略,直接走到代碼的 else 部分。HandlerScheduler
-
方法中,将觀察者subscribeActual
封裝成了observer
;并且調用ObserveOnObserver
observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
-
observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
// 1、“訂閱線程中” —— 執行onSubscribe, 實際執行的是observer的onSubscribe方法
observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver));
// 2、“IO程中” —— 執行subscribe ;IO線程 subscribe方法中,使用者主動調用ObserveOnObserver的onNext、onError、onComplete方法,将資料發出去
observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver))
- 使用者調用
SubscribeOnObserver
是将資料發送出去onNext
-
調用了SubscribeOnObserver.onNext
observeOnObserver.onNext
-
通過observeOnObserver.onNext
将HandlerScheduler
等方法post到Android主線程中執行。observer.onNext、observer.onError、observer.onComplete
e、整體流程圖如下
最後總結一下RxJava2的整個執行流程:

參考
手把手教你使用 RxJava 2.0(一) RxJava2 源碼解析(一) RxJava2 源碼解析——流程= THE END =
文章首發于公衆号”CODING技術小館“,如果文章對您有幫助,可關注我的公衆号。