今天我們來說一下RxJava2的相關操作符。
操作符
Create
create 操作符應該是最常見的操作符了,主要用于産生一個 Obserable 被觀察者對象。
以後統一把被觀察者 Observable 稱為發射器(上遊事件),觀察者 Observer 稱為接收器(下遊事件)。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
mRxOperatorsText.append("Observable emit 1" + "\n");
Log.e(TAG, "Observable emit 1" + "\n");
e.onNext();
Log.e(TAG, "Observable emit 2" + "\n");
e.onNext();
Log.e(TAG, "Observable emit 3" + "\n");
e.onNext();
e.onComplete();
Log.e(TAG, "Observable emit 4" + "\n" );
e.onNext();
}
}).subscribe(new Observer<Integer>() {
private int i;
private Disposable mDisposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n" );
mDisposable = d;
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG, "onNext : value : " + integer + "\n" );
i++;
if (i == ) {
// 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上遊事件
mDisposable.dispose();
Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete" + "\n" );
}
});
Map
它的作用就是對上遊發送的每一個事件應用一個函數, 使得每一個事件都按照指定的函數去變化。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext();
emitter.onNext();
emitter.onNext();
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
//結果
This is result
This is result
This is result
map 基本作用就是将一個 Observable 通過某種函數關系,轉換為另一種 Observable,上面例子中就是把我們的 Integer 資料變成了 String 類型。
FlatMap
FlatMap将一個發送事件的上遊Observable變換為多個發送事件的Observables,然後将它們發射的事件合并後放進一個單獨的Observable裡,FlatMap 并不能保證事件的順序,如果需要保證,需要用到我們下面要講的 ConcatMap。
我們可以使用下面這張圖來了解一下:
上遊每發送一個事件, FlatMap都将建立一個新的水管, 然後發送轉換之後的新的事件, 下遊接收到的就是這些新的水管發送的資料。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext();
emitter.onNext();
emitter.onNext();
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = ; i < ; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(,TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
//結果
I am value
I am value
I am value
I am value
I am value
I am value
I am value
I am value
I am value
我們在flatMap中将上遊發來的每個事件轉換為一個新的發送三個String事件的水管, 為了看到flatMap結果是無序的,是以加了10毫秒的延時。
concatMap
它和flatMap的作用幾乎一模一樣, 隻是它的結果是嚴格按照上遊發送的順序來發送的。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext();
emitter.onNext();
emitter.onNext();
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = ; i < ; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(,TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
//結果
I am value
I am value
I am value
I am value
I am value
I am value
I am value
I am value
I am value
我們來看一個使用FlatMap的執行個體,使用者注冊成功後立即登入的請求。
//Retrofit2的使用方法,我們後續講解
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}
可以看到登入和注冊傳回的都是一個上遊Observable, 而我們的flatMap操作符的作用就是把一個Observable轉換為另一個Observable。
api.register(new RegisterRequest())//發起注冊請求
.subscribeOn(Schedulers.io())//在IO線程進行網絡請求
.observeOn(AndroidSchedulers.mainThread())//回到主線程去處理請求注冊結果
.doOnNext(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
//先根據注冊的響應結果去做一些操作
}
})
.observeOn(Schedulers.io())//回到IO線程去發起登入請求
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
return api.login(new LoginRequest());
}
})
.observeOn(AndroidSchedulers.mainThread())//回到主線程去處理請求登入的結果
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
Toast.makeText(MainActivity.this, "登入成功", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "登入失敗", Toast.LENGTH_SHORT).show();
}
});
Zip
Zip通過一個函數将多個Observable發送的事件結合到一起,然後發送這些組合到一起的事件,它按照嚴格的順序應用這個函數。它隻發射與發射資料項最少的那個Observable一樣多的資料。
上遊現在有兩根水管,其中一根水管負責發送圓形事件 , 另外一根水管負責發送三角形事件 , 通過Zip操作符, 使得圓形事件 和三角形事件 合并為了一個矩形事件。
- 組合的過程是分别從 兩根水管裡各取出一個事件 來進行組合, 并且一個事件隻能被使用一次, 組合的順序是嚴格按照事件發送的順利 來進行的, 也就是說不會出現圓形1 事件和三角形B 事件進行合并, 也不可能出現圓形2 和三角形A 進行合并的情況。
- 最終下遊收到的事件數量 是和上遊中發送事件最少的那一根水管的事件數量相同,當沒有足夠的事件來組合時,下遊就不會收到剩餘的事件。
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext();
Log.d(TAG, "emit 2");
emitter.onNext();
Log.d(TAG, "emit 3");
emitter.onNext();
Log.d(TAG, "emit 4");
emitter.onNext();
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
//兩個zip需要在不同的線程中,不然就是observable1先全部發送完成,在發送observable2
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit B");
emitter.onNext("B");
Log.d(TAG, "emit C");
emitter.onNext("C");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
//結果
onSubscribe
emit A
emit
onNext: A
emit B
emit
onNext: B
emit C
emit
onNext: C
emit complete2
onComplete
Zip的實踐場景,比如一個界面需要展示使用者的一些資訊, 而這些資訊分别要從兩個伺服器接口中擷取, 而隻有當兩個都擷取到了之後才能進行展示。
//Retrofit2用法,後續介紹
public interface Api {
@GET
Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);
@GET
Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);
}
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());
//zip打包擷取兩個資訊并展示
Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});
今天我們的操作符就說的這裡了,下節還是這些枯燥的操作符啊啊啊!