本文将有幾個例子說明,rxjava線程排程的正确使用姿勢。
例1
<col>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<code>observable</code>
<code>.create(</code><code>new</code> <code>observable.onsubscribe<string>() {</code>
<code>@override</code>
<code>public</code> <code>void</code> <code>call(subscriber<? </code><code>super</code> <code>string> subscriber) {</code>
<code>logger.v( </code><code>"rx_call"</code> <code>, thread.currentthread().getname() );</code>
<code>subscriber.onnext( </code><code>"dd"</code><code>);</code>
<code>subscriber.oncompleted();</code>
<code>}</code>
<code>})</code>
<code>.map(</code><code>new</code> <code>func1<string, string >() {</code>
<code>public</code> <code>string call(string s) {</code>
<code>logger.v( </code><code>"rx_map"</code> <code>, thread.currentthread().getname() );</code>
<code>return</code> <code>s + </code><code>"88"</code><code>;</code>
<code>.subscribe(</code><code>new</code> <code>action1<string>() {</code>
<code>public</code> <code>void</code> <code>call(string s) {</code>
<code>logger.v( </code><code>"rx_subscribe"</code> <code>, thread.currentthread().getname() );</code>
<code>}) ;</code>
結果
/rx_call: main -- 主線程 /rx_map: main -- 主線程 /rx_subscribe: main -- 主線程
例2
24
25
26
27
28
29
30
31
32
33
34
<code>new</code> <code>thread(</code><code>new</code> <code>runnable() {</code>
<code>public</code> <code>void</code> <code>run() {</code>
<code>logger.v( </code><code>"rx_newthread"</code> <code>, thread.currentthread().getname() );</code>
<code>rx();</code>
<code>}).start();</code>
<code>void</code> <code>rx(){</code>
結果
/rx_newthread: thread-564 -- 子線程 /rx_call: thread-564 -- 子線程 /rx_map: thread-564 -- 子線程 /rx_subscribe: thread-564 -- 子線程
通過例1和例2,說明,rxjava預設運作在目前線程中。如果目前線程是子線程,則rxjava運作在子線程;同樣,目前線程是主線程,則rxjava運作在主線程
例3
<code>.subscribeon(schedulers.io())</code>
<code>.observeon(androidschedulers.mainthread())</code>
/rx_call: rxcachedthreadscheduler-1 --io線程 /rx_map: main --主線程 /rx_subscribe: main --主線程
例4
<code>}) ; </code>
/rx_call: rxcachedthreadscheduler-1 --io線程 /rx_map: rxcachedthreadscheduler-1 --io線程
通過例3、例4 可以看出 .subscribeon(schedulers.io())
和 .observeon(androidschedulers.mainthread()) 寫的位置不一樣,造成的結果也不一樣。從例4中可以看出
map() 操作符預設運作在事件産生的線程之中。事件消費隻是在 subscribe() 裡面。
對于 create() , just() , from() 等 --- 事件産生
map() , flapmap() , scan() , filter() 等 -- 事件加工
subscribe() -- 事件消費
事件産生:預設運作在目前線程,可以由 subscribeon() 自定義線程
事件加工:預設跟事件産生的線程保持一緻, 可以由 observeon() 自定義線程
事件消費:預設運作在目前線程,可以有observeon() 自定義
例5 多次切換線程
35
36
37
38
39
40
<code>.observeon( schedulers.newthread() ) </code><code>//新線程</code>
<code>.observeon( schedulers.io() ) </code><code>//io線程</code>
<code>.filter(</code><code>new</code> <code>func1<string, boolean>() {</code>
<code>public</code> <code>boolean call(string s) {</code>
<code>logger.v( </code><code>"rx_filter"</code> <code>, thread.currentthread().getname() );</code>
<code>return</code> <code>s != </code><code>null</code> <code>;</code>
<code>.subscribeon(schedulers.io()) </code><code>//定義事件産生線程:io線程</code>
<code>.observeon(androidschedulers.mainthread()) </code><code>//事件消費線程:主線程</code>
/rx_call: rxcachedthreadscheduler-1 -- io 線程 /rx_map: rxnewthreadscheduler-1 -- new出來的線程 /rx_filter: rxcachedthreadscheduler-2 -- io線程 /rx_subscribe: main -- 主線程
例6:隻規定了事件産生的線程
<code>log.v( </code><code>"rx--create "</code> <code>, thread.currentthread().getname() ) ;</code>
<code>subscriber.onnext( </code><code>"dd"</code> <code>) ;</code>
<code>log.v( </code><code>"rx--subscribe "</code> <code>, thread.currentthread().getname() ) ;</code>
/rx--create: rxcachedthreadscheduler-4 // io 線程 /rx--subscribe: rxcachedthreadscheduler-4 // io 線程
例:7:隻規定事件消費線程
<code>.observeon( schedulers.newthread() )</code>
/rx--create: main -- 主線程 /rx--subscribe: rxnewthreadscheduler-1 -- new 出來的子線程
從例6可以看出,如果隻規定了事件産生的線程,那麼事件消費線程将跟随事件産生線程。
從例7可以看出,如果隻規定了事件消費的線程,那麼事件産生的線程和 目前線程保持一緻。
例8:線程排程封裝
在android 常常有這樣的場景,背景處理處理資料,前台展示資料。
一般的用法:
<code>.just( </code><code>"123"</code> <code>)</code>
<code>.subscribeon( schedulers.io())</code>
<code>.observeon( androidschedulers.mainthread() )</code>
<code>.subscribe(</code><code>new</code> <code>action1() {</code>
<code>public</code> <code>void</code> <code>call(object o) {</code>
但是項目中這種場景有很多,是以我們就想能不能把這種場景的排程方式封裝起來,友善調用。
簡單的封裝
<code>public</code> <code>observable apply( observable observable ){</code>
<code>return</code> <code>observable.subscribeon( schedulers.io() )</code>
<code>.observeon( androidschedulers.mainthread() ) ;</code>
使用
<code>apply( observable.just( </code><code>"123"</code> <code>) )</code>
弊端:雖然上面的這種封裝可以做到線程排程的目的,但是它破壞了鍊式程式設計的結構,是程式設計風格變得不優雅。
改進:transformers 的使用(就是轉化器的意思,把一種類型的observable轉換成另一種類型的observable )
改進後的封裝
<code>observable.transformer schedulerstransformer = </code><code>new</code> <code>observable.transformer() {</code>
<code>@override</code> <code>public</code> <code>object call(object observable) {</code>
<code>return</code> <code>((observable) observable).subscribeon(schedulers.newthread())</code>
<code>.observeon(androidschedulers.mainthread());</code>
<code>};</code>
使用
<code>.compose( schedulerstransformer )</code>
弊端:雖然保持了鍊式程式設計結構的完整,但是每次調用 .compose( schedulerstransformer ) 都是 new 了一個對象的。是以我們需要再次封裝,盡量保證單例的模式。
<code>package</code> <code>lib.app.com.myapplication;</code>
<code>import</code> <code>rx.observable;</code>
<code>import</code> <code>rx.android.schedulers.androidschedulers;</code>
<code>import</code> <code>rx.schedulers.schedulers;</code>
<code>/**</code>
<code>* created by ${zyj} on 2016/7/1.</code>
<code>*/</code>
<code>public</code> <code>class</code> <code>rxutil {</code>
<code>private</code> <code>final</code> <code>static</code> <code>observable.transformer schedulerstransformer = </code><code>new</code> <code>observable.transformer() {</code>
<code>public</code> <code>static</code> <code><t> observable.transformer<t, t> applyschedulers() {</code>
<code>return</code> <code>(observable.transformer<t, t>) schedulerstransformer;</code>
<code>.compose( rxutil.<string>applyschedulers() )</code>