1. Create
create
操作符應該是最常見的操作符了,主要用于産生一個
Obserable
被觀察者對象

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
mRxOperatorsText.append("Observable emit 1" + "\n");
Log.e(TAG, "Observable emit 1" + "\n");
e.onNext(1);
mRxOperatorsText.append("Observable emit 2" + "\n");
Log.e(TAG, "Observable emit 2" + "\n");
e.onNext(2);
mRxOperatorsText.append("Observable emit 3" + "\n");
Log.e(TAG, "Observable emit 3" + "\n");
e.onNext(3);
e.onComplete();
mRxOperatorsText.append("Observable emit 4" + "\n");
Log.e(TAG, "Observable emit 4" + "\n" );
e.onNext(4);
}
})
2. Zip
zip
專用于合并事件,該合并不是連接配接(連接配接操作符後面會說),而是兩兩配對,也就意味着,最終配對出的
Observable
發射事件數目隻和少的那個相同。
Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
@Override
public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
return s + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("zip : accept : " + s + "\n");
Log.e(TAG, "zip : accept : " + s + "\n");
}
});
3. Concat
對于單一的把兩個發射器連接配接成一個發射器,雖然
zip
不能完成,但我們還是可以自力更生,官方提供的
concat
讓我們的問題得到了完美解決。
Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("concat : "+ integer + "\n");
Log.e(TAG, "concat : "+ integer + "\n" );
}
});
4. map() flatMap() concactMap()
map()
是一對一的轉化,将事件流中的一個對象轉換為另外一種類型的對象
flatMap()是一對多的轉換,且傳回的是一個Observable對象,并且這個
Observable
對象并不是被直接發送到了
Subscriber
的回調方法中。
flatMap()
的原理是這樣的:1. 使用傳入的事件對象建立一個
Observable
對象;2. 并不發送這個
Observable
, 而是将它激活,于是它開始發送事件;3. 每一個建立出來的
Observable
發送的事件,都被彙入同一個
Observable
,而這個
Observable
負責将這些事件統一交給
Subscriber
的回調方法。這三個步驟,把事件拆成了兩級,通過一組新建立的
Observable
将初始的對象『鋪平』之後通過統一路徑分發了下去。而這個『鋪平』就是
flatMap()
所謂的 flat
如列印一組學生的課程資訊:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "flatMap : accept : " + s + "\n");
mRxOperatorsText.append("flatMap : accept : " + s + "\n");
}
});
flatMap不保證事件的順序,示意圖:
⚠️注意:由于可以在嵌套的
Observable
中添加異步代碼,
flatMap()
也常用于嵌套的異步操作,例如嵌套的網絡請求。示例代碼(Retrofit + RxJava):
networkClient.token() // 傳回 Observable<String>,在訂閱時請求 token,并在響應後發送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 傳回 Observable<Messages>,在訂閱時請求消息清單,并在響應後發送請求到的消息清單
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 處理顯示消息清單
showMessages(messages);
}
});
傳統的嵌套請求需要使用嵌套的 Callback 來實作。而通過
flatMap()
,可以把嵌套的請求寫在一條鍊中,進而保持程式邏輯的清晰