天天看点

RxJava 和 RxAndroid 五(线程调度)

本文将有几个例子说明,rxjava线程调度的正确使用姿势。

例1

<col>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

​<code>​observable​</code>​

​<code>​.create(​</code>​​<code>​new​</code>​ ​<code>​observable.onsubscribe&lt;string&gt;() {​</code>​

​<code>​@override​</code>​

​<code>​public​</code>​ ​<code>​void​</code>​ ​<code>​call(subscriber&lt;? ​</code>​​<code>​super​</code>​ ​<code>​string&gt; subscriber) {​</code>​

​<code>​logger.v( ​</code>​​<code>​"rx_call"​</code>​ ​<code>​, thread.currentthread().getname()  );​</code>​

​<code>​subscriber.onnext( ​</code>​​<code>​"dd"​</code>​​<code>​);​</code>​

​<code>​subscriber.oncompleted();​</code>​

​<code>​}​</code>​

​<code>​})​</code>​

​<code>​.map(​</code>​​<code>​new​</code>​ ​<code>​func1&lt;string, string &gt;() {​</code>​

​<code>​public​</code>​ ​<code>​string call(string s) {​</code>​

​<code>​logger.v( ​</code>​​<code>​"rx_map"​</code>​ ​<code>​, thread.currentthread().getname()  );​</code>​

​<code>​return​</code>​ ​<code>​s + ​</code>​​<code>​"88"​</code>​​<code>​;​</code>​

​<code>​.subscribe(​</code>​​<code>​new​</code>​ ​<code>​action1&lt;string&gt;() {​</code>​

​<code>​public​</code>​ ​<code>​void​</code>​ ​<code>​call(string s) {​</code>​

​<code>​logger.v( ​</code>​​<code>​"rx_subscribe"​</code>​ ​<code>​, thread.currentthread().getname()  );​</code>​

​<code>​}) ;​</code>​

  结果

/rx_call: main           -- 主线程 /rx_map: main        --  主线程 /rx_subscribe: main   -- 主线程

例2

24

25

26

27

28

29

30

31

32

33

34

​<code>​new​</code>​ ​<code>​thread(​</code>​​<code>​new​</code>​ ​<code>​runnable() {​</code>​

​<code>​public​</code>​ ​<code>​void​</code>​ ​<code>​run() {​</code>​

​<code>​logger.v( ​</code>​​<code>​"rx_newthread"​</code>​ ​<code>​, thread.currentthread().getname()  );​</code>​

​<code>​rx();​</code>​

​<code>​}).start();​</code>​

​<code>​void​</code>​ ​<code>​rx(){​</code>​

 

      结果

/rx_newthread: thread-564   -- 子线程 /rx_call: thread-564              -- 子线程 /rx_map: thread-564            -- 子线程  /rx_subscribe: thread-564    -- 子线程

通过例1和例2,说明,rxjava默认运行在当前线程中。如果当前线程是子线程,则rxjava运行在子线程;同样,当前线程是主线程,则rxjava运行在主线程

例3

​<code>​.subscribeon(schedulers.io())​</code>​

​<code>​.observeon(androidschedulers.mainthread())​</code>​

/rx_call: rxcachedthreadscheduler-1    --io线程 /rx_map: main                                     --主线程 /rx_subscribe: main                              --主线程

例4

​<code>​}) ; ​</code>​

/rx_call: rxcachedthreadscheduler-1     --io线程 /rx_map: rxcachedthreadscheduler-1   --io线程

通过例3、例4 可以看出  .subscribeon(schedulers.io())

 和 .observeon(androidschedulers.mainthread()) 写的位置不一样,造成的结果也不一样。从例4中可以看出

map() 操作符默认运行在事件产生的线程之中。事件消费只是在 subscribe() 里面。

对于 create() , just() , from()   等                 --- 事件产生   

               map() , flapmap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消费

  事件产生:默认运行在当前线程,可以由 subscribeon()  自定义线程

         事件加工:默认跟事件产生的线程保持一致, 可以由 observeon() 自定义线程

       事件消费:默认运行在当前线程,可以有observeon() 自定义

例5  多次切换线程

35

36

37

38

39

40

​<code>​.observeon( schedulers.newthread() )    ​</code>​​<code>​//新线程​</code>​

​<code>​.observeon( schedulers.io() )      ​</code>​​<code>​//io线程​</code>​

​<code>​.filter(​</code>​​<code>​new​</code>​ ​<code>​func1&lt;string, boolean&gt;() {​</code>​

​<code>​public​</code>​ ​<code>​boolean call(string s) {​</code>​

​<code>​logger.v( ​</code>​​<code>​"rx_filter"​</code>​ ​<code>​, thread.currentthread().getname()  );​</code>​

​<code>​return​</code>​ ​<code>​s != ​</code>​​<code>​null​</code>​ ​<code>​;​</code>​

​<code>​.subscribeon(schedulers.io())     ​</code>​​<code>​//定义事件产生线程:io线程​</code>​

​<code>​.observeon(androidschedulers.mainthread())     ​</code>​​<code>​//事件消费线程:主线程​</code>​

/rx_call: rxcachedthreadscheduler-1           -- io 线程 /rx_map: rxnewthreadscheduler-1             -- new出来的线程 /rx_filter: rxcachedthreadscheduler-2        -- io线程 /rx_subscribe: main                                   -- 主线程

例6:只规定了事件产生的线程

​<code>​log.v( ​</code>​​<code>​"rx--create "​</code>​ ​<code>​, thread.currentthread().getname() ) ;​</code>​

​<code>​subscriber.onnext( ​</code>​​<code>​"dd"​</code>​ ​<code>​) ;​</code>​

​<code>​log.v( ​</code>​​<code>​"rx--subscribe "​</code>​ ​<code>​, thread.currentthread().getname() ) ;​</code>​

/rx--create: rxcachedthreadscheduler-4                      // io 线程 /rx--subscribe: rxcachedthreadscheduler-4                 // io 线程

例:7:只规定事件消费线程

​<code>​.observeon( schedulers.newthread() )​</code>​

/rx--create: main                                           -- 主线程 /rx--subscribe: rxnewthreadscheduler-1        --  new 出来的子线程 

    从例6可以看出,如果只规定了事件产生的线程,那么事件消费线程将跟随事件产生线程。

    从例7可以看出,如果只规定了事件消费的线程,那么事件产生的线程和 当前线程保持一致。

例8:线程调度封装

 在android 常常有这样的场景,后台处理处理数据,前台展示数据。

一般的用法:

​<code>​.just( ​</code>​​<code>​"123"​</code>​ ​<code>​)​</code>​

​<code>​.subscribeon( schedulers.io())​</code>​

​<code>​.observeon( androidschedulers.mainthread() )​</code>​

​<code>​.subscribe(​</code>​​<code>​new​</code>​ ​<code>​action1() {​</code>​

​<code>​public​</code>​ ​<code>​void​</code>​ ​<code>​call(object o) {​</code>​

  但是项目中这种场景有很多,所以我们就想能不能把这种场景的调度方式封装起来,方便调用。

简单的封装

​<code>​public​</code>​ ​<code>​observable apply( observable observable ){​</code>​

​<code>​return​</code>​ ​<code>​observable.subscribeon( schedulers.io() )​</code>​

​<code>​.observeon( androidschedulers.mainthread() ) ;​</code>​

使用

​<code>​apply( observable.just( ​</code>​​<code>​"123"​</code>​ ​<code>​) )​</code>​

弊端:虽然上面的这种封装可以做到线程调度的目的,但是它破坏了链式编程的结构,是编程风格变得不优雅。

改进:transformers 的使用(就是转化器的意思,把一种类型的observable转换成另一种类型的observable )

改进后的封装

​<code>​observable.transformer schedulerstransformer = ​</code>​​<code>​new​</code>​  ​<code>​observable.transformer() {​</code>​

​<code>​@override​</code>​ ​<code>​public​</code>​ ​<code>​object call(object observable) {​</code>​

​<code>​return​</code>​ ​<code>​((observable)  observable).subscribeon(schedulers.newthread())​</code>​

​<code>​.observeon(androidschedulers.mainthread());​</code>​

​<code>​};​</code>​

  使用

​<code>​.compose( schedulerstransformer )​</code>​

  弊端:虽然保持了链式编程结构的完整,但是每次调用 .compose( schedulerstransformer ) 都是 new 了一个对象的。所以我们需要再次封装,尽量保证单例的模式。

​<code>​package​</code>​ ​<code>​lib.app.com.myapplication;​</code>​

​<code>​import​</code>​ ​<code>​rx.observable;​</code>​

​<code>​import​</code>​ ​<code>​rx.android.schedulers.androidschedulers;​</code>​

​<code>​import​</code>​ ​<code>​rx.schedulers.schedulers;​</code>​

​<code>​/**​</code>​

​<code>​* created by ${zyj} on 2016/7/1.​</code>​

​<code>​*/​</code>​

​<code>​public​</code>​ ​<code>​class​</code>​ ​<code>​rxutil {​</code>​

​<code>​private​</code>​ ​<code>​final​</code>​ ​<code>​static​</code>​ ​<code>​observable.transformer schedulerstransformer = ​</code>​​<code>​new​</code>​  ​<code>​observable.transformer() {​</code>​

​<code>​public​</code>​ ​<code>​static​</code>​  ​<code>​&lt;t&gt; observable.transformer&lt;t, t&gt; applyschedulers() {​</code>​

​<code>​return​</code>​ ​<code>​(observable.transformer&lt;t, t&gt;) schedulerstransformer;​</code>​

​<code>​.compose( rxutil.&lt;string&gt;applyschedulers() )​</code>​