這是第二次寫源碼分析,之前的一次已經是一年前了。
為何要重寫?
主要是由于今天看到了一些關于線程池的東西,我聯想到了RxJava2中的線程分類。再想到項目中的線程池相關的地方,感覺很亂,是以有一個整合的想法,想将原來自己建立的線程池替換成RxJava2中的線程池,于是就有了翻看源碼的心思。
擇日不如撞日,反正是看源碼,順便把以前的東西再整理一下,還有就是以前寫的東西,思路太亂,看着不舒服。
回想起來,RxJava2的源碼有很多套路,隻要掌握了這個套路,閱讀源碼起來就會有一切盡在掌握的感覺,否則,就會覺得源碼很繞。
是以第一篇文章的主要目的,是講明白這個套路,然後配上圖,能夠更容易讓人了解,如果以後忘記了,再回來看一遍也能迅速跟上思路,不會又要再次撸一遍源碼。
從一個簡單的例子開始
// 觀察者 -- 蝙蝠俠
// 這裡之是以沒有用 Consumer,是怕引起歧義
// 畢竟源碼利用将我們傳遞進去的 Consumer, 又封裝了一層,封裝成了 Observer
val observerBatMan = object : Observer<Int> {
override fun onComplete() {
}
override fun onNext(t: Int) {
Assert.assertEquals(1, t)
}
override fun onError(e: Throwable) {
}
override fun onSubscribe(d: Disposable) {
}
}
// 資料源 -- 小醜
val sourceClown = ObservableOnSubscribe<Int> {
it.onNext(1)
it.onComplete()
}
// 開始觀察
Observable.create<Int>(sourceClown)
.subscribe(observerBatMan)
嗯,果然 kotlin 還是看起來舒服。
這個例子非常簡單了,資料源發送一個int值 1,然後接收者判斷值是不是1。
現在開始分析源碼了,先看 Observable 的 create 方法:
Observable.java
該方法建立一個 Observeable 對象。分析完成之後,你就會發現實際上就是建立了一個 ObservableCreate 對象。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 這個是判空,嗯,沒啥好說的,我一般用注解。
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
套路一
别看有些人表面上風風光光,背地裡卻連隻大熊貓都沒有。
上面的 create 方法中,看起來有兩行代碼,感覺做了一些了不得的東西,但是實際上隻有半行代碼在起主要作用。
第二行代碼的前半行:
RxJavaPlugins.java
該方法在 onObservableAssembly 不會空的情況下會對 source 做一個變換,否則傳回 source。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
因為 onObservableAssembly 絕大部分情況下為空,其實就是傳回了傳進來的參數。是以該方法基本可以忽略。
需要注意,這個套路在源碼中很常見。
是以最後,我們可以把 Observeable 的 create 方法了解為:
Observable.java
create 簡化後的代碼
public static <T> Observable<T> create(@NotNull ObservableOnSubscribe<T> source) {
return new ObservableCreate<T>(source);
}
這樣看是不是很簡單!!!
繼續深入,看看 ObservableCreate 有何德何能!
ObservableCreate .java
ObservableCreate 繼承至 Observable。這個繼承還有一個非常重要的作用,就是友善鍊式調用。
别看 Observable 有1w 多行代碼,但是實際上隻有一個抽象方法,其他的都是用來做操作符等等。
下面來看看這個抽象方法,後面會分析到。
Observable.java
該方法由 Observable 的 subscribe 方法調用,即 Observable.create(xxx).subscribe(xxx);
subscribe 就會調用 subscribeActual
套路二
遵循模闆:
- 将source封裝一下,變成一個 Observable
- 将 observer 封裝一下,變成一個Emitter,
- 然後調用 source 的 onSubscribe 方法,
- 然後調用 source 的 subscribe 方法,将 Emitter 傳進去。
其實隻要你知道 observer 是誰,source 是誰,很簡單的啦。
ObservableCreate 的核心代碼就在這個被覆寫的抽象方法裡面,嗯,一起來看看吧。
ObservableCreate.java
該方法由 Observable 的 subscribe 方法調用,即 Observable.create(xxx).subscribe(xxx);
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
看上面的代碼,需要搞清楚幾個變量,不然繞着繞着就糊塗了。
- source 是我們建立并傳遞進來的。額,忘記貼構造函數了,裡面有指派,這個 source 就是我們在 create 方法裡面建立的對象啦。
ObservableCreate.java
構造方法
public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
- observer 這裡暫時分析不出來,因為是父類調用了這個方法,是以我們去父類看看
Observable.java
這個方法的主要作用,就是将資料源與觀察者關聯起來
它還調用了 subscribeActual 方法,子類必須實作 subscribeActual 方法。
使用套路一,我們簡化一下代碼:public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
Observable.java
subscribe 簡化後的代碼
如果,是走正常流程,沒有錯誤,還可以簡化(第一次分析主流程,就是要這樣簡化簡化再簡化):public final void subscribe(Observer<? super T> observer) { try { observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { ... RxJavaPlugins.onError(e); ... throw npe; } }
Observable.java
subscribe 簡化後的代碼
實際上,RxJavaPlugins.onSubscribe 也含有套路一,是以再次簡化:public final void subscribe(Observer<? super T> observer) { observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); }
Observable.java
subscribe 簡化後的代碼
public final void subscribe(Observer<? super T> observer) { // 可以忽略 observer = observer; subscribeActual(observer); }
是以,最終實際上 subscribe 方法,就是調用了 subscribeActual 方法而已,隻不過它增加了錯誤與鈎子處理。
看到這裡,不知道你有沒有反應過來,這個 subscribe(observer) 方法是不是很熟悉呢?
這個方法,就是我們上面例子中的:
我們寫的 demo 的代碼
// 開始觀察 Observable.create<Int>(sourceClown) // 這裡就是調用的 subscribe 方法 .subscribe(observerBatMan)
是不是有點恍然大悟的感覺呢!
是以到這裡,心裡應該由一個大緻架構了。
同時也會發現,
的ObservableCreate
方法中的subscribeActual
參數,也是我們new出來的對象。observer
分析到了這裡,一個輪廓就出來了!!!
ObservableCreate
的
subscribeActual
方法中的 參數分别對應如下:
ObservableCreate.java
ObservableCreate 繼承至 Observable,是以它必須實作 subscribeActual 方法。
這個方法也是核心,是鍊式調用的核心,線程切換的核心
@Override
protected void subscribeActual(Observer<? super T> observer) {
// observer 就是 observerBatMan
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// source 就是 sourceClown
// 這個 subscribe 就将兩個包裝的觀察者與資料源對象關聯起來了
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
如果,不考慮錯誤的情況,我們簡化一下代碼:
ObservableCreate.java
subscribeActual 簡化後的代碼
@Override
protected void subscribeActual(Observer<? super T> observer) {
// observer 就是 observerBatMan
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 調用 observerBatMan 的 onSubscribe 方法,這個是一個鈎子方法
// 一般專門用來告訴 observerBatMan,我,sourceClown,要搞事情了
observer.onSubscribe(parent);
// source 就是 sourceClown
source.subscribe(parent);
}
由于,onSubscribe 我們暫時也不用,是以去掉,再簡化:
ObservableCreate.java
subscribeActual 簡化後的代碼
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 注意這裡 subscribe 傳遞的是 parent
source.subscribe(parent);
}
嘿嘿嘿,這樣就舒服多了,就3個變量,兩個是我們自己建立的,知根知底,還有一個貨,
CreateEmitter
我們先放一放,為啥呢,因為關于 source 的代碼還沒有分析完成呢。
别看
source.subscribe(parent);
就一行代碼,但是由于 source 對象是我們自己建立的,是以這個方法實際上調用了我們寫的代碼:
ObservableOnSubscribe.java
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
ObservableOnSubscribe
是一個接口,是以,我們實際上是建立了一個匿名内部類,傳遞給了 source,然後 source 又調用了 subscribe 方法,是以也就調用了我們寫的代碼。
嗯,用僞代碼表示如下:
1. 匿名内部類 = new ObservableOnSubscribe(){}
2. oc = Observerable.create(匿名内部類)
3. oc.subscribe(xxx),這個方法會調用到 -> subscribeActual
4. subscribeActual 會調用到 -> 匿名内部類.subscribe(emiiter)
5. 我們的代碼
我們自己寫的 demo 代碼
// 這裡的 it 是 ObservableEmitter
it.onNext(1)
it.onComplete()
那麼,當 it.onNext(1) 執行之後,又會發生什麼呢?
這個 it 就是 CreateEmitter,嗯,雖然有點突然,但是這個應該沒有疑問吧?!!
是以,ObservableEmitter 在運作時就是 CreateEmitter 對象。
- 我們把 sourceClown 傳進去,并且調用了 ObservableEmitter 的 onNext 等方法
- sourceClown 被封裝成了 CreateEmitter
- source 的 subscribe 方法接收的是 CreateEmitter,
我們先不忙着去看它的 onNext 方法,先看看這個類。
套路三
由老父親來替你打理一切
我們知道在套路二裡面,我們傳遞的 sourceClown 被封裝了一下,變成了一個
CreateEmitter
。
CreateEmitter
這個變量名就很叼,一看就是 observer 的老父親,那麼,可以先猜一猜,為啥它要起這樣一個名呢?
由于
ObservableOnSubscribe
的
subscribe
方法隻接受
ObservableEmitter
,是以
CreateEmitter
必須要實作這個接口。
好,我們看源代碼:
CreateEmitter.java
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {...}
AtomicReference
是java類,就不展開講了,不知道的人(比如我)這個時候應該打開了文檔,開始學習了。
繼續看構造方法:
CreateEmitter.java
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
嗯,很好,observer 被儲存起來了。
由于,在 sourceClown 中我們調用了:
我們寫的 demo 的代碼
上面說過,it 就是 CreateEmitter,是以 CreateEmitter的 onNext 方法會被調用。
現在,我們來分析它的 onNext 方法:
CreateEmitter.java
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
RxJava2 中不允許資料源發射的資料為 null,是以我們簡化一下:
CreateEmitter.java
onNext 簡化後的代碼
@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
isDisposed 方法,就是判斷觀察者有沒有解除訂閱,畢竟,蝙蝠俠也會心累。
這上面做了這麼多判斷,現在知道為啥起名叫 parent 了不?
在我們的例子中,我們沒有解除訂閱,再簡化一下,就是:
CreateEmitter.java
onNext 簡化後的代碼
@Override
public void onNext(T t) {
observer.onNext(t);
}
這下,夠直白了吧,直接調用了 observer 的 onNext 方法。
還記得 observer 是誰嗎,就是你,蝙蝠俠,observerBatMan。是以它的 onNext 方法會被調用。
我們寫的 demo 的代碼
override fun onNext(t: Int) {
Assert.assertEquals(1, t)
}
那麼,整個流程就跑通了。
至于,onComplete 方法,差不多的啦。
最後上一張圖:
最後,還有一個很重要的東西,就是這個小demo 的起始點,并不是 sourceClown,上面的圖不是程式執行流程圖,而是一種關系圖。
現在我們來看看,程式的起始點在哪?
我們寫的 demo 代碼
// 開始觀察
Observable.create<Int>(sourceClown)
.subscribe(observerBatMan)
程式的起始點是 subscribe 方法,這個方法是屬于 ObservableCreate 的,是以程式的起始點在 ObservableCreate 的 subscribe 方法。
下面,貼上程式執行流程圖: