天天看點

Rxjava的學習筆記Rxjava的學習筆記

Rxjava的學習筆記

RxJava到底是什麼?

異步(觀察者模式的本質是同步的但Rxjava可以通過線程間的切換和通信可以實作異步)

RxJava 在 GitHub 首頁上的自我介紹是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程式的庫)。這就是 RxJava ,概括得非常精準。

Rxjava好在哪裡?

簡潔,異步操作很關鍵的一點是程式的簡潔性,因為在排程過程比較複雜的情況下,異步代碼經常會既難寫也難被讀懂。 Android 創造的 AsyncTask 和Handler ,其實都是為了讓異步代碼更加簡潔。

舉個例子

假設有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView ,它的作用是顯示多張圖檔,并能使用 addImage(Bitmap) 方法來任意增加顯示的圖檔。現在需要程式将一個給出的目錄數組 File[] folders 中每個目錄下的 png 圖檔都加載出來并顯示在 imageCollectorView 中。需要注意的是,由于讀取圖檔的這一過程較為耗時,需要放在背景執行,而圖檔的顯示則必須在 UI 線程執行。常用的實作方式有多種,我這裡貼出其中一種:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();
           

而如果使用 RxJava ,實作方式是這樣的:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });
           

API 介紹和原理簡析

  1. 概念:擴充的觀察者模式

    RxJava 的異步實作,是通過一種擴充的觀察者模式來實作的。

觀察者模式

觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。舉個例子,新聞裡喜聞樂見的警察抓小偷,警察需要在小偷伸手作案的時候實施抓捕。在這個例子裡,警察是觀察者,小偷是被觀察者,警察需要時刻盯着小偷的一舉一動,才能保證不會漏過任何瞬間。程式的觀察者模式和這種真正的『觀察』略有不同,觀察者不需要時刻盯着被觀察者(例如 A 不需要每過 2MS 就檢查一次 B 的狀态),而是采用注冊(REGISTER)或者稱為訂閱(SUBSCRIBE)的方式,告訴被觀察者:我需要你的某某狀态,你要在它變化的時候通知我。 ANDROID 開發中一個比較典型的例子是點選監聽器 ONCLICKLISTENER 。對設定 ONCLICKLISTENER 來說, VIEW 是被觀察者, ONCLICKLISTENER 是觀察者,二者通過 SETONCLICKLISTENER() 方法達成訂閱關系。訂閱之後使用者點選按鈕的瞬間,ANDROID FRAMEWORK 就會将點選事件發送給已經注冊的 ONCLICKLISTENER 。采取這樣被動的觀察方式,既省去了反複檢索狀态的資源消耗,也能夠得到最高的回報速度。當然,這也得益于我們可以随意定制自己程式中的觀察者和被觀察者,而警察叔叔明顯無法要求小偷『你在作案的時候務必通知我』。

RxJava 的觀察者模式

RxJava 有四個基本概念:

Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實作訂閱關系,進而 Observable 可以在需要的時候發出事件來通知 Observer。

與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件 onNext() (相當于 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。

  • onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為标志。
  • onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。

    在一個正确運作的事件序列中, onCompleted() 和 onError() 有且隻有一個,并且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。

RxJava 的觀察者模式大緻如下圖:

Rxjava的學習筆記Rxjava的學習筆記
  1. 基本實作

基于以上的概念, RxJava 的基本實作主要有三點:

1.observer 是觀察者,是那個幹活的人 實作observer 的抽象類 Subscriber

1) 建立 Observer

Observer 即觀察者,它決定事件觸發的時候将有怎樣的行為。 RxJava 中的 Observer 接口的實作方式:

Observer observer = new Observer() {

@Override

public void onNext(String s) {

Log.d(tag, “Item: ” + s);

}

@Override
public void onCompleted() {
    Log.d(tag, "Completed!");
}

@Override
public void onError(Throwable e) {
    Log.d(tag, "Error!");
}
           

};

Subscriber 對 Observer 接口進行了一些擴充,但他們的基本使用方式是完全一樣的:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
           

不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。是以如果你隻想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的差別對于使用者來說主要有兩點:

  • onStart():

    這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用,可以用于做一些準備工作,例如資料的清零或重置。這是一個可選方法,預設情況下它的實作為空。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不适用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的文中看到。

  • unsubscribe():

    這是 Subscriber 所實作的另一個接口 Subscription 的方法,用于取消訂閱。在這個方法被調用後,Subscriber 将不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀态。 unsubscribe() 這個方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,将有記憶體洩露的風險。是以最好保持一個原則:要在不再使用的時候盡快在合适的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免記憶體洩露的發生。

2.Observable 被觀察者

建立 Observable

Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,并為它定義事件觸發規則:

Observable observable = Observable.create(new   Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});
           

3.Subscribe (訂閱)

建立了 Observable 和 Observer 之後,再用 subscribe() 方法将它們聯結起來,整條鍊子就可以工作了。代碼形式很簡單:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
           

有人可能會注意到, subscribe() 這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣颠倒了對象關系。這讓人讀起來有點别扭,不過如果把 API 設計成 observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思維邏輯,但對流式 API 的設計就造成影響了,比較起來明顯是得不償失的。

Observable.subscribe(Subscriber) 的内部實作是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼,而是将源碼中與性能、相容性、擴充性有關的代碼剔除後的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載下傳。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
           

可以看到,subscriber() 做了3件事:

  • 調用 Subscriber.onStart() 。這個方法在前面已經介紹過,是一個可選的準備方法。
  • 調用 Observable 中的 OnSubscribe.call(Subscriber) 。在這裡,事件發送的邏輯開始運作。從這也可以看出,在 RxJava 中, Observable 并不是在建立的時候就立即開始發送事件,而是在它被訂閱的時候,即當 subscribe() 方法執行的時候。
  • 将傳入的 Subscriber 作為 Subscription 傳回。這是為了友善 unsubscribe().

整個過程中對象間的關系如下圖:

Rxjava的學習筆記Rxjava的學習筆記

線程控制 —— Scheduler (一)

在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生産事件;在哪個線程生産事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (排程器)。

1) Scheduler 的 API (一)

在RxJava 中,Scheduler ——排程器,相當于線程控制器,RxJava 通過它來指定每一段代碼應該運作在什麼樣的線程。RxJava 已經内置了幾個 Scheduler ,它們已經适合大多數的使用場景:

  • Schedulers.immediate(): 直接在目前線程運作,相當于不指定線程。這是預設的Scheduler。
  • Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。
  • Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程。
  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
  • 另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主線程運作。

有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件産生的線程。 * observeOn(): 指定 Subscriber 所運作在的線程。或者叫做事件消費的線程。

文字叙述總歸難了解,上代碼:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });
           

上面這段代碼中,由于 subscribeOn(Schedulers.io()) 的指定,被建立的事件的内容 1、2、3、4 将會在 IO 線程發出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,是以 subscriber 數字的列印将發生在主線程 。事實上,這種在 subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它适用于多數的 『背景線程取資料,主線程顯示』的程式政策。

變換

RxJava 提供了對事件序列進行變換的支援,這是它的核心功能之一,也是大多數人說『RxJava 真是太好用了』的最大原因。所謂變換,就是将事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。概念說着總是模糊難懂的,來看 API。

1) API

首先看一個 map() 的例子:

Observable.just("images/logo.png") // 輸入類型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 參數類型 String
            return getBitmapFromPath(filePath); // 傳回類型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 參數類型 Bitmap
            showBitmap(bitmap);
        }
    });
           

這裡出現了一個叫做 Func1 的類。它和 Action1 非常相似,也是 RxJava 的一個接口,用于包裝含有一個參數的方法。 Func1 和 Action 的差別在于, Func1 包裝的是有傳回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用于不同參數個數的方法。FuncX 和 ActionX 的差別在 FuncX 包裝的是有傳回值的方法。

可以看到,map() 方法将參數中的 String 對象轉換成一個 Bitmap 對象後傳回,而在經過 map() 方法後,事件的參數類型也由 String 轉為了 Bitmap。這種直接變換對象并傳回的,是最常見的也最容易了解的變換。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件對象,還可以針對整個事件隊列,這使得 RxJava 變得非常靈活。我列舉幾個常用的變換:

  • map(): 事件對象的直接變換,具體功能上面已經介紹過。它是 RxJava 最常用的變換。 map() 的示意圖:
    Rxjava的學習筆記Rxjava的學習筆記
  • flatMap(): 這是一個很有用但非常難了解的變換,是以我決定花多些篇幅來介紹它。 首先假設這麼一種需求:假設有一個資料結構『學生』,現在需要列印出一組學生的名字。實作方式很簡單:
    Student[] students = ...;
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String name) {
            Log.d(tag, name);
        }
        ...
    };
    Observable.from(students)
        .map(new Func1<Student, String>() {
            @Override
            public String call(Student student) {
                return student.getName();
            }
        })
    .subscribe(subscriber);
               
    參考文獻:http://gank.io/post/560e15be2dca930e00da1083