本文将有几个例子说明,rxjava线程调度的正确使用姿势。
例1
<col>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<code>observable</code>
<code>.create(</code><code>new</code> <code>observable.onsubscribe<string>() {</code>
<code>@override</code>
<code>public</code> <code>void</code> <code>call(subscriber<? </code><code>super</code> <code>string> subscriber) {</code>
<code>logger.v( </code><code>"rx_call"</code> <code>, thread.currentthread().getname() );</code>
<code>subscriber.onnext( </code><code>"dd"</code><code>);</code>
<code>subscriber.oncompleted();</code>
<code>}</code>
<code>})</code>
<code>.map(</code><code>new</code> <code>func1<string, string >() {</code>
<code>public</code> <code>string call(string s) {</code>
<code>logger.v( </code><code>"rx_map"</code> <code>, thread.currentthread().getname() );</code>
<code>return</code> <code>s + </code><code>"88"</code><code>;</code>
<code>.subscribe(</code><code>new</code> <code>action1<string>() {</code>
<code>public</code> <code>void</code> <code>call(string s) {</code>
<code>logger.v( </code><code>"rx_subscribe"</code> <code>, thread.currentthread().getname() );</code>
<code>}) ;</code>
结果
/rx_call: main -- 主线程 /rx_map: main -- 主线程 /rx_subscribe: main -- 主线程
例2
24
25
26
27
28
29
30
31
32
33
34
<code>new</code> <code>thread(</code><code>new</code> <code>runnable() {</code>
<code>public</code> <code>void</code> <code>run() {</code>
<code>logger.v( </code><code>"rx_newthread"</code> <code>, thread.currentthread().getname() );</code>
<code>rx();</code>
<code>}).start();</code>
<code>void</code> <code>rx(){</code>
结果
/rx_newthread: thread-564 -- 子线程 /rx_call: thread-564 -- 子线程 /rx_map: thread-564 -- 子线程 /rx_subscribe: thread-564 -- 子线程
通过例1和例2,说明,rxjava默认运行在当前线程中。如果当前线程是子线程,则rxjava运行在子线程;同样,当前线程是主线程,则rxjava运行在主线程
例3
<code>.subscribeon(schedulers.io())</code>
<code>.observeon(androidschedulers.mainthread())</code>
/rx_call: rxcachedthreadscheduler-1 --io线程 /rx_map: main --主线程 /rx_subscribe: main --主线程
例4
<code>}) ; </code>
/rx_call: rxcachedthreadscheduler-1 --io线程 /rx_map: rxcachedthreadscheduler-1 --io线程
通过例3、例4 可以看出 .subscribeon(schedulers.io())
和 .observeon(androidschedulers.mainthread()) 写的位置不一样,造成的结果也不一样。从例4中可以看出
map() 操作符默认运行在事件产生的线程之中。事件消费只是在 subscribe() 里面。
对于 create() , just() , from() 等 --- 事件产生
map() , flapmap() , scan() , filter() 等 -- 事件加工
subscribe() -- 事件消费
事件产生:默认运行在当前线程,可以由 subscribeon() 自定义线程
事件加工:默认跟事件产生的线程保持一致, 可以由 observeon() 自定义线程
事件消费:默认运行在当前线程,可以有observeon() 自定义
例5 多次切换线程
35
36
37
38
39
40
<code>.observeon( schedulers.newthread() ) </code><code>//新线程</code>
<code>.observeon( schedulers.io() ) </code><code>//io线程</code>
<code>.filter(</code><code>new</code> <code>func1<string, boolean>() {</code>
<code>public</code> <code>boolean call(string s) {</code>
<code>logger.v( </code><code>"rx_filter"</code> <code>, thread.currentthread().getname() );</code>
<code>return</code> <code>s != </code><code>null</code> <code>;</code>
<code>.subscribeon(schedulers.io()) </code><code>//定义事件产生线程:io线程</code>
<code>.observeon(androidschedulers.mainthread()) </code><code>//事件消费线程:主线程</code>
/rx_call: rxcachedthreadscheduler-1 -- io 线程 /rx_map: rxnewthreadscheduler-1 -- new出来的线程 /rx_filter: rxcachedthreadscheduler-2 -- io线程 /rx_subscribe: main -- 主线程
例6:只规定了事件产生的线程
<code>log.v( </code><code>"rx--create "</code> <code>, thread.currentthread().getname() ) ;</code>
<code>subscriber.onnext( </code><code>"dd"</code> <code>) ;</code>
<code>log.v( </code><code>"rx--subscribe "</code> <code>, thread.currentthread().getname() ) ;</code>
/rx--create: rxcachedthreadscheduler-4 // io 线程 /rx--subscribe: rxcachedthreadscheduler-4 // io 线程
例:7:只规定事件消费线程
<code>.observeon( schedulers.newthread() )</code>
/rx--create: main -- 主线程 /rx--subscribe: rxnewthreadscheduler-1 -- new 出来的子线程
从例6可以看出,如果只规定了事件产生的线程,那么事件消费线程将跟随事件产生线程。
从例7可以看出,如果只规定了事件消费的线程,那么事件产生的线程和 当前线程保持一致。
例8:线程调度封装
在android 常常有这样的场景,后台处理处理数据,前台展示数据。
一般的用法:
<code>.just( </code><code>"123"</code> <code>)</code>
<code>.subscribeon( schedulers.io())</code>
<code>.observeon( androidschedulers.mainthread() )</code>
<code>.subscribe(</code><code>new</code> <code>action1() {</code>
<code>public</code> <code>void</code> <code>call(object o) {</code>
但是项目中这种场景有很多,所以我们就想能不能把这种场景的调度方式封装起来,方便调用。
简单的封装
<code>public</code> <code>observable apply( observable observable ){</code>
<code>return</code> <code>observable.subscribeon( schedulers.io() )</code>
<code>.observeon( androidschedulers.mainthread() ) ;</code>
使用
<code>apply( observable.just( </code><code>"123"</code> <code>) )</code>
弊端:虽然上面的这种封装可以做到线程调度的目的,但是它破坏了链式编程的结构,是编程风格变得不优雅。
改进:transformers 的使用(就是转化器的意思,把一种类型的observable转换成另一种类型的observable )
改进后的封装
<code>observable.transformer schedulerstransformer = </code><code>new</code> <code>observable.transformer() {</code>
<code>@override</code> <code>public</code> <code>object call(object observable) {</code>
<code>return</code> <code>((observable) observable).subscribeon(schedulers.newthread())</code>
<code>.observeon(androidschedulers.mainthread());</code>
<code>};</code>
使用
<code>.compose( schedulerstransformer )</code>
弊端:虽然保持了链式编程结构的完整,但是每次调用 .compose( schedulerstransformer ) 都是 new 了一个对象的。所以我们需要再次封装,尽量保证单例的模式。
<code>package</code> <code>lib.app.com.myapplication;</code>
<code>import</code> <code>rx.observable;</code>
<code>import</code> <code>rx.android.schedulers.androidschedulers;</code>
<code>import</code> <code>rx.schedulers.schedulers;</code>
<code>/**</code>
<code>* created by ${zyj} on 2016/7/1.</code>
<code>*/</code>
<code>public</code> <code>class</code> <code>rxutil {</code>
<code>private</code> <code>final</code> <code>static</code> <code>observable.transformer schedulerstransformer = </code><code>new</code> <code>observable.transformer() {</code>
<code>public</code> <code>static</code> <code><t> observable.transformer<t, t> applyschedulers() {</code>
<code>return</code> <code>(observable.transformer<t, t>) schedulerstransformer;</code>
<code>.compose( rxutil.<string>applyschedulers() )</code>