前言
RxJava
事件的發出和消費都在同一個線程,基于同步的觀察者模式。觀察者模式的核心是背景處理,前台回調的異步機制。要實作異步,需要引入
RxJava
的另一個概念 - 線程排程器
Scheduler
。
正文
在不指定線程的情況下,
RxJava
遵循的是線程不變的原則。即在哪個線程調用
subscribe()
方法,就在哪個線程生産事件;在哪個線程生産事件,就在哪個線程消費事件。如果需要切換線程,就需要用到線程排程器
Scheduler
。
1. 幾種Scheduler介紹
在
RxJava
中,
Scheduler
- 排程器,相當于線程控制器,
RxJava
通過它來指定每一段代碼應該運作在什麼樣的線程。
RxJava
已經内置了幾個
Scheduler
,它們已經适合大多數的使用場景:
- Schedulers.immediate()
直接在目前線程運作,相當于不指定線程。這是預設的
Scheduler
。
- Schedulers.newThread()
總是啟用新線程,并在新線程執行操作。
- Schedulers.io()
I/O
操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的
Scheduler
。行為模式和
newThread()
差不多,差別在于
io()
内部采用的是一個無數量上限的線程池,可以重用空閑的線程。是以多數情況下
io()
比
newThread()
更有效率。
注意:不要把計算任務放在 io()
中,可以避免建立不必要的線程。
- Schedulers.computation()
計算任務所使用的
Scheduler
。這個計算指的是
CPU
密集型計算,即不會被
I/O
等操作限制性能的操作,例如圖形的計算。這個
Scheduler
使用的固定的線程池,大小為
CPU
核數。
注意:不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
- AndroidSchedulers.mainThread()
Android
還有一個專用的
AndroidSchedulers.mainThread()
,它指定的操作将在
Android
主線程運作。
2. Scheduler的線程切換
2.1. 單次線程切換
有了這幾個
Scheduler
,就可以使用
subscribeOn()
和
observeOn()
兩個方法來對線程進行控制了。
*
subscribeOn()
: 指定
subscribe()
所發生的線程,即
Observable.OnSubscribe
被激活時所處的線程,或者叫做事件産生的線程。
-
: 指定observeOn()
所運作在的線程,或者叫做事件消費的線程。Subscriber
直接看代碼:
Observable.just(, , , )
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
上面這段代碼中,由于
subscribeOn(Schedulers.io())
的指定,被建立的事件的内容
1
、
2
、
3
、
4
将會在
IO
線程發出;由于
observeOn(AndroidScheculers.mainThread())
的指定,是以
subscriber
數字的列印将發生在主線程。
事實上,這種使用方式非常常見,它适用于多數的 『背景線程取資料,主線程顯示』的程式政策。
以下是一個完整的例子:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
// 指定事件發出,即圖檔讀取發生在 IO 線程
.subscribeOn(Schedulers.io())
// 指定事件消費 - 回調,即頁面圖檔渲染發生在主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
這樣的好處是,加載圖檔的過程發生在
IO
線程,而設定圖檔則發生在了主線程。這就意味着,即使加載圖檔耗費了幾十甚至幾百毫秒的時間,也不會造成界面上的絲毫卡頓。
2.2. 多次線程切換
上面介紹到可以利用
subscribeOn()
結合
observeOn()
來實作線程控制,讓事件的産生和消費發生在不同的線程。在了解了
map()
和
flatMap()
等變換方法後,一個問題就産生了 - 能不能多切換幾次線程?
因為
observeOn()
指定的是
Subscriber
的線程,而這個
Subscriber
并不是
subscribe()
參數中的
Subscriber
,而是
observeOn()
執行時,目前
Observable
所對應的
Subscriber
,即它的直接下級
Subscriber
。
也就是說,observeOn() 指定的是它之後的操作所在的線程。是以如果有多次切換線程的需求,隻要在每個想要切換線程的位置調用一次 observeOn() 即可。
直接檢視示例代碼:
Observable.just(, , , )
// 事件發出的 IO 線程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
// 新線程,由 observeOn() 指定
.observeOn(Schedulers.newThread())
.map(mapOperator)
// IO 線程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2)
// Android 主線程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber);
上面的代碼,通過
observeOn()
的多次調用,程式實作了線程的多次切換。不過,不同于
observeOn()
的是,
subscribeOn()
的位置放在哪裡都可以,但它是隻能調用一次的。
3. Scheduler的實作原理
其實,
subscribeOn()
和
observeOn()
的内部實作,也是用的
lift()
(見上文),具體看圖(不同顔色的箭頭表示不同的線程):
- subscribeOn()的原理圖
從圖中可以看出,
subscribeOn()
進行了線程切換的工作(圖中的
schedule...
的位置)。
subscribeOn()
的線程切換發生在
OnSubscribe
中,即在它通知上一級
OnSubscribe
時,這時事件還沒有開始發送,是以
subscribeOn()
的線程控制隻能在事件發出的開端造成影響,即隻允許一次線程切換。
- observeOn()的原理圖
從圖中可以看出,和
observeOn()
進行了線程切換的工作(圖中的
schedule...
的位置)。
observeOn()
的線程切換則發生在它内建的
Subscriber
中,即發生在它即将給下一級
Subscriber
發送事件時,是以
observeOn()
控制的是它後面的線程,允許多次線程切換。
- 混合切換原理圖
最後用一張圖來解釋當多個
subscribeOn()
和
observeOn()
混合使用時,線程排程是怎麼發生的:
圖中共有
5
處對事件的操作,由圖中可以看出:
- ① 和 ② 兩處受第一個
影響,運作在紅色線程;subscribeOn()
- ③ 和 ④ 處受第一個
的影響,運作在綠色線程;observeOn()
- ⑤ 處受第二個
影響,運作在紫色線程;onserveOn()
- 而第二個
,由于在通知過程中線程就被第一個subscribeOn()
截斷,是以對整個流程并沒有任何影響。subscribeOn()
注意:當使用了多個 subscribeOn() 的時候,隻有第一個 subscribeOn() 起作用。
4. 延伸拓展
雖然超過一個的
subscribeOn()
對事件處理的流程沒有影響,但在流程之前卻是有用的。在前面的文章介紹
Subscriber
的時候,提到過
Subscriber
的
onStart()
可以用作流程開始前的初始化處理。
由于 onStart() 在 subscribe() 發生時就被調用了,是以不能指定線程,而是隻能執行在 subscribe() 被調用時的線程。這就導緻如果 onStart() 中含有對線程有要求的代碼(例如:在界面上顯示一個 ProgressBar,這必須在主線程執行),将會有線程非法的風險,因為無法預測 subscribe() 會在什麼線程執行。
與
Subscriber.onStart()
相對應的,有一個方法
Observable.doOnSubscribe()
。它和
Subscriber.onStart()
同樣是在
subscribe()
調用後而且在事件發送前執行,但差別在于它可以指定線程。預設情況下,
doOnSubscribe()
執行在
subscribe()
發生的線程;而如果在
doOnSubscribe()
之後有
subscribeOn()
的話,它将執行在離它最近的
subscribeOn()
所指定的線程。
示例代碼如下:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
// 需要在主線程執行
progressBar.setVisibility(View.VISIBLE);
}
})
.subscribeOn(AndroidSchedulers.mainThread())
// 指定主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
上面的代碼,在
doOnSubscribe()
的後面跟一個
subscribeOn()
,就能指定特定工作的線程了!
小結
RxJava
的提供的各種事件及事件轉換模型,以及基于轉換的線程排程器,結合觀察者模式,使得
RxJava
在異步程式設計體驗、靈活性和運作效率上領先于其他的開源架構!
歡迎關注技術公衆号: 零壹技術棧
本帳号将持續分享後端技術幹貨,包括虛拟機基礎,多線程程式設計,高性能架構,異步、緩存和消息中間件,分布式和微服務,架構學習和進階等學習資料和文章。