天天看點

RxJava 和 RxAndroid 五(線程排程)

對rxJava不了解的同學可以先看

RxJava 和 RxAndroid 一 (基礎)

RxJava 和 RxAndroid 二(操作符的使用)

RxJava 和 RxAndroid 三(生命周期控制和記憶體優化)

RxJava 和 RxAndroid 四(RxBinding的使用)

本文将有幾個例子說明,rxjava線程排程的正确使用姿勢。

例1

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;
      

  結果

/rx_call: main           -- 主線程

/rx_map: main        --  主線程

/rx_subscribe: main   -- 主線程

例2

new Thread(new Runnable() {
            @Override
            public void run() {
                Logger.v( "rx_newThread" , Thread.currentThread().getName()  );
                rx();
            }
        }).start();

 void rx(){
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

    }
      

 

      結果

/rx_newThread: Thread-564   -- 子線程

/rx_call: Thread-564              -- 子線程

/rx_map: Thread-564            -- 子線程 

/rx_subscribe: Thread-564    -- 子線程

  • 通過例1和例2,說明,Rxjava預設運作在目前線程中。如果目前線程是子線程,則rxjava運作在子線程;同樣,目前線程是主線程,則rxjava運作在主線程

例3

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;
      

/rx_call: RxCachedThreadScheduler-1    --io線程

/rx_map: main                                     --主線程

/rx_subscribe: main                              --主線程

例4

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;       

/rx_call: RxCachedThreadScheduler-1     --io線程

/rx_map: RxCachedThreadScheduler-1   --io線程

  • 通過例3、例4 可以看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 寫的位置不一樣,造成的結果也不一樣。從例4中可以看出 map() 操作符預設運作在事件産生的線程之中。事件消費隻是在 subscribe() 裡面。
  • 對于 create() , just() , from()   等                 --- 事件産生   

               map() , flapMap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消費

  •   事件産生:預設運作在目前線程,可以由 subscribeOn()  自定義線程

         事件加工:預設跟事件産生的線程保持一緻, 可以由 observeOn() 自定義線程

       事件消費:預設運作在目前線程,可以有observeOn() 自定義

例5  多次切換線程

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .observeOn( Schedulers.newThread() )    //新線程

                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .observeOn( Schedulers.io() )      //io線程

                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        Logger.v( "rx_filter" , Thread.currentThread().getName()  );
                        return s != null ;
                    }
                })

                .subscribeOn(Schedulers.io())     //定義事件産生線程:io線程
                .observeOn(AndroidSchedulers.mainThread())     //事件消費線程:主線程

                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;      

/rx_call: RxCachedThreadScheduler-1           -- io 線程

/rx_map: RxNewThreadScheduler-1             -- new出來的線程

/rx_filter: RxCachedThreadScheduler-2        -- io線程

/rx_subscribe: main                                   -- 主線程

例6:隻規定了事件産生的線程

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .subscribeOn(Schedulers.io())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
                }) ;
      

/rx--create: RxCachedThreadScheduler-4                      // io 線程

/rx--subscribe: RxCachedThreadScheduler-4                 // io 線程

例:7:隻規定事件消費線程

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .observeOn( Schedulers.newThread() )
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
                }) ;
      

/rx--create: main                                           -- 主線程

/rx--subscribe: RxNewThreadScheduler-1        --  new 出來的子線程 

    從例6可以看出,如果隻規定了事件産生的線程,那麼事件消費線程将跟随事件産生線程。

    從例7可以看出,如果隻規定了事件消費的線程,那麼事件産生的線程和 目前線程保持一緻。

例8:線程排程封裝

 在Android 常常有這樣的場景,背景處理處理資料,前台展示資料。

一般的用法:

Observable
                .just( "123" )
                .subscribeOn( Schedulers.io())
                .observeOn( AndroidSchedulers.mainThread() )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;
      

  但是項目中這種場景有很多,是以我們就想能不能把這種場景的排程方式封裝起來,友善調用。

簡單的封裝

public Observable apply( Observable observable ){
       return observable.subscribeOn( Schedulers.io() )
                .observeOn( AndroidSchedulers.mainThread() ) ;
    }
      

使用

apply( Observable.just( "123" ) )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {

                    }
                }) ;
      

弊端:雖然上面的這種封裝可以做到線程排程的目的,但是它破壞了鍊式程式設計的結構,是程式設計風格變得不優雅。

改進:Transformers 的使用(就是轉化器的意思,把一種類型的Observable轉換成另一種類型的Observable )

改進後的封裝

Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
      

  使用

Observable
                .just( "123" )
                .compose( schedulersTransformer )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;
      

  弊端:雖然保持了鍊式程式設計結構的完整,但是每次調用 .compose( schedulersTransformer ) 都是 new 了一個對象的。是以我們需要再次封裝,盡量保證單例的模式。

package lib.app.com.myapplication;

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;

/**
 * Created by ${zyj} on 2016/7/1.
 */
public class RxUtil {

    private final static Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };

   public static  <T> Observable.Transformer<T, T> applySchedulers() {
        return (Observable.Transformer<T, T>) schedulersTransformer;
    }

}
      
Observable
                .just( "123" )
                .compose( RxUtil.<String>applySchedulers() )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;