概述
RxJava 中的組合函數可以同時處理多個Observables來建立我們想要的Observable。組合操作符包含如下幾種
Merge
merge()将2-9個Observables合并到一個Observable中進行發射。
Merge可能會讓合并的Observables發射的資料交錯(如果想要沒有交錯,可以使用concat操作符)。
任何一個Observable發出onError的時候,onError通知會被立即傳遞給觀察者,而且會終止合并後的Observable。
如果想讓onError發生在合并後的Observable所有的資料發射完成之後,可以使用MergeDelayError
除了傳遞多個Observable,merge還可以傳遞一個Observable清單List,數組,
甚至是一個發射Observable序列的Observable,merge将合并它們的輸出作為單個Observable的輸出:
執行個體:
public void merge() {
Observable<Integer> odds = Observable.just(, , ).subscribeOn(Schedulers.io());
Observable<Integer> evens = Observable.just(, , );
Observable.merge(odds, evens)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
logger("Next: " + item);
}
@Override
public void onError(Throwable error) {
logger("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
logger("Sequence complete.");
}
});
}
列印結果:
Next:
Next:
Next:
Next:
Next:
Next:
Sequence complete.
Zip
Zip操作符将2-9個Observable發射的資料按 順序 結合兩個或多個Observables發射的資料項,每個資料隻能組合一次,而且都是有序的。
它隻發射與發射資料項最少的那個Observable一樣多的資料。Rxjava實作了zip和zipWith兩個操作符.
具體的結合方式由 第三個參數決定。
zip的最後一個參數接受每個Observable發射的一項資料,傳回被壓縮後的資料,
它可以接受一到九個參數:一個Observable序列,或者一些發射Observable的Observables。
執行個體:
- zipwith:
private Observable<String> createObserver(int index) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = ; i <= index; i++) {
logger("emitted:" + index + "-" + i);
subscriber.onNext(index + "-" + i);
try {
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).subscribeOn(Schedulers.newThread());
}
public void zipWith() {
createObserver().zipWith(createObserver(), new Func2<String, String, String>() {
@Override
public String call(String s, String s2) {
return s + "-" + s2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
logger("zipWith:" + s + "\n");
}
});
}
列印結果:
emitted:2-1
emitted:3-1
zipWith:2-1-3-1
emitted:2-2
emitted:3-2
zipWith:2-2-3-2
emitted:3-3
- zip:
public void zip() {
Observable
.zip(createObserver(), createObserver(), createObserver(), new Func3<String, String, String, String>() {
@Override
public String call(String s, String s2, String s3) {
return s + "-" + s2 + "-" + s3;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
logger("zip:" + s + "\n");
}
});
}
列印結果:
emitted:2-1
emitted:3-1
emitted:4-1
zip:2-1-3-1-4-1
emitted:2-2
emitted:3-2
emitted:4-2
zip:2-2-3-2-4-2
emitted:3-3
emitted:4-3
emitted:4-4
最終都發射出了兩個資料,因為createObserver(2)所建立的Observable隻會發射兩個資料,是以其他Observable多餘發射的資料都被丢棄了。
Join
RxJava的join()函數基于時間視窗将兩個Observables發射的資料結合在一起,
每個Observable在自己的時間視窗内都有有效的,都可以拿來組合。
Rxjava還實作了groupJoin,基本和join相同,隻是最後組合函數的參數不同。
Join(Observable,Func1,Func1,Func2) 參數說明:
- 源Observable所要組合的目标Observable
- 一個函數,接收從源Observable發射來的資料,并傳回一個Observable,這個Observable的生命周期決定了源Observable發射出來資料的有效期
- 一個函數,接收從目标Observable發射來的資料,并傳回一個Observable,這個Observable的生命周期決定了目标Observable發射出來資料的有效期
- 一個函數,接收從源Observable和目标Observable發射來的資料,并傳回最終組合完的資料。
執行個體:
private Observable<String> createJoinObserver() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = ; i < ; i++) {
subscriber.onNext("Right-" + i);
try {
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).subscribeOn(Schedulers.newThread());
}
public void join() {
Observable.just("Left-").join(createJoinObserver(),
integer -> Observable.timer(, TimeUnit.MILLISECONDS),
integer -> Observable.timer(, TimeUnit.MILLISECONDS),
(i, j) -> i + j
).subscribe(new Action1<String>() {
@Override
public void call(String s) {
logger("join:" + s + "\n");
}
});
}
列印結果:
join:Left-Right-1 –>隔1s
join:Left-Right-2 –>隔1s
join:Left-Right-3
groupJoin(Observable,Func1,Func1,Func2):第三個函數的參數:原Observable發射來的資料,要組合的目标Observable
執行個體:
public void groupJoin() {
Observable.just("Left-")
.groupJoin(createJoinObserver(),
s -> Observable.timer(, TimeUnit.MILLISECONDS),
s -> Observable.timer(, TimeUnit.MILLISECONDS),
(s, stringObservable) -> stringObservable.map(str -> s + str))
.subscribe(new Action1<Observable<String>>() {
@Override
public void call(Observable<String> stringObservable) {
stringObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
logger("groupJoin:" + s + "\n");
}
});
}
});
}
列印結果:
groupJoin:Left-Right-1 –>隔1s
groupJoin:Left-Right-2 –>隔1s
groupJoin:Left-Right-3
兩個結果一緻。原Observable發射了一個有效期為3s的資料,目标Observable每1s發射一個有效期為2s的資料,總共4個。
但是最終的組合結果也隻有3個資料。
combineLatest
combineLatest接受二到九個Observable作為參數,或者單個Observables清單作為參數。它預設不在任何特定的排程器上執行。
RxJava的combineLatest()函數有點像zip()函數的特殊形式,zip()作用于最近未打包的兩個Observables。相反,combineLatest()作用于最近發射的資料項.
zip 中隻有當原始的Observable中的每一個都發射了一條資料時zip才發射資料。
CombineLatest則在原始的Observable中任意一個發射了資料時發射一條資料。
使用執行個體:
public void combineLatest() {
Observable.combineLatest(createCombineLatest(), createCombineLatest(), (num1, num2) -> {
logger("left:" + num1 + " right:" + num2);
return num1 + num2;
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
logger("combineList:" + integer);
}
});
}
列印結果:
left:1 right:2
CombineLatest:3
left:2 right:2
CombineLatest:4
left:2 right:4
CombineLatest:6
Rxjava實作CombineLast操作符可以讓我們直接将組裝的Observable作為參數傳值,也可以将所有的Observable裝在一個List裡面穿進去。
使用執行個體:
List<Observable<Integer>> list = new ArrayList<>();
public void combineList() {
for (int i = ; i < ; i++) {
list.add(createCombineLatest(i));
}
Observable.combineLatest(list, args -> {
int temp = ;
for (Object i : args) {
logger(i);
temp += (Integer) i;
}
return temp;
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
logger("CombineLatest:" + integer);
}
});
}
列印結果:
1
2
combineList:3
2
2
combineList:4
2
4
combineList:6
Switch
将一個發射多個Observables的Observable轉換成另一個單獨的Observable,後者發射那些Observables最近發射的資料項
switch操作符在Rxjava上的實作為switchOnNext.
用來将一個發射多個小Observable的源Observable轉化為一個Observable,
然後發射這多個小Observable所發射的資料。
當原始Observable發射了一個新的Observable時(不是這個新的Observable發射了一條資料時),它将取消訂閱之前的那個Observable。
這意味着,在後來那個Observable産生之後到它開始發射資料之前的這段時間裡,前一個Observable發射的資料将被丢棄
執行個體代碼:
private Observable<String> createSwitchObserver(int index) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = ; i < ; i++) {
subscriber.onNext(index + "-" + i);
try {
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).subscribeOn(Schedulers.newThread());
}
public void switchObserver()
{
Observable.switchOnNext(Observable.create(
new Observable.OnSubscribe<Observable<String>>() {
@Override
public void call(Subscriber<? super Observable<String>> subscriber) {
for (int i = ; i < ; i++) {
subscriber.onNext(createSwitchObserver(i));
try {
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
))
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
logger("switch:" + s);
}
});
}
列印結果:
switch:1-1
switch:1-2
switch:2-1
switch:2-2
switch:2-3
switch:2-4
從列印結果看,第一個小Observable隻發射出了兩個資料,
第二個小Observable就被源Observable發射出來了,是以 第一個 接下來的兩個資料被丢棄。
StartWith
StartWith操作符會在源Observable發射的資料前面插上一些資料。
startWith可接受一個Iterable或者多個Observable作為函數的參數。
執行個體代碼:
public void startWith() {
Observable.just(, , ).startWith(-, )
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
logger("startWith:" + integer);
}
});
}
列印結果:
startWith:-1
startWith:0
startWith:1
startWith:2
startWith:3
可以看到-1和0插入到發射序列的前面
使用Demo:Combining.java
參考:ReactiveX中文翻譯文檔
RxJava部分操作符介紹
RxJava開發精要6 - 組合Observables