引言
本文不對Rxjava的基本使用進行講解,僅對源碼做分析,如果你對Rxjava的基本使用還有不清楚的,建議學習官方文檔之後再閱讀本文
ReactiveX文檔中文翻譯
Rxjava
本文會逐一解析Rxjava的create()、subscribe()、操作符、subscribeOn()、obsweveOn()、背壓的源碼,模式是先給出一段模版代碼,然後逐漸深入分析
正文
Create()方法
這裡給出一個最簡單的Rxjava的執行個體
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("next");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + d);
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
複制代碼
直接看create()方法主體
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複制代碼
調用對象和傳回對象都為Observable,而傳入參數為ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
複制代碼
這是一個接口,僅包含一個方法,就是上面我們在new ObservableOnSubscribe時候需要重寫的那個方法。 再看subscribe()的形參類型ObservableEmitter
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
@NonNull
ObservableEmitter<T> serialize();
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
複制代碼
發現這也是一個接口,不需要太過關注,值得關注的是他的上層,由接口特性我們知道Emitter肯定也是一個接口,我們來看下它定義了什麼方法
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
複制代碼
看到這三個熟悉的方法,你就知道為什麼我們執行個體化的ObservableEmitter對象e可以調用onNext()、onError()、onComplete()這三個方法了
create()的參數已經看完了,下面看下create()的内容
第一句
ObjectHelper.requireNonNull(source, "source is null");
是判空代碼。 傳回值是
RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
,我看看下這個方法。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
...
return source;
}
複制代碼
具體的内容我們不需要去解讀,我們隻看他的傳回值和傳入參數,經過觀察發現都是Observable類型,乍看好像沒什麼問題,但是看上面,源碼中傳入的是一個ObservableCreate類型,是以這裡ObservableCreate有擴充卡的作用,将ObservableOnSubscribe适配為Observable類型。 下面我們就看看這個擴充卡ObservableCreate
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
...
}
}
複制代碼
成員變量、構造方法略過,我們先看看這裡頻頻出現的觀察者observer
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
複制代碼
同樣的也是一個接口,定義的這個四個方法就是我們在訂閱時,觀察者需要重寫的四個方法,注意與上面的Emitter接口及其三個方法進行區分。 看這行
observer.onSubscribe(parent);
,由上面我們知道observer.onSubscribe()是接受Disposable類型,而這裡的parent是CreateEmitter類型,你可能已經猜出來了,沒錯,這裡的CreateEmitter也是一個擴充卡,前面的ObservableCreate對被觀察者進行了适配,CreateEmitter則對觀察者進行了适配,将observer類型轉化為Disposable類型,下面看下他的源碼
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
...
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("..."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
...
}
複制代碼
主要就是重寫了那四個方法,定義了規則,比如:
- onComplete()與onError()互斥,切CreateEmitter在回調他們兩中任意一個後,都會自動dispose()
- Observable和Observer的關系沒有被dispose,才會回調Observer的onXXXX()方法
并且,到這裡你對onCreate()中的資料流動也一定有了一定的了解:
-->
-->
e.onNext("next")
-->
CreateEmitter.onNext("next")
-->
Observer.onNext("next")
Log.d(TAG, "onNext: "+value)
我們再回到ObservableCreate的subscribeActual()中。
source.subscribe(parent);
,最重要的是這一行,調用者是被觀察者,傳入的參數為觀察者,基本可以猜出來了,這裡是訂閱的作用,真正将被觀察者與觀察者聯系起來的地方
subscribe()方法
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
...
}
複制代碼
第一句的作用同樣是判空,接下來先擷取了傳入的observer并進行了相關配置,然後調用
subscribeActual(observer);
,細心的同學可能注意到了,subscribeActual()正是在上面ObservableCreate中被重寫的方法,而具有“訂閱”意義的那行代碼也包含其中,結合subscribe()的本意,這行代碼的作用也很明朗了
如果你隻是想對Rxjava基本的資料傳輸流程、訂閱的原理感興趣,那麼就不用看下去了,下面的内容主要是Rxjava操作符、線程排程、背包的源碼分析
操作符(Map)
開始分析Rxjava的操作符部分
我們以Map操作符為例展開分析,首先,還是給出一個最簡單的執行個體
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("next");
e.onComplete();
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + d);
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
複制代碼
先看map()方法整體
public final <R > Observable < R > map(Function < ? super T, ? extends R > mapper){
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複制代碼
傳回值肯定是Observable,參數是一個泛型接口,我們看下這個接口
public interface Function<T, R> {
R apply(@NonNull T t) throws Exception;
}
複制代碼
傳入T,傳回R,符合Map操作符傳入兩個資料類型進行轉換的效果,在意料之中。 繼續看map()的方法内容,第一行按照慣例是判空語句,我們發現map()的return語句與create()極為相似,都是調用了RxJavaPlugins.onAssembly(),僅是傳入的參數不同,其實不隻是Map操作符,大多操作符都是這樣的,他們的不同僅僅是傳入參數的不同,也就是擴充卡的不同,這說明,操作符的具體實作(比如Map的類型轉換)都是在各自的擴充卡中做的。
小結:create以及對大多數操作符的retun語句都是RxJavaPlugins.onAssembly(),僅是傳入參數不同
進入ObservableMap的部分
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
}
複制代碼
我們發現,ObservableMap做的事情很少,就三件事,第一:在構造方法中,将傳入的Observable也就是本身抛給父類(ObservableSource是Observable的父類,是以可以接受);第二:對轉換邏輯funtion進行儲存;第三:重寫subscribeActual()方法并在其中實作訂閱,這裡與ObservableCreate是一樣的,隻是傳遞的參數不同
小結:create以及對大多數操作符的第一層擴充卡中都會重寫subscribeActual()并實作訂閱邏輯
我們并沒有在ObservableMap的代碼中發現進行類型轉換的代碼,不要心急,有的同學估計已經發現了,這裡的進行訂閱操作的
source.subscribe()
傳入的參數類型改變了 ,之前是CreateEmitter,現在變為了一個叫MapObserver的類,我們知道CreateEmitter中實作了那四個常用的方法并制定了相關規則,是以你推測MapObserver中做了同樣的操作,其實不是的,但也查不了太多,除onNext()之外的三個方法是在它的父類BasicFuseableObserver中重寫的,MapObserver中隻對onNext()進行的重寫,而且在其中進行了資料類型轉換的工作,我們看一下源碼(這裡我們隻看onNext()部分就可以了)
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
...
}
複制代碼
可以看到再代碼中利用ObjectHelper将上遊傳過來的T,轉換成了下遊需要的U
到這裡你對.map()下的資料流動也一定有了一定的了解: -->-->
e.onNext("next")
-->
ObservableMap.MapObserver.onNext ("next")
-->
Observer.onNext("next")
訂閱的發送順序: -->
Log.d(TAG, "onNext: "+value)
-->
.subscribe(observer)
-->
ObservableMap.subscribeActual(observer)
ObservableCreate.subscribeActual(new MapObserver(observer,function))
下面進入線程排程源碼分析的階段,先看subscribeOn()
線程排程-subscribeOn()
老規矩,先來一個參考代碼
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("next");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + d);
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
複制代碼
還是一樣,直接看SubscribeOn()
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複制代碼
傳回值Observable情理之中,return傳回RxJavaPlugins.onAssembly()也是一樣,兩點不同:
- 裝飾類(也就是上文說的擴充卡)是ObservableSubscribeOn
- 傳入參數為一個Scheduler
進入ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
複制代碼
根據經驗,構造中進行調用父類、存值一些操作,沒什麼可看的,直接看訂閱的實作subscribeActual()方法,可以看見,這次對下遊觀察者進行封裝的擴充卡是SubscribeOnObserver類,根據CreateEmitter、MapObserver的經驗,我們可以猜測出它或它的父類肯定實作了那四個方法,下面我們看一下
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
複制代碼
除去構造、四個方法、基本的存儲語句就剩下一個setDisposable()方法了,如果你對Scheduler有研究,你就知道在Scheduler中真正處理線程調用邏輯的是Worker類,這裡setDisposable()的作用就是将你傳入的Scheduler傳回的worker加入管理。
目光回到subscribeActual()中,調用觀察者的onSubscribe()之後,馬上調用了parent.setDisposable(),這裡停一下,你可以翻上去觀察一下其他方法的subscribeActual()部分,都是在這時候執行訂閱操作,但是我們在這裡并沒有發現,訂閱操作不可能沒有發生,那麼是不是發生在了parent.setDisposable()這個方法裡面呢?我們之前隻關注了這個方法的内容,對于傳入的參數還沒有解析,我們現在看一下,希望有新的發現。
傳入的參數是
scheduler.scheduleDirect(new SubscribeTask(parent))
。 先看SubscribeTask這個類
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
複制代碼
這個類繼承Runnable,是以實作了一個子線程,在run()中執行操作,沒錯,
source.subscribe(parent);
,熟悉的語句,這就證明了這裡的訂閱操作發生在了Scheduler的線程中。
我們繼續看scheduleDirect()這個方法
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
複制代碼
繼續
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
複制代碼
我們可以發現,傳入的子線程被包裝配置之後,開始在Worker也就是Scheduler線程中執行 我們繼續看DisposeTask這個類,具體的訂閱子線程的啟動就在這裡
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker) w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
複制代碼
可以看到run()中調用了
ecoratedRun.run();
來啟動線程,注意這裡是使用的run()而不是start(),而且整個rxjava流程走完後會自己調用
dispose();
關閉線程。
到這裡,你應該明白了subscribeOn()線程排程的過程,正如它的效果描述一樣:将觀察者的操作運作在Scheduler.io()線程中
--> subscribeOn(Schedulers.io())
--> 傳回一個ObservableSubscribeOn的包裝類
--> 當上遊的被觀察者被訂閱之後,回調ObservableSubscribeOn包裝類中的subscribeActual()
--> 線程切換至Schedulers.io(),并進行訂閱操作
source.subscribe(parent)
理順思路之後我們發現,這裡訂閱,模式與之前相同,還是下遊觀察者對上遊被觀察者進行訂閱,依舊是自下向上的,但是我們通過之前的源碼分析知道,上遊發送資料時調用的那個四個方法實際是調用下遊觀察者對應重寫的四個方法,是以這邊滿足了線程排程的目的:将觀察者所做的操作置與Schedulers.io()線程中
并且,我們這裡還可以解釋一個問題 為什麼subscribeOn(Schedulers.xxx())切換線程N次,總是以第一次為準? 我們知道使用subscribeOn()進行線程排程時訂閱的順序是從下往上,是以有多個subscribeOn()時,從最後一個開始執行,一直執行到第一個,最後的結果還是以第一個為準
然後看obsweveOn(),有了上面subscribeOn()的經驗,分析obsweveOn()就快了
線程排程-obsweveOn()
執行個體
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("next");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + d);
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
複制代碼
observeOn()
public final Observable<T> observeOn (Scheduler scheduler){
return observeOn(scheduler, false, bufferSize());
}
複制代碼
沒看見RxJavaPlugins.onAssembly(),擔心不一樣?不存在的,被包了一層而已
public final Observable<T> observeOn (Scheduler scheduler,boolean delayError, int bufferSize){
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複制代碼
還是那個循序,ObservableObserveOn
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
複制代碼
看subscribeActual(),很好了解,就是先判斷是不是在主線程,是的話,直接訂閱完事,不是的話跳到主線程去,在訂閱,切換線程依舊是使用的Worker那一套,與subscribeOn()中類似,先建立一個主線程的Worker,然後把Worker放進觀察者的包裝類ObserveOnObserver中,不用多說,裡面肯定有對那四個方法的實作,我這裡簡化一下他的代碼
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
...
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
}
複制代碼
其他那三個方法與onNext()大緻相同,隻看這一個就可以了,
schedule();
這行代碼上面都是取資料的操作,并沒有對資料進行發送,是以說即使使用線程調用将被觀察者的操作放在主線程,他的資料準備階段仍然是在原線程執行的,當
schedule();
執行後,進入上面傳入Workder線程,也就是主線程,然後才将queue中的T取出,繼而發送給下遊的觀察者。其他方法也是一樣的流程,比如onError()、onComplete(),都是将錯誤或完成的資訊先儲存,等待切換線程後在執行發送操作。
由此,我們可知ObserverOn()是向下作用的,每次調用都對下遊的代碼産生作用,是以多次調用ObserverOn(),是最後一次生效的
背壓Flowable
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "next 1");
emitter.onNext(1);
Log.d(TAG, "next 2");
emitter.onNext(2);
Log.d(TAG, "next 3");
emitter.onNext(3);
Log.d(TAG, "發送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(3);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
複制代碼
因為使用背壓需要特定的觀察者類,是以這裡從頭開始分析。
先看最開始的Flowable.create()
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
複制代碼
傳回Flowable類型,第一個參數是FlowableOnSubscribe接口,作用我們肯定都清楚了,定義subscribe()、以及那三個在subscribe()中調用的方法,第二個參數是BackpressureStrategy,用于确定背壓的政策,方法内容與observable的create()大緻相同,先是判空,然後傳回RxJavaPlugins.onAssembly(),其中傳入裝飾類,這裡裝飾類為FlowableCreate,作用我們肯定都能猜出來,将FlowableOnSubscribe類型适配為Flowable類型。
下面我們細看證明一下自己的猜想
public interface FlowableOnSubscribe<T> {
void subscribe(@NonNull FlowableEmitter<T> e) throws Exception;
}
複制代碼
public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable s);
void setCancellable(@Nullable Cancellable c);
long requested();
boolean isCancelled();
@NonNull
FlowableEmitter<T> serialize();
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
複制代碼
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
複制代碼
public final class FlowableCreate<T> extends Flowable<T> {
final FlowableOnSubscribe<T> source;
final BackpressureStrategy backpressure;
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
}
複制代碼
沒錯,基本都和observable一樣,隻不過在FlowableCreate的subscribeActual()添加了相關代碼以對不同的背壓政策進行不同的操作。
到目前為止,發現Flowable的create()和Observable類的大同小異,之前我們分析出的結論仍然可以用到這裡,我們繼續看subscribe()
public final void subscribe(Subscriber<? super T> s) {
if (s instanceof FlowableSubscriber) {
subscribe((FlowableSubscriber<? super T>)s);
} else {
ObjectHelper.requireNonNull(s, "s is null");
subscribe(new StrictSubscriber<T>(s));
}
}
複制代碼
這裡發現與之前有些不同,observable中是在這裡new一個CreateEmitter擴充卡然後傳入并調用subscribeActual(),而Flowable這裡我們并沒有發現subscribeActual()的調用,而是調用了重載的另一個subscribe()并傳入一個StrictSubscriber包裝類,我們先看下這個重載的subscribe()
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");
subscribeActual(z);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
複制代碼
嗯,subscribeActual()在這裡被調用,再看下傳入的StrictSubscriber
public class StrictSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
...
@Override
public void request(long n) {
if (n <= 0) {
cancel();
onError(new IllegalArgumentException("§3.9 violated: positive request amount required but it was " + n));
} else {
SubscriptionHelper.deferredRequest(s, requested, n);
}
}
@Override
public void cancel() {
if (!done) {
SubscriptionHelper.cancel(s);
}
}
@Override
public void onSubscribe(Subscription s) {
if (once.compareAndSet(false, true)) {
actual.onSubscribe(this);
SubscriptionHelper.deferredSetOnce(this.s, requested, s);
} else {
s.cancel();
cancel();
onError(new IllegalStateException("§2.12 violated: onSubscribe must be called at most once"));
}
}
@Override
public void onNext(T t) {
HalfSerializer.onNext(actual, t, this, error);
}
@Override
public void onError(Throwable t) {
done = true;
HalfSerializer.onError(actual, t, this, error);
}
@Override
public void onComplete() {
done = true;
HalfSerializer.onComplete(actual, this, error);
}
}
複制代碼
其中重寫了我們使用背壓時在onSubscribe()中會調用的request()方法,以滿足“控流”的效果,而且還重寫了那四個方法,基本可以确定是功能與CreateEmitter類似的擴充卡了,注意下,上面重載的subscribe()中接受的參數是FlowableSubscriber類型,那StrictSubscriber的作用就是把Subscriber類型轉換為FlowableSubscriber類型了
最後來看下這裡的subscribeActual()
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
複制代碼
依舊是調用source.subscribe()進行訂閱操作,隻不多傳入的參數多了一層由對應XXXEmitter類的包裝,這種有關背壓政策選擇的代碼在subscribeActual()中也出現過一次,我們選擇就進去看一下他具體做了什麼包裝。
這裡以ERROR對應的ErrorAsyncEmitter為例
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 338953216916120960L;
ErrorAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
}
複制代碼
内容很簡單就是重寫一個onOverflow()方法實作了對錯誤的輸出,那麼他是在什麼地方調用呢,我們繼續看他的父類
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() != 0) {
actual.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
abstract void onOverflow();
}
複制代碼
發現了onOverflow()調用的地方,但是有前置條件“get() == 0”,get()是在哪裡調用呢? 繼續看父類
abstract static class BaseEmitter<T> extends AtomicLong implements FlowableEmitter<T>, Subscription {
...
BaseEmitter(Subscriber<? super T> actual) {
this.actual = actual;
this.serial = new SequentialDisposable();
}
...
}
複制代碼
并沒有發現關于get()的定義,但是不要疏忽了,看他的父類是誰?是AtomicLong,如果你對Java的研究夠深的話,你不會對這個類陌生,正是Java中的一個原子類,用來對長整型進行原子操作,這個get()就是其中的了
到這裡,我們來理順一下背壓發生的資料流動
--> emitter.onNext()
-->StrictSubscriber.onNext()
--> Subscriber.onNext()
好了,Rxjava的源碼分析到這裡結束了,文中有很多沒有講到的地方,日後有時間的會繼續講解剩餘部分。
總結
本文中,我們對create()、subscribe()、map()、subscribeOn()、observeOn()的源碼進行的閱讀,想你已經可以從源碼的角度回答以下問題:
- 觀察者如何發送資料?
- 被觀察者如何接受資料?
- 操作符的實作原理是什麼?
- Map關鍵字是如何實作類型轉換的?
- 線程排程是如何實作的?
- 為什麼多次調用subscribeOn(),隻有第一次生效?
- 為什麼多次調用observeOn(),隻有最後一次生效?
- 背壓是如何實作的?
作者:OldJii
轉載自:https://juejin.im/post/5d865e35f265da03c128d599