天天看點

一文詳解 RxJava2 使用及實作原理

RxJava是什麼?根據RxJava在GitHub上給出的描述:

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java

大緻意思是:

RxJava—一個可以在JVM上運作的,基于觀察者模式 實作異步操作的java庫。

RxJava的作用:

就是

異步

RxJava的使用,可以使“邏輯複雜的代碼”保持極強的閱讀性。 Rxjava github位址

RxAndorid的作用:

Android中RxAndorid與RxJava配合使用; RxAndorid 封裝了

AndroidSchedulers.mainThread()

,Android開發者使用過程中,可以輕松的将任務post

Andorid主線程

中,執行頁面更新操作。 RxAndroid github位址

使用方式

1、Observable

  • Observable:被觀察者
  • Observer:觀察者,可接收Observable發送的資料

a、Rxjava 實作線程切換:

//
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        //1、“異步線程” 執行耗時操作
        //2、“執行完畢” 調用onNext觸發回調,通知觀察者
        e.onNext("1");
        e.onComplete();
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // 訂閱線程  訂閱的那一刻在訂閱線程中執行
            }

            @Override
            public void onNext(String value) {
                // “主線程”執行的方法
            }

            @Override
            public void onError(Throwable e) {
                // "主線程"執行的方法
            }

            @Override
            public void onComplete() {
                // "主線程"執行的方法
            }
        });           

b、Rxjava 使用操作符

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        // IO 線程
        // 請求網絡資料
        e.onNext("123456");
    }
}).map(new Function<String, Integer>() {
    @Override
    public Integer apply(String s) {
        // IO 線程
        // 網絡資料解析(資料轉化)
        //
        // throw new RequestFailException("擷取網絡請求失敗");
        return 123;
    }
}).doOnNext(new Consumer<Integer>() {    //儲存登入結果UserInfo
    @Override
    public void accept(@NonNull Integer bean) throws Exception {
        // IO 線程
        // 儲存網絡資料

    }
}).subscribeOn(Schedulers.io())   //IO線程
.observeOn(AndroidSchedulers.mainThread())  //主線程
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer bean) throws Exception {
        // 更新UI
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(@NonNull Throwable throwable) throws Exception {
        // 錯誤 顯示錯誤頁面
    }
});           

2、Flowable

Flowable是為了應對

Backpressure

産生的。

Flowable是一個

被觀察者

,與

Subscriber(觀察者)

配合使用

//
Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        //1、“異步線程” 執行耗時操作
        //2、“執行完畢” 調用onNext觸發回調,通知觀察者
        emitter.onNext(0);
        emitter.onComplete();
    }
    // 若消費者消費能力不足,則抛出MissingBackpressureException異常
}, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                // 訂閱時執行,發生在“訂閱線程”
                // 這個方法是用來向生産者申請可以消費的事件數量
                // 這裡表明消費者擁有Long.MAX_VALUE的消費能力
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                // “主線程”執行的方法
            }

            @Override
            public void onError(Throwable t) {
                // "主線程"執行的方法
            }

            @Override
            public void onComplete() {
                // "主線程"執行的方法
            }
        });           

a、 Backpressure(背壓)

Backpressure(背壓)

生産者的生産速度

大于

消費者的消費能力

引起的問題。

在RxJava中有一種情況就是

被觀察者發送消息十分迅速

以至于

觀察者不能及時的響應這些消息

例如:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        // “異步線程”中 生産者有無限的生産能力
        while (true){
            e.onNext(1);
        }
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        // “主線程”中 消費者消費能力不足,進而造成事件無限堆積,最後導緻OOM
        Thread.sleep(2000);
        System.out.println(integer);
    }
});           

異步線程中

生産者有無限的生産能力;

主線程

中 消費者消費能力不足,進而造成事件無限堆積,最後導緻OOM。

上述的現象,有個專有的名詞來來形容,即:

Backpressure(背壓)

b、Subscription.request(long n);

Subscription.request(long n)

方法是用來向

生産者申請可以消費的事件數量

  • 當調用了

    request(long n)

    方法後,生産者便發送對應數量的事件供消費者消費;
  • 如果

    不顯示調用request

    就表示

    消費能力為0

在異步調用時,RxJava中有個緩存池,用來緩存消費者處理不了暫時緩存下來的資料,緩存池的預設大小為128,即隻能緩存128個事件。

無論request()中傳入的數字比128大或小,緩存池中在剛開始都會存入128個事件;當然如果本身并沒有這麼多事件需要發送,則不會存128個事件。

  • BackpressureStrategy.ERROR

    政策下,如果生産者生産的事件大于128個,緩存池便會溢出,進而抛出

    MissingBackpressureException

    異常;
  • BackpressureStrategy.BUFFER

    政策:将RxJava中預設的128個事件的緩存池換成一個更大的緩存池,這樣,消費者通過request()即使傳入一個很大的數字,生産者也會生産事件。但是這種方式比較消耗記憶體,除非是我們比較了解消費者的消費能力,能夠把握具體情況,不會産生OOM。總之BUFFER要慎用。
  • BackpressureStrategy.DROP

    政策:當消費者處理不了事件,則丢棄。消費者通過request()傳入其需求n,然後生産者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丢掉。
  • BackpressureStrategy.LATEST

    政策: LATEST與DROP功能基本一緻。消費者通過request()傳入其需求n,然後生産者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丢掉。唯一的差別就是LATEST總能使消費者能夠接收到生産者産生的最後一個事件。

源碼閱讀——簡單例子 (一)

注:目前使用的源碼版本 rxjava:2.1.9

從這段不涉及操作符和線程切換的簡單例子開始:

// 建立觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String o) {

    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.d(TAG, "onError data is :" + e.toString());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete");
    }
};

// 建立被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
// 訂閱
observable.subscribe(observer);           

a、ObservableOnSubscribe.java

先看一下

ObservableOnSubscribe.java

這個類

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}           

由代碼可知

ObservableOnSubscribe

是一個回調接口,回調方法中參數為

ObservableEmitter

,下邊看一下

ObservableEmitter

這個類。

ObservableEmitter.java

ObservableEmitter字面意思是被觀察者發射器,看一下源碼:

public interface ObservableEmitter<T> extends Emitter<T> {

    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    @NonNull
    ObservableEmitter<T> serialize();

    @Experimental
    boolean tryOnError(@NonNull Throwable t);
}           

ObservableEmitter

是對

Emitter

的擴充,而擴充的方法正是 RxJava2.0 之後引入的。提供了可中途取消等新能力,我們看

Emitter

源碼:

public interface Emitter<T> {

    void onNext(@NonNull T value);

    void onError(@NonNull Throwable error);

    void onComplete();
}           

Emitter

字面意思是發射器,這裡邊的三個方法,大家都很熟悉了。其對應了以下這段代碼:

new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
}           

回調說完,下邊我們來看

Observable.create(ObservableOnSubscribe<T> source)

這段代碼。

b、Observable.create(ObservableOnSubscribe source)

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}           
  • RxJavaPlugins 先忽略
  • 我們看到傳入的

    ObservableOnSubscribe

    被用來建立

    ObservableCreate

    ,其實

    ObservableCreate

    Observable

    的一個實作類

是以

Observable.create(ObservableOnSubscribe<T> source)

這段代碼,實際是:

//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO線程中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});           
  • 這裡我們知道:當

    ObservableOnSubscribe.subscribe

    方法被執行時,使用者通過調用

    ObservableEmitter.onNext

    方法,将資料發送出去(發送給觀察者)

下邊我們看一下

ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代碼 ...
}           
  • ObservableOnSubscribe.subscribe

    方法是在

    ObservableCreate.subscribeActual

    方法中第四行中被執行了;

    subscribe

    方法中,使用者通過調用

    ObservableEmitter.onNext

    方法,将資料發送出去;
  • subscribeActual

    方法第二行,調用了

    observer.onSubscribe(parent);

    方法。 訂閱發生時,在訂閱線程主動執行了

    observer

    onSubscribe

    方法;
  • CreateEmitter

    ObservableCreate.subscribeActual(Observer<? super T> observer)

    方法傳入的

    Observer

    的封裝;
  • CreateEmitter

    的作用是任務取消時,可以不再回調其封裝的觀察者;

    observer

    onNext

    方法,由

    CreateEmitter.onNext

    方法調用;

Observable.create(ObservableOnSubscribe<T> source);

方法最終傳回一個

ObservableCreate

對象。

下邊看

observable.subscribe(observer);

方法

c、observable.subscribe(observer);

  • observable.subscribe(observer);

    即 訂閱發生的那一刻。
  • 這裡

    observable.subscribe(observer);

    實際是

    ObservableCreate.subscribe(observer);

下邊檢視

Observable

subscribe(observer)

Observable.subscribe(Observer observer)

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // Observable的subscribe方法,實際執行的是subscribeActual方法
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        //
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}           
  • 調用

    observable.subscribe(observer);

    方法時,實際是調用了

    observable.subscribeActual(observer)

    方法。
  • observable

    ObservableCreate

    的引用,是以這裡調用的是

    ObservableCreate.subscribeActual(observer)

我們又回到

ObservableCreate

這個類的

subscribeActual

ObservableCreate.java

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    //  subscribeActual 方法在 訂閱發生的那一刻被調用 既 observable.subscribe(observer);時被調用
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 若中途任務取消,通過CreateEmitter 可終止對observer中方法onNext 、onError 等的回調
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 訂閱發生時,執行 觀察者的onSubscribe(Disposable d) 方法
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代碼 ...
}           
  • subscribeActual

    方法在 訂閱發生的那一刻被調用的;在

    observable.subscribe(observer);

    時被調用;
  • observer.onSubscribe(parent);

    訂閱發生時,在訂閱線程回調

    observer

    onSubscribe

  • subscribeActual

    方法中,傳入的

    Observer

    會被包裝成一個

    CreateEmitter

    ;若中途任務取消,通過

    CreateEmitter

    可終止對

    observer

    中方法

    onNext 、onError

    等的回調;

subscribeActual 中第二行代碼 observer.onSubscribe(parent);

observer.onSubscribe(parent);

訂閱發生時,執行 觀察者的

onSubscribe(Disposable d)

方法,這裡回到了以下代碼

// 建立觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
    }
    // ... 省略onNext、onError、onComplete
};           
  • 這裡傳入的參數為

    new CreateEmitter<T>(observer)

    ,其實作了

    Disposable

    接口,若任務取消,則不回調傳入的觀察者

    observer

    對應的

    onNext 、onError、onComplete

    等方法

subscribeActual 中第四行代碼 source.subscribe(parent);

source.subscribe(parent);

ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer));

代碼最終回到

ObservableOnSubscribe

subscribe

:

new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
}           
  • subscribe

    中,調用到

    CreateEmitter

    類的

    onNext 、onComplete、onError

    方法,将資料發送

    CreateEmitter

    中的

    觀察者

到此,“這段不涉及操作符和線程切換的簡單例子” 的代碼跟蹤結束。

源碼閱讀——線程切換 (二)

從這段線程切換的簡單例子開始:

// 建立觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        // 訂閱線程  訂閱的那一刻在訂閱線程中執行
    }

    @Override
    public void onNext(String o) {
        // Android 主線程中執行
    }

    @Override
    public void onError(@NonNull Throwable e) {
        // Android 主線程中執行
    }

    @Override
    public void onComplete() {
        // Android 主線程中執行
    }
};

// 建立被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO線程中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
// 被觀察者 IO 線程
observable = observable.subscribeOn(Schedulers.io());
// 觀察者  Android主線程
observable = observable.observeOn(AndroidSchedulers.mainThread());
// 訂閱
observable.subscribe(observer);           

先來個我總結的RxJava2的整個代碼執行流程:

一文詳解 RxJava2 使用及實作原理

a、Observable.create(ObservableOnSubscribe source)

在 源碼閱讀——簡單例子 (一) 中我們了解到了

Observable.create(ObservableOnSubscribe<T> source)

實際是 如下代碼:

//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO線程中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
           
  • ObservableCreate

    中含有一個

    subscribeActual(observer)

    方法,用于執行傳入觀察者的

    observer.onSubscribe

    方法,和間接調用 觀察者的

    onNext、onComplete

    等方法;

ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代碼 ...
}           
  • subscribeActual

    方法第二行,調用了傳入的觀察者的

    observer.onSubscribe(parent);

    方法; 訂閱發生時,在訂閱線程主動執行了

    observer

    onSubscribe

  • subscribeActual

    方法第四行,調用了傳入的觀察者的

    observer.subscribe

    subscribe

    CreateEmitter.onNext

  • CreateEmitter

    ObservableCreate.subscribeActual(Observer<? super T> observer)

    Observer

  • CreateEmitter

    observer

    onNext

    CreateEmitter.onNext

下邊檢視observable.subscribeOn(Schedulers.io())相關代碼

注:

ObservableEmitter

CreateEmitter

的引用,是對

Observer

的進一步封裝。

CreateEmitter

在執行

onNext

時,如果任務取消,則不再回調

Observer

onNext

b、observable.subscribeOn(Schedulers.io())

下邊我們檢視

Observable

subscribeOn(Scheduler scheduler)

Observable.java

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // 生成一個ObservableSubscribeOn對象
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}           
  • 繼續忽略

    RxJavaPlugins

  • 最終傳回一個

    ObservableSubscribeOn

    對象

Observable observable = observableCreate.subscribeOn(Schedulers.io())

代碼實際是

ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())           
  • observable.subscribeOn(Schedulers.io())

    傳回的是一個

    ObservableSubscribeOn

    的引用

下邊檢視ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    // ... 省略部分代碼
}           

看一下

ObservableSubscribeOn

subscribeActual

  • subscribeActual

    方法第二行代碼中,執行了傳入

    Observer

    onSubscribe

  • subscribeActual

    方法第三行: 在

    scheduler

    IO線程

    中,執行

    observableCreate

    subscribe

    方法,傳入參數為

    SubscribeOnObserver

    ,即:

    IO線程中

    執行

    observableCreate.subscribe(new SubscribeOnObserver(observer));

是以,無論

ObservableSubscribeOn.subscribeActual(observer)

在哪個線程中被調用

observableCreate.subscribe(new SubscribeOnObserver<T>(observer))

均在IO線程中執行,是以觀察者的

e.onNext("hello"); e.onComplete();

亦在IO線程中執行;

c、observable.observeOn(AndroidSchedulers.mainThread())

Observable

observeOn(Scheduler scheduler)

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
// 
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}           

這裡可以看到

Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())

實際是:

ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);           

是以 ,

observable.observeOn(AndroidSchedulers.mainThread())

傳回的是

ObservableObserveOn

的引用。

下邊檢視ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ... 省略部分代碼
}           

ObservableObserveOn

subscribeActual

  • subscribeActual

    方法第五行代碼,實際為

    observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

  • ObserveOnObserver

    的作用是在

    ObserveOnObserver

    onNext

    方法被實行時;将

    observer

    onNext

    方法post到

    Android主線程

    中;

d、observable.subscribe(observer)

  • 我們知道

    Observable

    subscribe(Observer<? super T> observer)

    方法,實際調用到了

    Observable

    subscribeActual(Observer<? super T> observer)

  • 而這裡的

    observable

    ObservableObserveOn

    的引用;

是以,

observable.subscribe(observer)

實際執行的是

observableObserveOn.subscribeActual(observer)

到這裡,我們 線程切換 (二) 的小例子變換為了以下代碼:

// 建立觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        // 訂閱線程  訂閱的那一刻在訂閱線程中執行
    }

    @Override
    public void onNext(String o) {
        // Android 主線程中執行
    }

    @Override
    public void onError(@NonNull Throwable e) {
        // Android 主線程中執行
    }

    @Override
    public void onComplete() {
        // Android 主線程中執行
    }
};
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO線程中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
//
ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
//
ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
//
observableObserveOn.subscribeActual(observer);           

observableObserveOn.subscribeActual(observer)

ObservableObserveOn.java

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        // source 為 observableSubscribeOn
        super(source);
        // scheduler 為AndroidSchedulers.mainThread()
        this.scheduler = scheduler;
        // false
        this.delayError = delayError;
        // 128
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // AndroidSchedulers.mainThread() 為 HandlerScheduler,是以會走到else部分代碼
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        }
        // 代碼會走到else 部分
         else {
            Scheduler.Worker w = scheduler.createWorker();
            // source 為 observableSubscribeOn
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ... 省略部分代碼
}           
  • subscribeActual

    方法中,

    AndroidSchedulers.mainThread()

    HandlerScheduler

    ,是以 if 中的判斷語句直接忽略,直接走到代碼的 else 部分。
  • subscribeActual

    方法中,将觀察者

    observer

    封裝成了

    ObserveOnObserver

    ;并且調用

    observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))

  • observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))

ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
// 1、“訂閱線程中” —— 執行onSubscribe, 實際執行的是observer的onSubscribe方法
observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver));
// 2、“IO程中” —— 執行subscribe ;IO線程 subscribe方法中,使用者主動調用ObserveOnObserver的onNext、onError、onComplete方法,将資料發出去
observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver))           
  • 使用者調用

    SubscribeOnObserver

    onNext

    是将資料發送出去
  • SubscribeOnObserver.onNext

    調用了

    observeOnObserver.onNext

  • observeOnObserver.onNext

    通過

    HandlerScheduler

    observer.onNext、observer.onError、observer.onComplete

    等方法post到Android主線程中執行。

e、整體流程圖如下

最後總結一下RxJava2的整個執行流程:

一文詳解 RxJava2 使用及實作原理

參考

手把手教你使用 RxJava 2.0(一) RxJava2 源碼解析(一) RxJava2 源碼解析——流程

= THE END =

文章首發于公衆号”CODING技術小館“,如果文章對您有幫助,可關注我的公衆号。

繼續閱讀