天天看點

RxJava操作符(三)----合并操作符

RxJava各類型操作符詳解如下:

RxJava操作符彙總

RxJava操作符(一) —-建立操作符

RxJava操作符(二)—-轉換操作符

RxJava操作符(三)—-合并操作符

RxJava操作符(四)—-功能操作符

RxJava操作符(五) —-過濾操作符

RxJava操作符(六)—-條件操作符

合并操作符 : 組合多個被觀察者(Observable)&合并需要發送的事件。 包含:concatMap(),concat(), merge(),mergeArray(),concateArray(),reduce(),collect(),startWith(),zip(),count()。

1、merge(),concat ()操作符

/**
     * ========================merge,concat 操作符 ======================================
     * 
     * merge操作符是把多個Observable合并成一個進行發射。merge可能會讓合并到Observable的資料順序發生錯亂(組合被觀察者數量<=4個)(并行無序)
     * mergeArray操作符和merge作用一樣,但不同的是組合被觀察者數量>4個)(并行無序)
     * 
     * concat操作符也是把多個Observable合并成一個進行發射。但concat則保證合并的每個Observable的事件按順序發射出去。(組合被觀察者數量<=4個)(串行有序)
     * concatArray操作符和concat作用一樣,但不同的是組合被觀察者數量>4個)(串行有序)
     */
    public static void merge() {
        Observable observable1 = Observable.just(, , );
        Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");

        Observable
                .merge(observable1, observable2).delay(, TimeUnit.SECONDS)
                .subscribe(new Observer() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Object o) {
                        Log.d(TAG + "merge", o.toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG + "merge", "onComplete");
                    }
                });
    }
           

輸出如下:

RxJava操作符(三)----合并操作符

2、concatDelayError()/mergeDelayError() 操作符

/**
     * ========================concatDelayError()/mergeDelayError() 操作符 ======================================
     * 
     * 這兩個操作符的作用是: 使用concat()和merge()操作符時,若其中一個被觀察者發送onError事件,則會馬上終止其它被觀察者繼續發送事件。是以呐,這時使用concatError()/
     * mergeDelayError()事件可以使onError事件推遲到其它被觀察者發送事件結束後在再觸發
     */
    public static void concatDelayError() {

        Observable
                .concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext();
                        emitter.onNext();
                        emitter.onError(new NullPointerException());
                        emitter.onNext();
                        emitter.onNext();
                    }
                }), Observable.just(, ))


                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG + "cDelayError", String.valueOf(integer));
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG + "cDelayError", "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG + "cDelayError", "onComplete");
                    }
                });
    }
           

輸出如下:

RxJava操作符(三)----合并操作符

3、zip 操作符

/**
     * ========================zip 操作符 ======================================
     * 
     * 把多個Observable合并後,并且把這些Observable的資料進行轉換再發射出去。轉換之後的資料數目由最短資料長度的那個Observable決定。發射完最終會自動調用觀察者的onComplete方法()
     * 
     * 如以下代碼: 資料長度為4的observable1和資料長度為3的observable2進行合并轉換後,觀察者隻接收到3個資料
     */

    public static void zip() {

        Observable observable1 = Observable.just(, , , );
        Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");


        Observable
                .zip(observable1, observable2, new BiFunction<Integer, String, String>() {

                    @Override
                    public String apply(Integer integer, String s) throws Exception {
                        return s + integer;
                    }
                })
                .subscribe(new Observer() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Object o) {
                        Log.d(TAG + "zip", o.toString());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG + "merge", "onComplete");
                    }
                });
    }
           

輸出如下:

RxJava操作符(三)----合并操作符

4、combineLatest 操作符

/**
     * ========================combineLatest 操作符 ======================================
     * 
     * 當兩個Observable 中的任何一個發送了資料,将先發送了資料的Observable的最新(最後)一個資料和另一個Observable發送的每個資料結合,最終基于該結合的結果發送資料
     * 
     * 與zip()的差別: zip()是按個數合并,即對合并;而combineLatest()是基于時間合并,,即在同一時間點上合并
     */

/**
     *
     *  ======================combineLatestDelayError =================================
     *
     *  作用類似于concatDelayError() / mergeDelayError(),用于錯誤處理

    public static void combineLatest() {

        Observable
                .combineLatest(Observable.just(, , )
                        , Observable.intervalRange(, , , , TimeUnit.SECONDS)
                        , new BiFunction<Integer, Long, String>() {
                            @Override
                            public String apply(Integer integer, Long aLong) throws Exception {
                                return "合并後的資料為:" + integer + aLong;
                            }
                        })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG + "combineLatest", s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG + "combineLatest", "onComplete");
                    }
                });
    }
           

輸出如下:

RxJava操作符(三)----合并操作符

5、reduce ()操作符

/**
     * ======================reduce  操作符=================================
     * 
     * 把被觀察者需要發送的資料按照指定規則聚合成一個資料發送
     * 
     * 聚合的規則需要我們編寫,内部流程是前兩個資料按照我們的規則合并後,再與後面的資料按規則合并,依次類推。這樣說有點抽象,看下面的例子。
     */
    public static void reduce() {

        Observable
                .just(, , , , )
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        Log.d(TAG + "reduce", "本次合并的過程是:  " + integer + "+" + integer2);
                        return integer + integer2;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "reduce", "最終計算的結果是 :  " + integer);
                    }
                });
    }
           

輸出如下:

RxJava操作符(三)----合并操作符

6、collect() 操作符

/**
     * ========================collect 操作符=================================
     * 
     * 作用是把 Observable(被觀察者)發送的事件收集到一個資料結構中
     */
    public static void collect() {

        Observable
                .just(, , , , )
                .collect(new Callable<ArrayList<Integer>>() {
                    @Override
                    public ArrayList<Integer> call() throws Exception {
                        return new ArrayList<>();
                    }
                }, new BiConsumer<ArrayList<Integer>, Integer>() {
                    @Override
                    public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
                        integers.add(integer);
                    }
                })
                .subscribe(new Consumer<ArrayList<Integer>>() {
                    @Override
                    public void accept(ArrayList<Integer> integers) throws Exception {
                        Log.d(TAG + "collect", integers.toString());
                    }
                });
    }

           

輸出如下:

RxJava操作符(三)----合并操作符

7、startWith()/startWithArray() 操作符

/**
    * ========================startWith/startWithArray 操作符=================================
    * 
    * 在一個被觀察者發送時間前,追加發送一些資料/一個新的被觀察者
    */
    public static void startWith() {

        Observable.just(, , )
                .startWith()   //在發送序列去追加單個資料
                .startWithArray(, )  //在發送序列去追加多個資料
                .startWith(Observable.just(, , ))  //在發送序列去追加單個被觀察者
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "startWith", String.valueOf(integer));
                    }
                });
    }
           

輸出如下:

RxJava操作符(三)----合并操作符

8、count() 操作符

/**
    * ========================count 操作符=================================
    * 
    * 統計被觀察者發送事件數量
    */
    public static void count() {
        Observable
                .just(, , , )
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "count", "發送事件的數量 : " + aLong);
                    }
                });
    }
           

輸出如下:

RxJava操作符(三)----合并操作符

上面代碼位址

RxJava各類型操作符詳解如下:

RxJava操作符彙總

RxJava操作符(一) —-建立操作符

RxJava操作符(二)—-轉換操作符

RxJava操作符(三)—-合并操作符

RxJava操作符(四)—-功能操作符

RxJava操作符(五) —-過濾操作符

RxJava操作符(六)—-條件操作符