天天看點

RxJava2 源碼分析(一)

這是第二次寫源碼分析,之前的一次已經是一年前了。

為何要重寫?

主要是由于今天看到了一些關于線程池的東西,我聯想到了RxJava2中的線程分類。再想到項目中的線程池相關的地方,感覺很亂,是以有一個整合的想法,想将原來自己建立的線程池替換成RxJava2中的線程池,于是就有了翻看源碼的心思。

擇日不如撞日,反正是看源碼,順便把以前的東西再整理一下,還有就是以前寫的東西,思路太亂,看着不舒服。

回想起來,RxJava2的源碼有很多套路,隻要掌握了這個套路,閱讀源碼起來就會有一切盡在掌握的感覺,否則,就會覺得源碼很繞。

是以第一篇文章的主要目的,是講明白這個套路,然後配上圖,能夠更容易讓人了解,如果以後忘記了,再回來看一遍也能迅速跟上思路,不會又要再次撸一遍源碼。

從一個簡單的例子開始

// 觀察者 -- 蝙蝠俠
// 這裡之是以沒有用 Consumer,是怕引起歧義
// 畢竟源碼利用将我們傳遞進去的 Consumer, 又封裝了一層,封裝成了 Observer
val observerBatMan = object : Observer<Int> {

    override fun onComplete() {
    }

    override fun onNext(t: Int) {
        Assert.assertEquals(1, t)
    }

    override fun onError(e: Throwable) {
    }

    override fun onSubscribe(d: Disposable) {
    }

}

// 資料源 -- 小醜
val sourceClown = ObservableOnSubscribe<Int> {
    it.onNext(1)
    it.onComplete()
}

// 開始觀察
Observable.create<Int>(sourceClown)
    .subscribe(observerBatMan)
           

嗯,果然 kotlin 還是看起來舒服。

這個例子非常簡單了,資料源發送一個int值 1,然後接收者判斷值是不是1。

現在開始分析源碼了,先看 Observable 的 create 方法:

Observable.java

該方法建立一個 Observeable 對象。分析完成之後,你就會發現實際上就是建立了一個 ObservableCreate 對象。

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // 這個是判空,嗯,沒啥好說的,我一般用注解。
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
           

套路一

别看有些人表面上風風光光,背地裡卻連隻大熊貓都沒有。

上面的 create 方法中,看起來有兩行代碼,感覺做了一些了不得的東西,但是實際上隻有半行代碼在起主要作用。

第二行代碼的前半行:

RxJavaPlugins.java

該方法在 onObservableAssembly 不會空的情況下會對 source 做一個變換,否則傳回 source。

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
           

因為 onObservableAssembly 絕大部分情況下為空,其實就是傳回了傳進來的參數。是以該方法基本可以忽略。

需要注意,這個套路在源碼中很常見。

是以最後,我們可以把 Observeable 的 create 方法了解為:

Observable.java

create 簡化後的代碼

public static <T> Observable<T> create(@NotNull ObservableOnSubscribe<T> source) {
        return new ObservableCreate<T>(source);
    }
           

這樣看是不是很簡單!!!

繼續深入,看看 ObservableCreate 有何德何能!

ObservableCreate .java

ObservableCreate 繼承至 Observable。這個繼承還有一個非常重要的作用,就是友善鍊式調用。

别看 Observable 有1w 多行代碼,但是實際上隻有一個抽象方法,其他的都是用來做操作符等等。

下面來看看這個抽象方法,後面會分析到。

Observable.java

該方法由 Observable 的 subscribe 方法調用,即 Observable.create(xxx).subscribe(xxx);

subscribe 就會調用 subscribeActual

套路二

遵循模闆:
  1. 将source封裝一下,變成一個 Observable
  2. 将 observer 封裝一下,變成一個Emitter,
  3. 然後調用 source 的 onSubscribe 方法,
  4. 然後調用 source 的 subscribe 方法,将 Emitter 傳進去。

其實隻要你知道 observer 是誰,source 是誰,很簡單的啦。

ObservableCreate 的核心代碼就在這個被覆寫的抽象方法裡面,嗯,一起來看看吧。

ObservableCreate.java

該方法由 Observable 的 subscribe 方法調用,即 Observable.create(xxx).subscribe(xxx);

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
           

看上面的代碼,需要搞清楚幾個變量,不然繞着繞着就糊塗了。

  1. source 是我們建立并傳遞進來的。額,忘記貼構造函數了,裡面有指派,這個 source 就是我們在 create 方法裡面建立的對象啦。

    ObservableCreate.java

    構造方法

    public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
               
RxJava2 源碼分析(一)
  1. observer 這裡暫時分析不出來,因為是父類調用了這個方法,是以我們去父類看看

    Observable.java

    這個方法的主要作用,就是将資料源與觀察者關聯起來

    它還調用了 subscribeActual 方法,子類必須實作 subscribeActual 方法。

    public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
               
    使用套路一,我們簡化一下代碼:

    Observable.java

    subscribe 簡化後的代碼

    public final void subscribe(Observer<? super T> observer) {
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                ...
                RxJavaPlugins.onError(e);
    			...
                throw npe;
            }
        }
    
               
    如果,是走正常流程,沒有錯誤,還可以簡化(第一次分析主流程,就是要這樣簡化簡化再簡化):

    Observable.java

    subscribe 簡化後的代碼

    public final void subscribe(Observer<? super T> observer) {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            subscribeActual(observer);
        }
    
               
    實際上,RxJavaPlugins.onSubscribe 也含有套路一,是以再次簡化:

    Observable.java

    subscribe 簡化後的代碼

    public final void subscribe(Observer<? super T> observer) {
            // 可以忽略
            observer = observer;
            subscribeActual(observer);
        }
    
               

    是以,最終實際上 subscribe 方法,就是調用了 subscribeActual 方法而已,隻不過它增加了錯誤與鈎子處理。

    看到這裡,不知道你有沒有反應過來,這個 subscribe(observer) 方法是不是很熟悉呢?

    這個方法,就是我們上面例子中的:

    我們寫的 demo 的代碼
    // 開始觀察
    Observable.create<Int>(sourceClown)
    	// 這裡就是調用的 subscribe 方法
        .subscribe(observerBatMan)
    
               

    是不是有點恍然大悟的感覺呢!

    是以到這裡,心裡應該由一個大緻架構了。

    同時也會發現,

    ObservableCreate

    subscribeActual

    方法中的

    observer

    參數,也是我們new出來的對象。
    RxJava2 源碼分析(一)

分析到了這裡,一個輪廓就出來了!!!

ObservableCreate

subscribeActual

方法中的 參數分别對應如下:

ObservableCreate.java

ObservableCreate 繼承至 Observable,是以它必須實作 subscribeActual 方法。

這個方法也是核心,是鍊式調用的核心,線程切換的核心

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        // observer 就是 observerBatMan
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            // source 就是 sourceClown
            // 這個 subscribe 就将兩個包裝的觀察者與資料源對象關聯起來了
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

           

如果,不考慮錯誤的情況,我們簡化一下代碼:

ObservableCreate.java

subscribeActual 簡化後的代碼

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        // observer 就是 observerBatMan
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 調用 observerBatMan 的 onSubscribe 方法,這個是一個鈎子方法
        // 一般專門用來告訴 observerBatMan,我,sourceClown,要搞事情了
        observer.onSubscribe(parent);
        // source 就是 sourceClown
        source.subscribe(parent);
    }

           

由于,onSubscribe 我們暫時也不用,是以去掉,再簡化:

ObservableCreate.java

subscribeActual 簡化後的代碼

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 注意這裡 subscribe 傳遞的是 parent
        source.subscribe(parent);
    }

           

嘿嘿嘿,這樣就舒服多了,就3個變量,兩個是我們自己建立的,知根知底,還有一個貨,

CreateEmitter

我們先放一放,為啥呢,因為關于 source 的代碼還沒有分析完成呢。

别看

source.subscribe(parent);

就一行代碼,但是由于 source 對象是我們自己建立的,是以這個方法實際上調用了我們寫的代碼:

ObservableOnSubscribe.java
public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

           

ObservableOnSubscribe

是一個接口,是以,我們實際上是建立了一個匿名内部類,傳遞給了 source,然後 source 又調用了 subscribe 方法,是以也就調用了我們寫的代碼。

嗯,用僞代碼表示如下:

1. 匿名内部類 = new ObservableOnSubscribe(){}

2. oc = Observerable.create(匿名内部類)

3. oc.subscribe(xxx),這個方法會調用到 -> subscribeActual

4. subscribeActual 會調用到 -> 匿名内部類.subscribe(emiiter)

5. 我們的代碼

           
我們自己寫的 demo 代碼
// 這裡的 it 是 ObservableEmitter
it.onNext(1)
it.onComplete()

           

那麼,當 it.onNext(1) 執行之後,又會發生什麼呢?

這個 it 就是 CreateEmitter,嗯,雖然有點突然,但是這個應該沒有疑問吧?!!

  1. 我們把 sourceClown 傳進去,并且調用了 ObservableEmitter 的 onNext 等方法
  2. sourceClown 被封裝成了 CreateEmitter
  3. source 的 subscribe 方法接收的是 CreateEmitter,
是以,ObservableEmitter 在運作時就是 CreateEmitter 對象。

我們先不忙着去看它的 onNext 方法,先看看這個類。

套路三

由老父親來替你打理一切

我們知道在套路二裡面,我們傳遞的 sourceClown 被封裝了一下,變成了一個

CreateEmitter

CreateEmitter

這個變量名就很叼,一看就是 observer 的老父親,那麼,可以先猜一猜,為啥它要起這樣一個名呢?

由于

ObservableOnSubscribe

subscribe

方法隻接受

ObservableEmitter

,是以

CreateEmitter

必須要實作這個接口。

好,我們看源代碼:

CreateEmitter.java
static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {...}

           

AtomicReference

是java類,就不展開講了,不知道的人(比如我)這個時候應該打開了文檔,開始學習了。

繼續看構造方法:

CreateEmitter.java
CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

           

嗯,很好,observer 被儲存起來了。

由于,在 sourceClown 中我們調用了:

我們寫的 demo 的代碼

上面說過,it 就是 CreateEmitter,是以 CreateEmitter的 onNext 方法會被調用。

現在,我們來分析它的 onNext 方法:

CreateEmitter.java
@Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

           

RxJava2 中不允許資料源發射的資料為 null,是以我們簡化一下:

CreateEmitter.java

onNext 簡化後的代碼

@Override
        public void onNext(T t) {
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

           

isDisposed 方法,就是判斷觀察者有沒有解除訂閱,畢竟,蝙蝠俠也會心累。

這上面做了這麼多判斷,現在知道為啥起名叫 parent 了不?

在我們的例子中,我們沒有解除訂閱,再簡化一下,就是:

CreateEmitter.java

onNext 簡化後的代碼

@Override
        public void onNext(T t) {
            observer.onNext(t);
        }

           

這下,夠直白了吧,直接調用了 observer 的 onNext 方法。

還記得 observer 是誰嗎,就是你,蝙蝠俠,observerBatMan。是以它的 onNext 方法會被調用。

我們寫的 demo 的代碼
override fun onNext(t: Int) {
    Assert.assertEquals(1, t)
}

           

那麼,整個流程就跑通了。

至于,onComplete 方法,差不多的啦。

最後上一張圖:

RxJava2 源碼分析(一)

最後,還有一個很重要的東西,就是這個小demo 的起始點,并不是 sourceClown,上面的圖不是程式執行流程圖,而是一種關系圖。

現在我們來看看,程式的起始點在哪?

我們寫的 demo 代碼
// 開始觀察
Observable.create<Int>(sourceClown)
    .subscribe(observerBatMan)

           

程式的起始點是 subscribe 方法,這個方法是屬于 ObservableCreate 的,是以程式的起始點在 ObservableCreate 的 subscribe 方法。

下面,貼上程式執行流程圖:

RxJava2 源碼分析(一)