天天看點

Android函數響應式程式設計最新RxJava-線程控制

Android函數響應式程式設計最新RxJava-線程控制

Android函數響應式程式設計最新RxJava-線程控制

推薦文章:

Android函數響應式程式設計最新RxJava-基本用法

Android函數響應式程式設計最新RxJava-操作符入門(1)

Android函數響應式程式設計最新RxJava-操作符入門(2)

Android函數響應式程式設計最新RxJava-線程控制

線程控制-Scheduler

在不指定線程的情況下,RxJava遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生産事件;在哪個線程生産事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (排程器)。

Rxjava内置了5個Scheduler:

  • Schedulers.immediate():表示直接在目前線程運作,這是預設的Scheduler。
  • Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。
  • Schedulers.io():I/O操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程。
  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
  • Schedulers.trampoline();  表示在目前的線程執行,但并不是馬上執行,用trampoline();方法可以将它存入隊列,排程器按順序執行。
  • Android還有一個專用的AndroidSchedulers.mainThread(),是RxAndroid庫中提供的Scheduler,它指定的操作将在 Android 主線程運作。
Android函數響應式程式設計最新RxJava-線程控制

線程切換-subscribeOn和observeOn

線程的切換是通過subscribeOn和observeOn來完成的。

  • subscribeOn(): 指定 subscribe() 所發生的線程。
  • observeOn(): 指定 Subscriber 所運作在的線程。
Android函數響應式程式設計最新RxJava-線程控制

執行個體

下面拿skip操作符為例:

代碼如下:

(1).skip操作符是把Observable發射的資料過濾掉前N項。

Observable.just(1,2,3,4,5).skip(2).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {             @Override             public void onSubscribe(@NonNull Disposable d) {            }             @Override             public void onNext(@NonNull Integer integer) {                 Log.e(TAG,integer+"");             }             @Override             public void onError(@NonNull Throwable e) {             }             @Override             public void onComplete() {             }         });      

上述代碼中subscribeOn(Schedulers.io())表示事件的建立在IO線程中。observeOn(AndroidSchedulers.mainThread())表示事件的運作在主線程,也就是列印在主線程。

(2).startWith操作符是在發射的資料前插入一條資料。

Observable.just(4,5).startWith(1).subscribeOn(Schedulers.computation()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {             @Override             public void onSubscribe(@NonNull Disposable d) {             }             @Override             public void onNext(@NonNull Integer integer) {                 Log.e(TAG,""+integer);             }             @Override             public void onError(@NonNull Throwable e) {             }             @Override             public void onComplete() {            }         });      

上述代碼中subscribeOn(Schedulers.computation())計算所使用的 Scheduler。