文章大綱
一、什麼是RxJava
二、為什麼要用RxJava
三、RxJava使用詳解
四、項目源碼下載下傳
Rx(Reactive Extensions)是一個庫,用來處理事件和異步任務,在很多語言上都有實作,RxJava是Rx在Java上的實作。簡單來說,RxJava就是處理異步的一個庫,最基本是基于觀察者模式來實作的。通過Obserable和Observer的機制,實作所謂響應式的程式設計體驗。
比如說一個龐大的項目,一個事件傳遞的整個過程可能要經曆很多方法,方法套方法,每個方法的位置七零八落,一個個方法跳進去看,跳過去跳過來很容易把腦袋弄暈,不夠直覺。但是Rxjava可以把所有邏輯用鍊式加閉包的方式呈現,做了哪些操作,誰在前誰在後非常直覺,邏輯清晰,維護就會非常輕松。就算不是你寫的你也可以很快的了解,你可以把它看作一條河流,整個過程就是對裡面的水流做進行加工。懂了這個特性我們才知道在複雜的邏輯中運用Rxjava是多麼的重要。
假設有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView ,它的作用是顯示多張圖檔,并能使用 addImage(Bitmap) 方法來任意增加顯示的圖檔。現在需要程式将一個給出的目錄數組 File[] folders 中每個目錄下的 png 圖檔都加載出來并顯示在 imageCollectorView 中。需要注意的是,由于讀取圖檔的這一過程較為耗時,需要放在背景執行,而圖檔的顯示則必須在 UI 線程執行。常用的實作方式有多種,我這裡貼出其中一種:
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
而如果使用 RxJava ,實作方式是這樣的:
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
1. RxJava設計模式
RxJava 的異步實作,是通過一種擴充的觀察者模式來實作的。
觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。舉個例子,新聞裡喜聞樂見的警察抓小偷,警察需要在小偷伸手作案的時候實施抓捕。在這個例子裡,警察是觀察者,小偷是被觀察者,警察需要時刻盯着小偷的一舉一動,才能保證不會漏過任何瞬間。程式的觀察者模式和這種真正的『觀察』略有不同,觀察者不需要時刻盯着被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀态),而是采用注冊(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀态,你要在它變化的時候通知我。 Android 開發中一個比較典型的例子是點選監聽器
OnClickListener
。對設定
OnClickListener
來說,
View
是被觀察者,
OnClickListener
是觀察者,二者通過
setOnClickListener()
方法達成訂閱關系。訂閱之後使用者點選按鈕的瞬間,Android Framework 就會将點選事件發送給已經注冊的
OnClickListener
。采取這樣被動的觀察方式,既省去了反複檢索狀态的資源消耗,也能夠得到最高的回報速度。當然,這也得益于我們可以随意定制自己程式中的觀察者和被觀察者,而警察叔叔明顯無法要求小偷『你在作案的時候務必通知我』。
OnClickListener 的模式大緻如下圖:
RxJava 的觀察者模式
RxJava 有四個基本概念:
Observable
(可觀察者,即被觀察者)、
Observer
(觀察者)、
subscribe
(訂閱)、事件。
Observable
和
Observer
通過
subscribe()
方法實作訂閱關系,進而
Observable
可以在需要的時候發出事件來通知
Observer
。
與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件
onNext()
(相當于
onClick()
/
onEvent()
)之外,還定義了兩個特殊的事件:
onCompleted()
onError()
-
: 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的onCompleted()
發出時,需要觸發onNext()
方法作為标志。onCompleted()
-
: 事件隊列異常。在事件處理過程中出異常時,onError()
會被觸發,同時隊列自動終止,不允許再有事件發出。onError()
- 在一個正确運作的事件序列中,
onCompleted()
有且隻有一個,并且是事件序列中的最後一個。需要注意的是,onError()
onCompleted()
二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。onError()
RxJava 的觀察者模式大緻如下圖:
開始接入RxJava之間,添加依賴
dependencies {
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'
}
2. 建立RxJava幾種方式
方式1:簡單建立Rxjava
/**
* 簡單建立Rxjava
*
* Observable是被觀察者,建立後傳入一個OnSubscribe對象,當Observable(觀察者)調用subscribe進行注冊觀察者時,OnSubscribe的call方法會觸發。
ObservableEmitter: Emitter 是發射器的意思,它可以發出三種類型的事件,與之對應的。
Observer有三個回調方法:
onNext:接受到一個事件
onCompleted:接受完事件後調用,隻會調用一次
onError :發生錯誤時調用,并停止接受事件,調用一次
注:onCompleted和onError不會同時調用,隻會調用其中之一
*/
public static void createOne() {
//建立被觀察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("吳");
subscriber.onNext("曉暢");
subscriber.onCompleted();
}
});
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println("Item: " + s);
}
////事件隊列完結,RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為标志。
@Override
public void onCompleted() {
System.out.println("Completed!");
}
////事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。
@Override
public void onError(Throwable e) {
System.out.println("Error!");
}
};
observable.subscribe(subscriber);
}
運作結果如下所示:
方式2:just(T...): 将傳入的參數依次發送出來
public static void createTwo()
{
//相當于
// 将會依次調用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
Observable observable = Observable.just("Hello", "wu", "xiaochang");
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println("Item: " + s);
}
////事件隊列完結,RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為标志。
@Override
public void onCompleted() {
System.out.println("Completed!");
}
////事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。
@Override
public void onError(Throwable e) {
System.out.println("Error!");
}
};
observable.subscribe(subscriber);
}
方式3:将傳入的數組或 Iterable 拆分成具體對象後,依次發送出來
public static void createThree()
{
String[] words = {"Hello", "wu", "xiaochang"};
//相當于
// 将會依次調用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
Observable observable = Observable.from(words);
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println("Item: " + s);
}
////事件隊列完結,RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為标志。
@Override
public void onCompleted() {
System.out.println("Completed!");
}
////事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。
@Override
public void onError(Throwable e) {
System.out.println("Error!");
}
};
observable.subscribe(subscriber);
}
運作結果如下圖所示:
方式4:發送多種類型參數
/**
*發送多種類型參數
*/
public static void createFour()
{
//Just類似于From,但是From會将數組或Iterable的元素具取出然後逐個發射,而Just隻是簡單的原樣發射,将數組或Iterable當做單個資料。
//Just接受一至九個參數,傳回一個按參數清單順序發射這些資料的Observable
Observable justObservable = Observable.just(1, "someThing", false, 3.256f, "NewYork");
justObservable.subscribe(new Subscriber() {
@Override
public void onCompleted() {
System.out.println("onCompleted!");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
});
}
方式5:自定義Subscriber
/**
* 自定義Subscriber
*/
public static void createFive()
{
Observable observable = Observable.just("Hello", "Hi", "Aloha");
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
System.out.println(s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
System.out.println("completed");
}
};
// 自動建立 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
}
3. 建立觀察者方法
建立方式如下:
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
//建立方式2
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d("MainActivity", "Item: " + s);
}
@Override
public void onCompleted() {
Log.d("MainActivity", "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d("MainActivity", "Error!");
}
};
實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。是以如果你隻想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的差別對于使用者來說主要有兩點:
onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用,可以用于做一些準備工作,例如資料的清零或重置。這是一個可選方法,預設情況下它的實作為空。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不适用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的文中看到。
unsubscribe(): 這是 Subscriber 所實作的另一個接口 Subscription 的方法,用于取消訂閱。在這個方法被調用後,Subscriber 将不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀态。 unsubscribe() 這個方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,将有記憶體洩露的風險。是以最好保持一個原則:要在不再使用的時候盡快在合适的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免記憶體洩露的發生。
3. 線程Scheduler (排程器)
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生産事件;在哪個線程生産事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (排程器)。
在RxJava 中,Scheduler ——排程器,相當于線程控制器,RxJava 通過它來指定每一段代碼應該運作在什麼樣的線程。RxJava 已經内置了幾個 Scheduler ,它們已經适合大多數的使用場景:
Schedulers.immediate(): 直接在目前線程運作,相當于不指定線程。這是預設的 Scheduler。
Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。
Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程。
Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主線程運作。
有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件産生的線程。 * observeOn(): 指定 Subscriber 所運作在的線程。或者叫做事件消費的線程。
public class RxJavaScheduler {
public static void showScheduler()
{
Observable.just(1, 2, 3, 4)
// .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
// .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
System.out.println("number:" + number);
}
});
}
public static void main(String[] args) {
showScheduler();
}
}
連結:
https://pan.baidu.com/s/1Na7DH_N2rf-pXEadmQxgUQ密碼:xvr2
Android前沿技術
關于RXJava的全部學習内容,我們這邊都有系統的知識體系以及進階視訊資料,有需要的朋友可以加群免費領取安卓進階視訊教程,源碼,面試資料,群内有大牛一起交流讨論技術;818520403
(包括自定義控件、NDK、架構設計、混合式開發工程師(React native,Weex)、性能優化、完整商業項目開發等)
Android進階進階視訊教程