Rx并不是一種新的語言,而是一種普通的Java模式,類似于觀察者模式(Observer Pattern),可以将它看作一個普通的Java類庫,是以你可以立即使用RxJava。而RxAndroid是RxJava的一個針對Android平台的擴充,主要用于 Android 開發
-
API 介紹和原理簡析
RxJava 的異步實作,是通過一種擴充的觀察者模式來實作的。
RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。
Observable 和 Observer 通過 subscribe() 方法實作訂閱關系,進而 Observable 可以在需要的時候發出事件來通知 Observer。
與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件 onNext() (相當于 onClick() / onEvent())之外,
還定義了兩個特殊的事件:onCompleted() 和 onError()。
onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為标志。
onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。
在一個正确運作的事件序列中, onCompleted() 和 onError() 有且隻有一個,并且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。
- 基本實作
) 建立觀察者Observer
Observer 即觀察者,它決定事件觸發的時候将有怎樣的行為。 RxJava 中的 Observer 接口的實作方式:
Observer<String > observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.e(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.e(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.e(tag, "Error!");
}
};
除了 Observer 接口之外,RxJava 還内置了一個實作了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴充,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.e(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.e(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.e(tag, "Error!");
}
};
Observer 和 Subscriber它們的差別對于使用者來說主要有兩點:
onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用,可以用于做一些準備工作,例如資料的清零或重置。這是一個可選方法,預設情況下它的實作為空。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不适用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的文中看到。
unsubscribe(): 這是 Subscriber 所實作的另一個接口 Subscription 的方法,用于取消訂閱。在這個方法被調用後,Subscriber 将不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀态。 unsubscribe() 這個方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,将有記憶體洩露的風險。是以最好保持一個原則:要在不再使用的時候盡快在合适的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免記憶體洩露的發生。
) 建立 Observable
Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,并為它定義事件觸發規則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
Observable observable = Observable.just("Hello","Hi","Aloha"); 将傳入的參數依次發送出來。
List<String> list = new ArrayList();
list.add( "1" ) ;
list.add( "2" ) ;
list.add( "3" ) ;
Observable observable = Observable.from(list);
from(T[]) / from(Iterable<? extends T>) : 将傳入的數組或 Iterable 拆分成具體對象後,依次發送出來。
3) Subscribe (訂閱)
建立了 Observable 和 Observer 之後,再用 subscribe() 方法将它們聯結起來,整條鍊子就可以工作了。代碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
3 線程控制 —— Scheduler (一)
在RxJava 中,Scheduler ——排程器,相當于線程控制器,RxJava 通過它來指定每一段代碼應該運作在什麼樣的線程。RxJava 已經内置了幾個 Scheduler ,它們已經适合大多數的使用場景:
Schedulers.immediate(): 直接在目前線程運作,相當于不指定線程。這是預設的 Scheduler。
Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。
Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程。
Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
AndroidSchedulers.mainThread(),它指定的操作将在 Android 主線程運作。
有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件産生的線程。 * observeOn(): 指定 Subscriber 所運作在的線程。或者叫做事件消費的線程。
Observable.just(, , , )
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.e(tag, "number:" + number);
}
});
4 變換
所謂變換,就是将事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列
map(): 事件對象的直接變換,它是 RxJava 最常用的變換。
map()使用
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.observeOn(Schedulers.newThread()) //新線程
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Log.e( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e( "rx_subscribe" , Thread.currentThread().getName());
}
}) ;
flatMap(): 這是一個很有用但非常難了解的變換,flatMap() 中傳回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發送到了 Subscriber 的回調方法中
flatMap() 的原理是這樣的: 使用傳入的事件對象建立一個 Observable 對象; 并不發送這個 Observable, 而是将它激活,于是它開始發送事件; 每一個建立出來的 Observable 發送的事件,都被彙入同一個 Observable ,而這個 Observable 負責将這些事件統一交給 Subscriber 的回調方法
flatMap()使用
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
) 變換的原理:lift()
在 Observable 執行了 lift(Operator) 方法之後,會傳回一個新的 Observable,這個新的 Observable 會像一個代理一樣,負責接收原始的 Observable 發出的事件,并在處理後發送給 Subscriber。
) compose: 對 Observable 整體的變換
它和 lift() 的差別在于, lift() 是針對事件項和事件序列的,而 compose() 是針對 Observable 自身進行變換
Observable.just("123")
.compose(RxUtil.<String>applySchedulers())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("call---subscribe ",Thread.currentThread().getName() +";内容="+s);
}
});
5 線程控制:Scheduler (二)
操作符(Operators):操作符就是為了解決對Observable對象的變換的問題
Observable和Subscriber可以做任何事情
Observable可以是一個資料庫查詢,Subscriber用來顯示查詢結果;Observable可以是螢幕上的點選事件,Subscriber用來響應點選事件;Observable可以是一個網絡請求,Subscriber用來顯示請求結果。
Observable和Subscriber是獨立于中間的變換過程的。
在Observable和Subscriber中間可以增減任何數量的map。整個系統是高度可組合的,操作資料是一個很簡單的過程。
filter()輸出和輸入相同的元素,并且會過濾掉那些不滿足檢查條件的。
take()輸出最多指定數量的結果
doOnNext()允許我們在每次輸出一個元素之前做一些額外的事情,比如這裡的儲存标題
subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件産生的線程。
observeOn(): 指定 Subscriber 所運作在的線程。或者叫做事件消費的線程。
使用Observable.fromCallable()方法有兩點好處:
擷取要發送的資料的代碼隻會在有Observer訂閱之後執行。
擷取資料的代碼可以在子線程中執行。
zip 操作符,合并多個觀察對象的資料。并且允許 Func2()函數重新發送合并後的資料
scan累加器操作符的使用
filter 過濾操作符的使用
take :取前n個資料
takeLast:取後n個資料
first 隻發送第一個資料
last 隻發送最後一個資料
skip() 跳過前n個資料發送後面的資料
skipLast() 跳過最後n個資料,發送前面的資料
elementAt 、elementAtOrDefault
elementAt() 發送資料序列中第n個資料 ,序列号從開始
如果該序号大于資料序列中的最大序列号,則會抛出異常,程式崩潰
是以在用elementAt操作符的時候,要注意判斷發送的資料序列号是否越界
elementAtOrDefault( int n , Object default ) 發送資料序列中第n個資料 ,序列号從開始。
如果序列中沒有該序列号,則發送預設值
startWith() 插入資料
delay操作符,延遲資料發送
Timer 延時操作符的使用
interval 輪詢操作符,循環發送資料,資料從開始遞增
doOnNext() 操作符,在每次 OnNext() 方法被調用前執行
使用場景:從網絡請求資料,在資料被展示前,緩存到本地
、Buffer 操作符
Buffer( int n ) 把n個資料打成一個list包,然後再次發送。
Buffer( int n , int skip) 把n個資料打成一個list包,然後跳過第skip個資料。
throttleFirst 操作符
使用場景:、button按鈕防抖操作,防連續點選 、百度關鍵詞聯想,在一段時間内隻聯想一次,防止頻繁請求伺服器
distinct 過濾重複的資料
distinctUntilChanged() 過濾連續重複的資料
debounce() 操作符
一段時間内沒有變化,就會發送一個資料。
doOnSubscribe()
使用場景: 可以在事件發出之前做一些初始化的工作,比如彈出進度條等等
range 操作符的使用
defer 操作符
just操作符是在建立Observable就進行了指派操作,而defer是在訂閱者訂閱時才建立Observable,此時才進行真正的指派操作。
.subscribeOn(Schedulers.io()) 和 .observeOn(AndroidSchedulers.mainThread()) 寫的位置不一樣,造成的結果也不一樣。從例中可以看出 map() 操作符預設運作在事件産生的線程之中。事件消費隻是在 subscribe() 裡面。
對于 create() , just() , from() 等 --- 事件産生
map() , flapMap() , scan() , filter() 等 -- 事件加工
subscribe() -- 事件消費
事件産生:預設運作在目前線程,可以由 subscribeOn() 自定義線程
事件加工:預設跟事件産生的線程保持一緻, 可以由 observeOn() 自定義線程
事件消費:預設運作在目前線程,可以有observeOn() 自定義
如果隻規定了事件産生的線程,那麼事件消費線程将跟随事件産生線程。
如果隻規定了事件消費的線程,那麼事件産生的線程和 目前線程保持一緻
6 Retrofit的使用就是以下幾步:
定義接口,參數聲明,Url都通過Annotation指定
通過RestAdapter生成一個接口的實作類(動态代理)
調用接口請求資料
建立Retrofit 執行個體:
public static final String BASE_URL = "http://api.myservice.com";
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(BASE_URL)
.addConverterFactory(GsonConverterFactory.create())
.build();
定義 Endpoints:interface 用下面的方式定義了每個endpoint
public interface MyApiEndpointInterface {
// Request method and URL specified in the annotation
// Callback for the parsed response is the last parameter
@GET("/users/{username}")
Call<User> getUser(@Path("username") String username);
@GET("/group/{id}/users")
Call<List<User>> groupList(@Path("id") int groupId, @Query("sort") String sort);
@POST("/users/new")
Call<User> createUser(@Body User user);
}
Multipart forms:
如果你要送出多參數表單資料(multi-part form data),可以使用@Multipart與@Part注解:
String username = "sarahjean";
Call<User> call = apiService.getUser(username);
call.enqueue(new Callback<User>() {
@Override
public void onResponse(Response<User> response) {
int statusCode = response.code();
User user = response.body();
}
@Override
public void onFailure(Throwable t) {
// Log error here since request failed
}
});