Android函數響應式程式設計最新RxJava-線程控制

推薦文章:
Android函數響應式程式設計最新RxJava-基本用法
Android函數響應式程式設計最新RxJava-操作符入門(1)
Android函數響應式程式設計最新RxJava-操作符入門(2)
線程控制-Scheduler
在不指定線程的情況下,RxJava遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生産事件;在哪個線程生産事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (排程器)。
Rxjava内置了5個Scheduler:
- Schedulers.immediate():表示直接在目前線程運作,這是預設的Scheduler。
- Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。
- Schedulers.io():I/O操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程。
- Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
- Schedulers.trampoline(); 表示在目前的線程執行,但并不是馬上執行,用trampoline();方法可以将它存入隊列,排程器按順序執行。
- Android還有一個專用的AndroidSchedulers.mainThread(),是RxAndroid庫中提供的Scheduler,它指定的操作将在 Android 主線程運作。
線程切換-subscribeOn和observeOn
線程的切換是通過subscribeOn和observeOn來完成的。
- subscribeOn(): 指定 subscribe() 所發生的線程。
- observeOn(): 指定 Subscriber 所運作在的線程。
執行個體
下面拿skip操作符為例:
代碼如下:
(1).skip操作符是把Observable發射的資料過濾掉前N項。
Observable.just(1,2,3,4,5).skip(2).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { Log.e(TAG,integer+""); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
上述代碼中subscribeOn(Schedulers.io())表示事件的建立在IO線程中。observeOn(AndroidSchedulers.mainThread())表示事件的運作在主線程,也就是列印在主線程。
(2).startWith操作符是在發射的資料前插入一條資料。
Observable.just(4,5).startWith(1).subscribeOn(Schedulers.computation()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { Log.e(TAG,""+integer); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
上述代碼中subscribeOn(Schedulers.computation())計算所使用的 Scheduler。