天天看點

開源庫之RxJava & RxAndroid基本使用

Andrid Studio中引用RxAndroid

compile 'io.reactivex:rxandroid:1.2.0'      
Hello World

RxJava最核心的兩個東西是Observables(被觀察者,事件源)和Subscribers(觀察者)。Observables發出一系列事件,Subscribers處理這些事件(例如:觸摸事件,web接口調用傳回的資料...)。一個Observable可以發出零個或者多個事件,直到結束或者出錯。每發出一個事件,就會調用它的Subscriber的onNext方法,最後調用Subscriber.onError()或者Subscriber.onError()結束。

Rxjava的看起來很想設計模式中的觀察者模式,但是有一點明顯不同,那就是如果一個Observerble沒有任何的的Subscriber,那麼這個Observable是不會發出任何事件的。

建立一個Observable對象很簡單,直接調用Observable.create即可。

Observable<String> myObservable = Observable.create(//建立事件源
        new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> sub) {
                sub.onNext("Hello, world!");
                sub.onCompleted();//該方法必須主動調用
            }
        }
);      

這裡定義的Observable對象myObservable僅僅發出一個"Hello World"字元串,然後就結束了。

接着我們建立一個Subscriber來處理Observable對象發出的字元串。

Subscriber<String> mySubscriber = new Subscriber<String>() {//建立觀察者
    @Override
    public void onNext(String s) {
        lg.e("RxJava 測試結果:" + s);
    }

    @Override
    public void onCompleted() {
        lg.e("RxJava onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        lg.e("RxJava onError");
    }
};      

這裡subscriber僅僅就是列印observable發出的字元串。

Subscriber實作了Observer接口,而在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 。是以如果你隻想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的差別對于使用者來說主要有兩點:

    1.onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始而事件還未發送之前被調用,可以用于做一些準備工作,例如資料的清零或重置,預設空實作。需要注意的是,onStart() 總是在 subscribe 所發生的線程被調用,并不能保證在主線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的文中看到。 

    2.unsubscribe(): 這是 Subscriber 所實作的另一個接口 Subscription 的方法,用于取消訂閱。調用該方法将使得Subscriber 不再接收事件。一般在調用前使用 isUnsubscribed() 先判斷一下狀态。 unsubscribe()方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,将有記憶體洩露的風險。是以最好保持一個原則:要在不再使用的時候盡快在合适的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免記憶體洩露的發生。

通過subscribe函數就可以将我們定義的myObservable對象和mySubscriber對象關聯起來,這樣就完成了subscriber對observable的訂閱。

myObservable.subscribe(mySubscriber);      

一旦mySubscriber訂閱了myObservable,myObservable就是調用mySubscriber對象的onNext和onComplete|onError方法。

變換-> Map &flatMap操作符

RxJava 提供了對事件序列進行變換的支援

,這是它的核心功能之一,也是大多數人說『RxJava 真是太好用了』的最大原因。所謂變換,就是将事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。

使用場景:當需要修改接收的内容時,

a.可以在Observable中直接修改字元串,局限在于

    1.若Observable是第三方提供的,可能不允許更改

    2.此時修改屬于批量修改,凡是訂閱此Observable的資料均會改變

b.在訂閱者端處理:

    1.與希望Subscribers越輕量越好的原則沖突

    2.根據響應式函數程式設計的概念,Subscribers更應該做的事情是"響應",響應Observable發出的事件,而不是去修改

基于map針對對象進行變換

線程控制 -> Scheduler

 在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在調用 subscribe()的線程生産事件;在生産事件的線程消費事件。如果需要切換線程,就需要用到 Scheduler(排程器)。

1) Scheduler 的 API (一)

在RxJava 中,Scheduler ——排程器,相當于線程控制器,RxJava 通過它來指定每一段代碼應該運作在什麼樣的線程。RxJava 已經内置了幾個 Scheduler ,它們已經适合大多數的使用場景:

    a.Schedulers.immediate(): 直接在目前線程運作(與),相當于不指定線程。這是預設的 Scheduler。

    b.Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。

    c.Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread()更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程。

    d.Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

    另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主線程運作。

接下來就可以使用subscribeOn() 和 observeOn() 兩個方法對線程進行控制了。

subscribeOn(): 即事件産生的線程。

observeOn(): 指定Subscriber所在的線程(map、subscribe等方法的執行域),即事件消費的線程。

示例如下:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {//這裡所屬線程受subscribeOn控制
        lg.e("isFunctionInMainThread-call:" + UtilsThread.isFunctionInMainThread());
        subscriber.onNext("***");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.io()).map(new Func1<String, String>() {//該map操作因為沒調用observeOn指定線程,預設使用Observable建立所屬線程(即create方法所屬)
    @Override
    public String call(String s) {
        lg.e("isFunctionInMainThread01:" + UtilsThread.isFunctionInMainThread());
        return s;
    }
}).observeOn(Schedulers.io()).map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        lg.e("isFunctionInMainThread02:" + UtilsThread.isFunctionInMainThread());
        return s;
    }
}).observeOn(AndroidSchedulers.mainThread()).map(new Func1<String, String>() {//這裡的map操作在主線程
    @Override
    public String call(String s) {
        lg.e("isFunctionInMainThread03:" + UtilsThread.isFunctionInMainThread());
        return s;
    }
}).observeOn(Schedulers.io()).subscribe(new Subscriber<String>() {//這裡表示subscribe在io線程
    @Override
    public void onCompleted() {
        lg.e("isFunctionInMainThread-subscribe:" + UtilsThread.isFunctionInMainThread());
    }

    @Override
    public void onError(Throwable e) {
        lg.e("onError");
    }

    @Override
    public void onNext(String s) {
        lg.e("onNext with " + s, "isFunctionInMainThread-subscribe:" + UtilsThread.isFunctionInMainThread());
    }
});      

運作結果如下所示,

思考:

flatmap[鋪平]相關介紹

來自為知筆記(Wiz)