RxJava各類型操作符詳解如下:
RxJava操作符彙總
RxJava操作符(一) —-建立操作符
RxJava操作符(二)—-轉換操作符
RxJava操作符(三)—-合并操作符
RxJava操作符(四)—-功能操作符
RxJava操作符(五) —-過濾操作符
RxJava操作符(六)—-條件操作符
合并操作符 : 組合多個被觀察者(Observable)&合并需要發送的事件。 包含:concatMap(),concat(), merge(),mergeArray(),concateArray(),reduce(),collect(),startWith(),zip(),count()。
1、merge(),concat ()操作符
/**
* ========================merge,concat 操作符 ======================================
*
* merge操作符是把多個Observable合并成一個進行發射。merge可能會讓合并到Observable的資料順序發生錯亂(組合被觀察者數量<=4個)(并行無序)
* mergeArray操作符和merge作用一樣,但不同的是組合被觀察者數量>4個)(并行無序)
*
* concat操作符也是把多個Observable合并成一個進行發射。但concat則保證合并的每個Observable的事件按順序發射出去。(組合被觀察者數量<=4個)(串行有序)
* concatArray操作符和concat作用一樣,但不同的是組合被觀察者數量>4個)(串行有序)
*/
public static void merge() {
Observable observable1 = Observable.just(, , );
Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");
Observable
.merge(observable1, observable2).delay(, TimeUnit.SECONDS)
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.d(TAG + "merge", o.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "merge", "onComplete");
}
});
}
輸出如下:
2、concatDelayError()/mergeDelayError() 操作符
/**
* ========================concatDelayError()/mergeDelayError() 操作符 ======================================
*
* 這兩個操作符的作用是: 使用concat()和merge()操作符時,若其中一個被觀察者發送onError事件,則會馬上終止其它被觀察者繼續發送事件。是以呐,這時使用concatError()/
* mergeDelayError()事件可以使onError事件推遲到其它被觀察者發送事件結束後在再觸發
*/
public static void concatDelayError() {
Observable
.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext();
emitter.onNext();
emitter.onError(new NullPointerException());
emitter.onNext();
emitter.onNext();
}
}), Observable.just(, ))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "cDelayError", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG + "cDelayError", "onError");
}
@Override
public void onComplete() {
Log.d(TAG + "cDelayError", "onComplete");
}
});
}
輸出如下:
3、zip 操作符
/**
* ========================zip 操作符 ======================================
*
* 把多個Observable合并後,并且把這些Observable的資料進行轉換再發射出去。轉換之後的資料數目由最短資料長度的那個Observable決定。發射完最終會自動調用觀察者的onComplete方法()
*
* 如以下代碼: 資料長度為4的observable1和資料長度為3的observable2進行合并轉換後,觀察者隻接收到3個資料
*/
public static void zip() {
Observable observable1 = Observable.just(, , , );
Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");
Observable
.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return s + integer;
}
})
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.d(TAG + "zip", o.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "merge", "onComplete");
}
});
}
輸出如下:
4、combineLatest 操作符
/**
* ========================combineLatest 操作符 ======================================
*
* 當兩個Observable 中的任何一個發送了資料,将先發送了資料的Observable的最新(最後)一個資料和另一個Observable發送的每個資料結合,最終基于該結合的結果發送資料
*
* 與zip()的差別: zip()是按個數合并,即對合并;而combineLatest()是基于時間合并,,即在同一時間點上合并
*/
/**
*
* ======================combineLatestDelayError =================================
*
* 作用類似于concatDelayError() / mergeDelayError(),用于錯誤處理
public static void combineLatest() {
Observable
.combineLatest(Observable.just(, , )
, Observable.intervalRange(, , , , TimeUnit.SECONDS)
, new BiFunction<Integer, Long, String>() {
@Override
public String apply(Integer integer, Long aLong) throws Exception {
return "合并後的資料為:" + integer + aLong;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "combineLatest", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "combineLatest", "onComplete");
}
});
}
輸出如下:
5、reduce ()操作符
/**
* ======================reduce 操作符=================================
*
* 把被觀察者需要發送的資料按照指定規則聚合成一個資料發送
*
* 聚合的規則需要我們編寫,内部流程是前兩個資料按照我們的規則合并後,再與後面的資料按規則合并,依次類推。這樣說有點抽象,看下面的例子。
*/
public static void reduce() {
Observable
.just(, , , , )
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG + "reduce", "本次合并的過程是: " + integer + "+" + integer2);
return integer + integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "reduce", "最終計算的結果是 : " + integer);
}
});
}
輸出如下:
6、collect() 操作符
/**
* ========================collect 操作符=================================
*
* 作用是把 Observable(被觀察者)發送的事件收集到一個資料結構中
*/
public static void collect() {
Observable
.just(, , , , )
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
integers.add(integer);
}
})
.subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> integers) throws Exception {
Log.d(TAG + "collect", integers.toString());
}
});
}
輸出如下:
7、startWith()/startWithArray() 操作符
/**
* ========================startWith/startWithArray 操作符=================================
*
* 在一個被觀察者發送時間前,追加發送一些資料/一個新的被觀察者
*/
public static void startWith() {
Observable.just(, , )
.startWith() //在發送序列去追加單個資料
.startWithArray(, ) //在發送序列去追加多個資料
.startWith(Observable.just(, , )) //在發送序列去追加單個被觀察者
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "startWith", String.valueOf(integer));
}
});
}
輸出如下:
8、count() 操作符
/**
* ========================count 操作符=================================
*
* 統計被觀察者發送事件數量
*/
public static void count() {
Observable
.just(, , , )
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "count", "發送事件的數量 : " + aLong);
}
});
}
輸出如下:
上面代碼位址
RxJava各類型操作符詳解如下:
RxJava操作符彙總
RxJava操作符(一) —-建立操作符
RxJava操作符(二)—-轉換操作符
RxJava操作符(三)—-合并操作符
RxJava操作符(四)—-功能操作符
RxJava操作符(五) —-過濾操作符
RxJava操作符(六)—-條件操作符