天天看點

Rxjava 過程分析一(簡單流程)Rxjava 過程分析一

Rxjava 過程分析一

說明

  • 該文章是基于 Rxjava2 源碼。
  • 該篇隻是講述 Rxjava 建議的用法,不涉及操作符和線程切換, 後兩個會有新的篇幅去寫。 一步一步的來。
  • 在源碼中那些判空還有 Rxjava 中 RxJavaPlugins 鈎子等在分析中去除(隻關注用法和思想, 和主流程不管的暫時剔除)。
  • 由于習慣, 和 Rxjava2 中的命名。 我稱 emitter 為上遊, 也就是發射水(資料)的源頭, 結果回調給外部的 FlowableSubscriber, 我稱它為下遊。 上遊流水流到下遊!

最簡單的使用

Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
         // emitter.onNext("");
        // emitter.onError();
       // emitter.onComplete();
    }
}, BackpressureStrategy.LATEST).subscribe(new FlowableSubscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
    }
    @Override
    public void onNext(String s) {
    }
    @Override
    public void onError(Throwable t) {
    }
    @Override
    public void onComplete() {
    }
});
           

引發的思考

  1. 調用 emitter 的 onNext、 onError、 onComplete, 就會回調 FlowableSubscriber 中對應的方法。 那麼這兩個對象是一個嗎? 有什麼聯系呢?
  2. 我們把上述代碼寫好後, 會自動調用并回調, 那麼上遊發射器 emitter 是什麼時候觸發的呢? 該方法是什麼時機和誰調用的呢?

源碼分析

從建立開始

public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    return new FlowableCreate<T>(source, mode);
}
           

簡單嗎大兄弟, 僅僅是建立了一個 FlowableCreate 類而已。 并對成員變量指派。

簡單說說下遊

在開發中, 可能很多都有在用回調吧。 再次機會我也想說說回調是咋回事。 其實 java 中的常用的内部類回調, 還是 c 的函數指針, 或者其語言的閉包(swift), 其實不要把它們想的多麼神奇。 就這麼想, 我把一個執行個體位址或者函數位址給你了, 你在内部去調用我的方法, 自然就運作到了外面了。

訂閱

public final void subscribe(FlowableSubscriber<? super T> s) {
    try {
        Subscriber<? super T> z = s;
        subscribeActual(z);
    } catch (NullPointerException e) { 
        throw e;
    } catch (Throwable e) {

    }
}
           

資訊量很少, 隻是調用了目前 Flowable 的 subscribeActual() 方法。 我們前面知道目前的 Flowable 是 FlowableCreate 對象, 是以進 FlowableCreate 中去看看做了什麼事情。

public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;

    switch (backpressure) {
    case MISSING: {
        emitter = new MissingEmitter<T>(t);
        break;
    }
    case ERROR: {
        emitter = new ErrorAsyncEmitter<T>(t);
        break;
    }
    case DROP: {
        emitter = new DropAsyncEmitter<T>(t);
        break;
    }
    case LATEST: {
        emitter = new LatestAsyncEmitter<T>(t);
        break;
    }
    default: {
        emitter = new BufferAsyncEmitter<T>(t, bufferSize());
        break;
    }
    }

    t.onSubscribe(emitter);
    try {
        source.subscribe(emitter);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        emitter.onError(ex);
    }
}
           

我們這裡不去讨論背壓等問題, 是以我們隻是關注主要流程和關鍵方法。 其中一眼就可以看到關鍵的一個就是在 try 塊中的 source.subscribe(emitter); source 是什麼呀? 就是我們在 new FlowableCreate 傳進來的 Flowable.create(new FlowableOnSubscribe()) 中 FlowableOnSubscribe 對象。 source 的 subscribe 這不就是運作了 外部 FlowableOnSubscribe 的 subscribe嘛, 是以外部調用 onNext, onError, onComplete 方法, 其實調用了 内部 emitter 中對應的方法。 我們以背壓為 LATEST 為例看看 LatestAsyncEmitter 被調用的方法做了什麼事情。 先多說一句, 初始化 emitter 時我們傳入的是下遊哦, 下遊相應的方法調用了, 那麼外部的就會看似回調出去拿到結果了!

我們以 onNext 為例, 看看 LatestAsyncEmitter 被調用到 onNext 做了什麼事情。

public void onNext(T t) {
    queue.set(t);
    drain();
}
           

看到是先把結果存到了隊列中, 我們不考慮背壓, 是以我們看主要的大緻流程哈。 顯然下一個有用的代碼就是 drain() 了。

void drain() {
    final Subscriber<? super T> a = downstream;
    final AtomicReference<T> q = queue;
		// ......
		T o = q.getAndSet(null);
		// ......
    	a.onNext(o);
		// ......
}
           

其中 downstream 就是我們外部的 FlowableSubscriber 及下遊了。 我們可以看到, 簡單的從隊列中取出資料, 直接調用了下遊的 onNext。 就這樣資料就被從上遊流向了下遊。

前面的疑惑問題

  • 上遊和下遊是一個東西嗎? 它們的關系是什麼?
這個問題從上面的分析已經很明顯了。 上遊和下遊不是一個東西, 上遊 emitter 調用相應的方法去回調下遊的方法。
  • 在哪一個時刻觸發的事件流動呢?
其實是在上遊 emitter 調用相應的方法那一刻, 比如調用 onNext。 那麼是在哪一個時機觸發調用的呢? 很明顯是在訂閱時, 調用了 subscribeActual 中又調用了上遊的 subscribe(emitter) 觸發了資料的流動。