天天看點

RxJava 和 RxAndroid 五(線程排程)

本文将有幾個例子說明,rxjava線程排程的正确使用姿勢。

例1

<col>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

​<code>​observable​</code>​

​<code>​.create(​</code>​​<code>​new​</code>​ ​<code>​observable.onsubscribe&lt;string&gt;() {​</code>​

​<code>​@override​</code>​

​<code>​public​</code>​ ​<code>​void​</code>​ ​<code>​call(subscriber&lt;? ​</code>​​<code>​super​</code>​ ​<code>​string&gt; subscriber) {​</code>​

​<code>​logger.v( ​</code>​​<code>​"rx_call"​</code>​ ​<code>​, thread.currentthread().getname()  );​</code>​

​<code>​subscriber.onnext( ​</code>​​<code>​"dd"​</code>​​<code>​);​</code>​

​<code>​subscriber.oncompleted();​</code>​

​<code>​}​</code>​

​<code>​})​</code>​

​<code>​.map(​</code>​​<code>​new​</code>​ ​<code>​func1&lt;string, string &gt;() {​</code>​

​<code>​public​</code>​ ​<code>​string call(string s) {​</code>​

​<code>​logger.v( ​</code>​​<code>​"rx_map"​</code>​ ​<code>​, thread.currentthread().getname()  );​</code>​

​<code>​return​</code>​ ​<code>​s + ​</code>​​<code>​"88"​</code>​​<code>​;​</code>​

​<code>​.subscribe(​</code>​​<code>​new​</code>​ ​<code>​action1&lt;string&gt;() {​</code>​

​<code>​public​</code>​ ​<code>​void​</code>​ ​<code>​call(string s) {​</code>​

​<code>​logger.v( ​</code>​​<code>​"rx_subscribe"​</code>​ ​<code>​, thread.currentthread().getname()  );​</code>​

​<code>​}) ;​</code>​

  結果

/rx_call: main           -- 主線程 /rx_map: main        --  主線程 /rx_subscribe: main   -- 主線程

例2

24

25

26

27

28

29

30

31

32

33

34

​<code>​new​</code>​ ​<code>​thread(​</code>​​<code>​new​</code>​ ​<code>​runnable() {​</code>​

​<code>​public​</code>​ ​<code>​void​</code>​ ​<code>​run() {​</code>​

​<code>​logger.v( ​</code>​​<code>​"rx_newthread"​</code>​ ​<code>​, thread.currentthread().getname()  );​</code>​

​<code>​rx();​</code>​

​<code>​}).start();​</code>​

​<code>​void​</code>​ ​<code>​rx(){​</code>​

 

      結果

/rx_newthread: thread-564   -- 子線程 /rx_call: thread-564              -- 子線程 /rx_map: thread-564            -- 子線程  /rx_subscribe: thread-564    -- 子線程

通過例1和例2,說明,rxjava預設運作在目前線程中。如果目前線程是子線程,則rxjava運作在子線程;同樣,目前線程是主線程,則rxjava運作在主線程

例3

​<code>​.subscribeon(schedulers.io())​</code>​

​<code>​.observeon(androidschedulers.mainthread())​</code>​

/rx_call: rxcachedthreadscheduler-1    --io線程 /rx_map: main                                     --主線程 /rx_subscribe: main                              --主線程

例4

​<code>​}) ; ​</code>​

/rx_call: rxcachedthreadscheduler-1     --io線程 /rx_map: rxcachedthreadscheduler-1   --io線程

通過例3、例4 可以看出  .subscribeon(schedulers.io())

 和 .observeon(androidschedulers.mainthread()) 寫的位置不一樣,造成的結果也不一樣。從例4中可以看出

map() 操作符預設運作在事件産生的線程之中。事件消費隻是在 subscribe() 裡面。

對于 create() , just() , from()   等                 --- 事件産生   

               map() , flapmap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消費

  事件産生:預設運作在目前線程,可以由 subscribeon()  自定義線程

         事件加工:預設跟事件産生的線程保持一緻, 可以由 observeon() 自定義線程

       事件消費:預設運作在目前線程,可以有observeon() 自定義

例5  多次切換線程

35

36

37

38

39

40

​<code>​.observeon( schedulers.newthread() )    ​</code>​​<code>​//新線程​</code>​

​<code>​.observeon( schedulers.io() )      ​</code>​​<code>​//io線程​</code>​

​<code>​.filter(​</code>​​<code>​new​</code>​ ​<code>​func1&lt;string, boolean&gt;() {​</code>​

​<code>​public​</code>​ ​<code>​boolean call(string s) {​</code>​

​<code>​logger.v( ​</code>​​<code>​"rx_filter"​</code>​ ​<code>​, thread.currentthread().getname()  );​</code>​

​<code>​return​</code>​ ​<code>​s != ​</code>​​<code>​null​</code>​ ​<code>​;​</code>​

​<code>​.subscribeon(schedulers.io())     ​</code>​​<code>​//定義事件産生線程:io線程​</code>​

​<code>​.observeon(androidschedulers.mainthread())     ​</code>​​<code>​//事件消費線程:主線程​</code>​

/rx_call: rxcachedthreadscheduler-1           -- io 線程 /rx_map: rxnewthreadscheduler-1             -- new出來的線程 /rx_filter: rxcachedthreadscheduler-2        -- io線程 /rx_subscribe: main                                   -- 主線程

例6:隻規定了事件産生的線程

​<code>​log.v( ​</code>​​<code>​"rx--create "​</code>​ ​<code>​, thread.currentthread().getname() ) ;​</code>​

​<code>​subscriber.onnext( ​</code>​​<code>​"dd"​</code>​ ​<code>​) ;​</code>​

​<code>​log.v( ​</code>​​<code>​"rx--subscribe "​</code>​ ​<code>​, thread.currentthread().getname() ) ;​</code>​

/rx--create: rxcachedthreadscheduler-4                      // io 線程 /rx--subscribe: rxcachedthreadscheduler-4                 // io 線程

例:7:隻規定事件消費線程

​<code>​.observeon( schedulers.newthread() )​</code>​

/rx--create: main                                           -- 主線程 /rx--subscribe: rxnewthreadscheduler-1        --  new 出來的子線程 

    從例6可以看出,如果隻規定了事件産生的線程,那麼事件消費線程将跟随事件産生線程。

    從例7可以看出,如果隻規定了事件消費的線程,那麼事件産生的線程和 目前線程保持一緻。

例8:線程排程封裝

 在android 常常有這樣的場景,背景處理處理資料,前台展示資料。

一般的用法:

​<code>​.just( ​</code>​​<code>​"123"​</code>​ ​<code>​)​</code>​

​<code>​.subscribeon( schedulers.io())​</code>​

​<code>​.observeon( androidschedulers.mainthread() )​</code>​

​<code>​.subscribe(​</code>​​<code>​new​</code>​ ​<code>​action1() {​</code>​

​<code>​public​</code>​ ​<code>​void​</code>​ ​<code>​call(object o) {​</code>​

  但是項目中這種場景有很多,是以我們就想能不能把這種場景的排程方式封裝起來,友善調用。

簡單的封裝

​<code>​public​</code>​ ​<code>​observable apply( observable observable ){​</code>​

​<code>​return​</code>​ ​<code>​observable.subscribeon( schedulers.io() )​</code>​

​<code>​.observeon( androidschedulers.mainthread() ) ;​</code>​

使用

​<code>​apply( observable.just( ​</code>​​<code>​"123"​</code>​ ​<code>​) )​</code>​

弊端:雖然上面的這種封裝可以做到線程排程的目的,但是它破壞了鍊式程式設計的結構,是程式設計風格變得不優雅。

改進:transformers 的使用(就是轉化器的意思,把一種類型的observable轉換成另一種類型的observable )

改進後的封裝

​<code>​observable.transformer schedulerstransformer = ​</code>​​<code>​new​</code>​  ​<code>​observable.transformer() {​</code>​

​<code>​@override​</code>​ ​<code>​public​</code>​ ​<code>​object call(object observable) {​</code>​

​<code>​return​</code>​ ​<code>​((observable)  observable).subscribeon(schedulers.newthread())​</code>​

​<code>​.observeon(androidschedulers.mainthread());​</code>​

​<code>​};​</code>​

  使用

​<code>​.compose( schedulerstransformer )​</code>​

  弊端:雖然保持了鍊式程式設計結構的完整,但是每次調用 .compose( schedulerstransformer ) 都是 new 了一個對象的。是以我們需要再次封裝,盡量保證單例的模式。

​<code>​package​</code>​ ​<code>​lib.app.com.myapplication;​</code>​

​<code>​import​</code>​ ​<code>​rx.observable;​</code>​

​<code>​import​</code>​ ​<code>​rx.android.schedulers.androidschedulers;​</code>​

​<code>​import​</code>​ ​<code>​rx.schedulers.schedulers;​</code>​

​<code>​/**​</code>​

​<code>​* created by ${zyj} on 2016/7/1.​</code>​

​<code>​*/​</code>​

​<code>​public​</code>​ ​<code>​class​</code>​ ​<code>​rxutil {​</code>​

​<code>​private​</code>​ ​<code>​final​</code>​ ​<code>​static​</code>​ ​<code>​observable.transformer schedulerstransformer = ​</code>​​<code>​new​</code>​  ​<code>​observable.transformer() {​</code>​

​<code>​public​</code>​ ​<code>​static​</code>​  ​<code>​&lt;t&gt; observable.transformer&lt;t, t&gt; applyschedulers() {​</code>​

​<code>​return​</code>​ ​<code>​(observable.transformer&lt;t, t&gt;) schedulerstransformer;​</code>​

​<code>​.compose( rxutil.&lt;string&gt;applyschedulers() )​</code>​