天天看點

Android異步架構RxJava 1.x系列(三) - 線程排程器Scheduler前言正文小結

前言

RxJava

事件的發出和消費都在同一個線程,基于同步的觀察者模式。觀察者模式的核心是背景處理,前台回調的異步機制。要實作異步,需要引入

RxJava

的另一個概念 - 線程排程器

Scheduler

Android異步架構RxJava 1.x系列(三) - 線程排程器Scheduler前言正文小結

正文

在不指定線程的情況下,

RxJava

遵循的是線程不變的原則。即在哪個線程調用

subscribe()

方法,就在哪個線程生産事件;在哪個線程生産事件,就在哪個線程消費事件。如果需要切換線程,就需要用到線程排程器

Scheduler

1. 幾種Scheduler介紹

RxJava

中,

Scheduler

- 排程器,相當于線程控制器,

RxJava

通過它來指定每一段代碼應該運作在什麼樣的線程。

RxJava

已經内置了幾個

Scheduler

,它們已經适合大多數的使用場景:

  • Schedulers.immediate()

直接在目前線程運作,相當于不指定線程。這是預設的

Scheduler

  • Schedulers.newThread()

總是啟用新線程,并在新線程執行操作。

  • Schedulers.io()

I/O

操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的

Scheduler

。行為模式和

newThread()

差不多,差別在于

io()

内部采用的是一個無數量上限的線程池,可以重用空閑的線程。是以多數情況下

io()

newThread()

更有效率。

注意:不要把計算任務放在

io()

中,可以避免建立不必要的線程。
  • Schedulers.computation()

計算任務所使用的

Scheduler

。這個計算指的是

CPU

密集型計算,即不會被

I/O

等操作限制性能的操作,例如圖形的計算。這個

Scheduler

使用的固定的線程池,大小為

CPU

核數。

注意:不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
  • AndroidSchedulers.mainThread()

Android

還有一個專用的

AndroidSchedulers.mainThread()

,它指定的操作将在

Android

主線程運作。

2. Scheduler的線程切換

2.1. 單次線程切換

有了這幾個

Scheduler

,就可以使用

subscribeOn()

observeOn()

兩個方法來對線程進行控制了。

*

subscribeOn()

: 指定

subscribe()

所發生的線程,即

Observable.OnSubscribe

被激活時所處的線程,或者叫做事件産生的線程。

  • observeOn()

    : 指定

    Subscriber

    所運作在的線程,或者叫做事件消費的線程。

直接看代碼:

Observable.just(, , , )
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });
           

上面這段代碼中,由于

subscribeOn(Schedulers.io())

的指定,被建立的事件的内容

1

2

3

4

将會在

IO

線程發出;由于

observeOn(AndroidScheculers.mainThread())

的指定,是以

subscriber

數字的列印将發生在主線程。

事實上,這種使用方式非常常見,它适用于多數的 『背景線程取資料,主線程顯示』的程式政策。

以下是一個完整的例子:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
// 指定事件發出,即圖檔讀取發生在 IO 線程
.subscribeOn(Schedulers.io())
// 指定事件消費 - 回調,即頁面圖檔渲染發生在主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});
           

這樣的好處是,加載圖檔的過程發生在

IO

線程,而設定圖檔則發生在了主線程。這就意味着,即使加載圖檔耗費了幾十甚至幾百毫秒的時間,也不會造成界面上的絲毫卡頓。

2.2. 多次線程切換

上面介紹到可以利用

subscribeOn()

結合

observeOn()

來實作線程控制,讓事件的産生和消費發生在不同的線程。在了解了

map()

flatMap()

等變換方法後,一個問題就産生了 - 能不能多切換幾次線程?

因為

observeOn()

指定的是

Subscriber

的線程,而這個

Subscriber

并不是

subscribe()

參數中的

Subscriber

,而是

observeOn()

執行時,目前

Observable

所對應的

Subscriber

,即它的直接下級

Subscriber

也就是說,observeOn() 指定的是它之後的操作所在的線程。是以如果有多次切換線程的需求,隻要在每個想要切換線程的位置調用一次 observeOn() 即可。

直接檢視示例代碼:

Observable.just(, , , ) 
    // 事件發出的 IO 線程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    // 新線程,由 observeOn() 指定
    .observeOn(Schedulers.newThread())
    .map(mapOperator) 
    // IO 線程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) 
    // Android 主線程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread)
    .subscribe(subscriber);
           

上面的代碼,通過

observeOn()

的多次調用,程式實作了線程的多次切換。不過,不同于

observeOn()

的是,

subscribeOn()

的位置放在哪裡都可以,但它是隻能調用一次的。

3. Scheduler的實作原理

其實,

subscribeOn()

observeOn()

的内部實作,也是用的

lift()

(見上文),具體看圖(不同顔色的箭頭表示不同的線程):

  • subscribeOn()的原理圖
Android異步架構RxJava 1.x系列(三) - 線程排程器Scheduler前言正文小結

從圖中可以看出,

subscribeOn()

進行了線程切換的工作(圖中的

schedule...

的位置)。

subscribeOn()

的線程切換發生在

OnSubscribe

中,即在它通知上一級

OnSubscribe

時,這時事件還沒有開始發送,是以

subscribeOn()

的線程控制隻能在事件發出的開端造成影響,即隻允許一次線程切換。

  • observeOn()的原理圖
Android異步架構RxJava 1.x系列(三) - 線程排程器Scheduler前言正文小結

從圖中可以看出,和

observeOn()

進行了線程切換的工作(圖中的

schedule...

的位置)。

observeOn()

的線程切換則發生在它内建的

Subscriber

中,即發生在它即将給下一級

Subscriber

發送事件時,是以

observeOn()

控制的是它後面的線程,允許多次線程切換。

  • 混合切換原理圖

最後用一張圖來解釋當多個

subscribeOn()

observeOn()

混合使用時,線程排程是怎麼發生的:

Android異步架構RxJava 1.x系列(三) - 線程排程器Scheduler前言正文小結

圖中共有

5

處對事件的操作,由圖中可以看出:

  1. ① 和 ② 兩處受第一個

    subscribeOn()

    影響,運作在紅色線程;
  2. ③ 和 ④ 處受第一個

    observeOn()

    的影響,運作在綠色線程;
  3. ⑤ 處受第二個

    onserveOn()

    影響,運作在紫色線程;
  4. 而第二個

    subscribeOn()

    ,由于在通知過程中線程就被第一個

    subscribeOn()

    截斷,是以對整個流程并沒有任何影響。
注意:當使用了多個 subscribeOn() 的時候,隻有第一個 subscribeOn() 起作用。

4. 延伸拓展

雖然超過一個的

subscribeOn()

對事件處理的流程沒有影響,但在流程之前卻是有用的。在前面的文章介紹

Subscriber

的時候,提到過

Subscriber

onStart()

可以用作流程開始前的初始化處理。

由于 onStart() 在 subscribe() 發生時就被調用了,是以不能指定線程,而是隻能執行在 subscribe() 被調用時的線程。這就導緻如果 onStart() 中含有對線程有要求的代碼(例如:在界面上顯示一個 ProgressBar,這必須在主線程執行),将會有線程非法的風險,因為無法預測 subscribe() 會在什麼線程執行。

Subscriber.onStart()

相對應的,有一個方法

Observable.doOnSubscribe()

。它和

Subscriber.onStart()

同樣是在

subscribe()

調用後而且在事件發送前執行,但差別在于它可以指定線程。預設情況下,

doOnSubscribe()

執行在

subscribe()

發生的線程;而如果在

doOnSubscribe()

之後有

subscribeOn()

的話,它将執行在離它最近的

subscribeOn()

所指定的線程。

示例代碼如下:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // 需要在主線程執行
            progressBar.setVisibility(View.VISIBLE); 
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) 
    // 指定主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
           

上面的代碼,在

doOnSubscribe()

的後面跟一個

subscribeOn()

,就能指定特定工作的線程了!

小結

RxJava

的提供的各種事件及事件轉換模型,以及基于轉換的線程排程器,結合觀察者模式,使得

RxJava

在異步程式設計體驗、靈活性和運作效率上領先于其他的開源架構!

歡迎關注技術公衆号: 零壹技術棧

Android異步架構RxJava 1.x系列(三) - 線程排程器Scheduler前言正文小結

本帳号将持續分享後端技術幹貨,包括虛拟機基礎,多線程程式設計,高性能架構,異步、緩存和消息中間件,分布式和微服務,架構學習和進階等學習資料和文章。