RxJava 到底是什麼
RxJava 是一個響應式程式設計架構,采用觀察者設計模式。RxJava 在 GitHub 首頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程式的庫)。其實, RxJava 的本質可以壓縮為異步這一個詞。說到根上,它就是一個實作異步操作的庫。它的優點就是在于讓異步的代碼變得非常簡潔,完全可以代替Android的AsyncTask和Handler來實作異步。
觀察者模式
我們之前說到RxJava的異步實作是采用觀察者模式實作的,是以先來了解一下觀察者模式。
定義對象間的一種一對多的依賴關系,當一個對象的狀态發送改變時,是以依賴于它的 對象都得到通知并被自動更新 。觀察者模式有個重要的作用就是解耦,将被觀察者和觀察者解耦,降低依賴,隻依賴于Observer和Observable的抽象。比如A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。觀察者模式并不需要時刻盯着被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀态),而是采用訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀态,你要在它變化的時候通知我。Android 開發中一個比較典型的例子是點選監聽器 OnClickListener 。對設定 OnClickListener 來說, View 是被觀察者, OnClickListener 是觀察者,二者通過 setOnClickListener() 方法達成訂閱關系。訂閱之後使用者點選按鈕的瞬間,Android Framework 就會将點選事件發送給已經注冊的 OnClickListener 。采取這樣被動的觀察方式,既省去了反複檢索狀态的資源消耗,也能夠得到最高的回報速度。
ListView 的觀察者模式
我們熟悉的觀察者模式應該就是ListView的adapter中的notifyDataSetChanged方法,說說大緻的實作過程,當然了解的可以直接跳過。當我們調用adapter的notifyDataSetChanged方法時,實際上就是調用BaseAdapter中的DataSetObservable的notifyChanged方法,DataSetObservable也就是一個被觀察者,在這個被觀察者的方法中會周遊是以的觀察者并且調用它們的onChanged方法,進而告知了觀察者發生了變化。是以現在有了被觀察者,那麼觀察者又是在哪裡呢?看源碼可以發現在調用setAdapter的方法中AdapterDataSetObserver也就是我們的觀察者,然後把這個觀察者注冊到了Adapter中,實際上也就是注冊到了DataSetObservable中。是以我們調用notifyDataSetChanged方法就會調用觀察者的onChanged方法,這個onChanged方法是在AdapterDataSetObserver中實作的,在這裡就是通知ListView重新整理重新布局界面,進而實作了觀察者模式!!
RxJava 的觀察者模式
RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實作訂閱關系,進而 Observable 可以在需要的時候發出事件來通知 Observer。
與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件 onNext() (相當于 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。
onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為标志。
onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。
在一個正确運作的事件序列中, onCompleted() 和 onError() 有且隻有一個,并且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。
Gradle配置
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
Observer
Observer 即觀察者,它決定事件觸發的時候将有怎樣的行為,就比如之前說到點選之後會産生怎樣的行為。
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: ");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};
Subscriber
這是一個實作了Observer接口和Subscription接口的抽象類,是以它也是一個觀察者,也就是說也可以使用方式和Observer一樣,當然也有它的不同之處。Subscriber中新增了onStart方法,這個方法是在事件被訂閱但是還沒有釋出之前調用,在這裡面可以做一些初始化的操作,當然如果初始化操作設計一些UI操作需要在主線程中那麼可能就不是那麼适用,因為它是在subscribe 所發生的線程中被調用,這個線程不一定就是主線程。
Subscriber中還增加了一個unsubscribe方法,這個方法是用來取消訂閱的,可以避免記憶體洩漏,在調用前可以使用isUnsubscribed方法來判斷一下。
實質上Observer在訂閱的過程中也會被轉化為一個Subscriber,是以它們的使用方法是基本相似的。
Observable
Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。下面看看Observable的建立
rx.Observable observable = rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("你好啊1");
subscriber.onNext("你好啊2");
subscriber.onNext("你好啊3");
subscriber.onCompleted();
}
});
這裡通過 create()方法來建立Observable對象,create方法傳入一個OnSubscribe對象。當Observable被訂閱的時候就會調用這個OnSubscribe對象裡面的call方法,在這個方法中參數得到了一個subscriber觀察者,這裡就可以調用觀察者的方法,于是将會觸發觀察者對應的行為。實作了觀察者模式,實作了被觀察者向觀察者的事件傳遞。
當然建立Observable的方式也不隻有這一種,create是最基本的一種,RxJava還提供了一些快捷的建立方式:
Observable.just()
Observable<String> observable = Observable.just("hi1", "hi2", "hi3");
這樣就如上面一樣會依次調用subscriber.onNext("hi1“),subscriber.onNext("hi2“),subscriber.onNext("hi3“),subscriber.onCompleted()。
Observable.from()
String[] strings = {"難過", "快樂", "微笑"};
Observable<String> observable = Observable.from(strings);
和上面的調用是一樣的,不過這裡是傳遞的一個數組
Subscribe (訂閱)
當我們寫好了Observable和Observer之後,還需要一個東西把它們關聯起來,也就是訂閱
observable.subscribe(observer);
好了,這樣寫就可以完整的工作運作了,subscribe()方法其實主要做了以下幾件事
1.調用subscriber的onStart方法,之前說過Observer都會被轉化為Subscriber。
2.調用OnSubscribe的call()方法,OnSubscribe是在建立Observable傳入的對象
3.如果發生異常調用Subscriptions的unsubscribed()取消訂閱
4.最後把傳入的subscriber以Subscriptions傳回,以便用于取消訂閱
其實在調用subscribe方法時你可以發現這裡接收的參數不隻是Observer和Subscriber,還有以下三個方法
subscribe(final Action1<? super T> onNext)
subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete)
是以我們可以采用Action1來定義onNext和onError,用Action0定義onComplete,當然也可以定義不完整的回調,不需要onError和onComplete。可以以以下方式調用
Action1<String> onNext = new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "onNext");
}
};
Action1<Throwable> onError = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.d(TAG, "onError");
}
};
Action0 onComplete = new Action0() {
@Override
public void call() {
Log.d(TAG, "onComplete");
}
};
Observable.just("hello", "hi").subscribe(onNext);
Observable.just("hello", "hi").subscribe(onNext, onError);
Observable.just("hello", "hi").subscribe(onNext, onError, onComplete);
其中Action0和Action1是RxJava的兩個接口,都隻是包含一個call方法,Action0是不帶參數和傳回值的,Action1是帶一個參數的。
Scheduler線程控制
之前代碼實作出來的都是同步觀察者,然而異步才是RxJava的關鍵,是以我們需要引入Scheduler,Scheduler用于來切換線程。在預設情況下,是線程不變的,在哪個線程調用subscribe(),那麼就在哪個線程生産事件,在哪個線程生産事件那麼就在哪個線程消費事件。首先看看下面幾個常用的API
Schedulers.immediate()直接在目前線程運作,相當于不指定線程。這是預設的 Scheduler。
Schedulers.io() I/O 操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程。
Schedulers.newThread() 總是啟用新線程,并在新線程執行操作。
Schedulers.computation() 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
AndroidSchedulers.mainThread()指定的操作将在 Android 主線程運作。
subscribeOn()指定subscribe()所發生的線程,也就是Observable.OnSubscribe 被激活時所處的線程
observeOn()指定 Subscriber 所運作在的線程。
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
上面這段代碼中,由于 subscribeOn(Schedulers.io()) 的指定,被建立的事件的内容 1、2、3、4 将會在 IO 線程發出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,是以 subscriber 數字的列印将發生在主線程 。事實上,這種在 subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它适用于多數的 『背景線程取資料,主線程顯示』的程式政策。
變換
是指對事件序列的變換,也是RxJava的核心功能之一,就是将事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。RxJava不僅支援事件對象的變換,也可以實作事件序列的變換。
map()
使用map()可以實作事件對象的變換,這是最常用的變換之一,它把Observable的對象1轉化為對象2發送給Subscriber,看看下面的用法:
Observable.just(1)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer) + "--轉化後";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "call: " + s);
}
});
這裡map接收的參數是一個Func1,和之前的Action1類似,也是一個接口,裡面隻有一個call方法,但是Func1是帶傳回值的。從上面的代碼可以看到,是把一個Integer類型轉化為了String類型傳回,然後觀察者接收到這個String類型的值。
flatMap()
flatMap()也是一個把一個對象轉化為另外一個對象,但是比之map()更加的難懂一些,下面以一個具體的例子來說明:
比如,有這樣一個需求,有一個對象是Student,一個Student對應着有多個課程,現在要列印出每一個Student的每一個課程。現在該怎麼做?如果還是用map()的話,顯然做不到,map隻是一對一的轉換,而現在需要的是一對多的轉換,是以這時就需要用到flatMap了,先看看具體的代碼
public class Student {
private List<String> course;
public List<String> getCourse() {
return course;
}
public void setCourse(List<String> course) {
this.course = course;
}
}
//初始化資料
Student[] students = new Student[3];
for (int i = 0; i < 3; i++) {
List<String> list = new ArrayList<>();
for (int j = 0; j < 2; j++) {
list.add("學生:" + i + "的課程:" + j);
}
Student student = new Student();
student.setCourse(list);
students[i] = student;
}
Observable.from(students)
.flatMap(new Func1<Student, Observable<String>>() {
@Override
public Observable<String> call(Student student) {
return Observable.from(student.getCourse());
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: " + s);
}
});
可以從上面看到在flatMap中的call方法傳回是一個Observable對象,這個Observable對象是由傳入的事件對象(Student)建立的。這裡傳回的Observable對象也不是直接發送給Subscriber,而是把它激活,開始發送事件,這裡發送的事件就是Subscriber所接受到的事件,是以在最後列印出來的就是每個學生的課程。是以總結來說,就是以下幾個步驟:
1.利用傳入的對象建立一個Observable
2.并不直接發送這個Observable,而是激活它,讓它開始發送事件
3.這個中間産生的Observable發送的事件就交給了Subscriber
這個過程中就相當于中間添加了一個輔助的Observable,就像把一個對象裡面包含的多個對象平鋪出來,實作一對一的去發送,這就是所謂的flat。