天天看點

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

前言:九牛一毫莫自誇,驕傲自滿必翻車,曆覽古今多少事,成由謙遜敗由奢。

一、概述

RxJava2在第一篇文章中基本用法作了詳細的介紹,是一個基于事件流的異步操作庫。相信大家對RxJava有了一定的了解,由于篇幅過長是以重新寫了一篇,如果不了解Rxjava2可以參考下RxJava2最全面、最詳細的講解(一)。下面開始繼續講解RxJava2的其他用法。(源碼和其他連結在文章最後給出)

在使用前記得在build.gradle檔案中添加依賴:

implementation 'io.reactivex.rxjava2:rxjava:2.0.4'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
           

注意:RxJava1與RxJava2依賴不能共存。不能同時添加RxJava1與RxJava2的依賴,隻能添加其中一個。

RxJava的操作符就是java中的用法,詳盡的操作符請看官網ReactiveX的官方網站,這裡列舉了一部分:

Creating Observables(Observable的建立操作符),比如:Observable.create()、Observable.just()、Observable.from()等等;

Transforming Observables(Observable的轉換操作符),比如:observable.map()、observable.flatMap()、observable.buffer()等等;

Filtering Observables(Observable的過濾操作符),比如:observable.filter()、observable.sample()、observable.take()等等;

Combining Observables(Observable的組合操作符),比如:observable.join()、observable.merge()、observable.combineLatest()等等;

Error Handling Operators(Observable的錯誤處理操作符),比如:observable.onErrorResumeNext()、observable.retry()等等;

Observable Utility Operators(Observable的功能性操作符),比如:observable.subscribeOn()、observable.observeOn()、observable.delay()等等;

Conditional and Boolean Operators(Observable的條件操作符),比如:observable.amb()、observable.contains()、observable.skipUntil()等等;

Mathematical and Aggregate Operators(Observable數學運算及聚合操作符),比如:observable.count()、observable.reduce()、observable.concat()等等;

其他如observable.toList()、observable.connect()、observable.publish()等等;

二、變換操作符

變換操作符的主要是對事件序列中的事件進行處理變換,使其轉變成不同的事件,再加以處理。這裡列舉幾種常用的操作符

1.Map()

map操作符把被觀察者Observable産生的結果通過映射規則轉換成另一種結果集,并交給訂閱者處理。簡單來說就是對被觀察者發送的每個事件都通過指定函數的處理,進而轉變成另一種事件。

//鍊式程式設計
Observable.just(1, 2, 3, 4, 5)
           //使用Map操作符中的Function函數對被觀察者發送的事件統一作出處理
           .map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer integer) throws Exception {
            //對被觀察者just發送的結果,都全部乘以10處理
            return integer * 10;
        }
    }).subscribe(new Consumer<Integer>() {//訂閱
        @Override
        public void accept(Integer integer) throws Exception {//接受事件結果,是處理後的結果
            Log.e(TAG, "map:accept == " + integer);
        }
    });
           

上面的例子中我們使用just()操作符連續發送1,2,3,4,5等5個事件,通過Map操作符中的Function函數對被觀察者發送的事件統一作出乘以10處理,訂閱後輸出最終結果:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

2.flatMap()

flatMap()操作符是将Observable(被觀察者)産生的結果拆分和單獨轉換變成多個Observable,然後把多個Observable“扁平化”整合成新的一個Observable,并依次送出産生的結果給訂閱者。

大意為:flatMap()通過傳入一個函數作為參數轉換源Observable(被觀察者),在這個函數中你可以自定義轉換規則,最後在這個函數中傳回一個新的Observable,然後flatMap函數回調方法通過合并這些Observable成一個新的Observable,發送給Observer(觀察者)。(理論比較抽象,我們來看例子)

Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) {
            e.onNext("A");
            e.onNext("B");
            e.onComplete();
        }
    }).flatMap(new Function<String, ObservableSource<String>>() {
        @Override//通過flatMap将被觀察者生産的事件進行拆分,再将新的事件轉換成一個新的Observable發送
        public ObservableSource<String> apply(String s) {
            List<String> list = new ArrayList<>();
            Log.e(TAG, "flatMap:apply == 事件" + s);
            //将一個事件拆分成兩個子事件,例如将A事件拆分成A0,A1兩個事件,然後再整個合成一個Observable通過fromIterable發送給訂閱者
            for (int j = 0; j < 2; j++) {
                list.add("拆分後的子事件" + s + j);
            }
            return Observable.fromIterable(list);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
            Log.e(TAG, "flatMap:accept == " + s);
        }
    });
           

上面我們發送了A、B兩個事件,在flatMap中接收到A事件後,将A事件拆分成A0,A1兩個事件,再通過fromIterable(list)整合成一個Observable發送給訂閱者。列印log如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

注意:flatMap在合并Observable結果時,可能存在交叉的情況出現,即新合并生成的序列可能是無序的。(上面的例子我示範多次并沒有出現亂序)

3.concatMap()

concatMap()和flatMap()方法類似,都是把Observable(被觀察者)産生的事件轉換成多個Observable(被觀察者),然後把Observable整合成一個Observable,并依次送出生産的結果給訂閱者。

與flatMap()不同的是,concatMap()在處理産生的Observable時,采用的是“連接配接concat”的方式,而不是“marge合并”的方式,這樣就保證了産生事件的順序性,也就是說送出給訂閱者的結果是按照順序送出的,不會存在交叉的可能性。

Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) {
            e.onNext("A");
            e.onNext("B");
            e.onComplete();
        }
    }).concatMap(new Function<String, ObservableSource<String>>() {
        @Override//通過concatMap(有序)将被觀察者生産的事件進行拆分,再将新的事件轉換成一個新的Observable發送
        public ObservableSource<String> apply(String s) {
            List<String> strings = new ArrayList<>();
            Log.e(TAG, "concatMap:apply == 事件" + s);
            for (int j = 0; j < 2; j++) {
                strings.add("拆分後的子事件" + s + j);
            }
            return Observable.fromIterable(strings);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
            Log.e(TAG, "concatMap:accept == " + s);
        }
    });
           

列印的log如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

4.buffer()

buffer()操作符将Observable(被觀察者)需要發送的事件周期性收集到清單中,并把這清單送出給Observer(觀察者),觀察者處理後,清空buffer清單,同時接收下一次的結果交給訂閱者,周而複始。

需要注意的是:一但初始的Observable在産生事件中出現異常,即使buffer()收集到已經存在的結果,訂閱者也會馬上收到這個異常,并結束整個過程。

Observable.just("A", "B", "C", "D", "E")//這裡示範發送5個事件
         .buffer(3, 2)//緩存清單數量為3個,步長為2
                .subscribe(new Observer<List<String>>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "buffer:onSubscribe == 訂閱");
        }

        @Override
        public void onNext(List<String> strings) {
            Log.e(TAG, "buffer:onNext == 緩存事件數:" + strings.size());
            for (int j = 0; j < strings.size(); j++) {
                Log.e(TAG, "buffer:子事件==" + strings.get(j));
            }
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "buffer:onError == " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "buffer:onComplete == ");
        }
    });
           

上述例子設定了"ABCDE"5個事件,緩存事件數為3個,步長為2。在被觀察者的事件中,第一次擷取3個事件數放到緩存區域,即"A","B","C",發送給訂閱者;第二次擷取時,因為步長=2,是以事件往後移動2個,即指針往後移動2位,從"C"開始取三個事件,即"C","D","E"發送給訂閱者;第三次擷取時,事件再在第二次的基礎上往後移動2個,即到了"E",取3個事件,事件不足3個,隻能夠取"E"發送給訂閱者。看圖比較好了解,我們來看看buffer()的原理圖:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

列印結果如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

這裡先對變換操作符做出表格總結:

建立類型 作用 使用場景
變換操作符 map() 對Observable發送的每一個事件進行轉換 資料類型、事件等需要轉換
flatMap() 對Observable發送的整個個事件進行轉換(無序)
concatMap() 對Observable發送的整個個事件進行轉換(有序)
buffer() 從Observable的事件中擷取事件放到緩存區再發送事件

三、組合操作符

1.concat()與concatArray()

concat()與concatArray()都是組合多個被觀察者的一起發送資料,合并後安先後順序執行。

二者的差別是:concat()的觀察者數量最多是4個,而concatArray()的個數沒有限制

//concat的構造方法

concat(source1);

concat(source1, source2);

concat(source1, source2, source3);

concat(source1, source2, source3, source4);

//concatArray的構造方法

concatArray(ObservableSource<? extends T>... sources);

我們來看看簡單的運用:

Observable.concat(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6), Observable.just(7, 8))
            .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "concat:onSubscribe == 訂閱");
        }

        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "concat:onNext ==" + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "concat:onError == " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "concat:onComplete == ");
        }
    });

  Observable.concatArray(Observable.just("一", "二"), Observable.just("三", "四"),
                Observable.just("五", "六"), Observable.just("七", "八"), Observable.just("九", "十"))
                .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "concatArray:onSubscribe == 訂閱");
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "concatArray:onNext ==" + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "concatArray:onError == " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "concatArray:onComplete == ");
        }
    });
           

上面concat()建立了四個被觀察者,發送1,2,3,4,5,6,7,8個事件,concatArray()建立了五個被觀察者,發送一,二,三,四,五,六,七,八,九,十,列印log如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

2.Merge()與MergeArray()

Merge()與MergeArray()組合多個被觀察者發送資料,按照時間先後順序來執行。與concat()與concatArray()組合類似

Merge()與MergeArray()的差別:Merge()隻能發送最多4個Observable(被觀察者),MergeArray()的數量沒有限制。MergeArray()這裡就不示範例子了。

Megre()的構造方法:

merge(source1);

merge(source1, source2);

merge(source1, source2,  source3);

merge(source1, source2,  source3,  source4);

MergeArray()的構造方法:

mergeArray(ObservableSource<? extends T>... sources);

//起始值為1,發送3個事件,第一個事件延遲1秒發送,事件間隔為1秒
    Observable<Long> observable1 = Observable.intervalRange(1, 3, 1, 1, TimeUnit.SECONDS);
    //起始值為10,發送3個事件,第一個事件延遲1秒發送,事件間隔為1秒
    Observable<Long> observable2 = Observable.intervalRange(10, 3, 1, 1, TimeUnit.SECONDS);

        Observable.merge(observable1, observable2)
            .subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "concatArray:onSubscribe == 訂閱");
        }

        @Override
        public void onNext(Long aLong) {
            Log.e(TAG, "concatArray:onNext ==" + aLong);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "concatArray:onError == " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "concatArray:onComplete == ");
        }
    });
           

這裡我們建立了兩個被觀察者,一個起始值為1,延遲1秒發送3個事件,另一個為起始值為10,延遲1秒發送3個事件,按照Observable送出結果的時間順序來執行,對Observable進行合并,Observable1每隔1秒産生資料為:1,2,3,Observable2每隔1秒産生的資料為:10,11,12,都延遲1秒産生,最後合并的結果為:1,10,2,11,3,12,MergeArray()同理,列印log如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

可以看出都是按照時間的先後順序來執行被觀察者發送事件。

3.concatDelayError()

concat()與concatArray()組合中,如果某一個Observable(被觀察者)發出onError()事件,則會馬上停止其他事件的發送。如果需要onError()事件推遲到其他事件發送完成才出發的話則需要用到concatDelayError()方法;

我們來看看concat()方法中途抛出異常的例子:

Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("第一個事件");
            e.onNext("第二個事件");
            e.onError(new Exception("中途抛出異常"));
            e.onComplete();
        }
    });
    Observable<String> observable2 = Observable.just("第三個事件");

    //中途抛出異常列子:
    Observable.concat(observable1, observable2).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "concatDelayError:onSubscribe == 訂閱");
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "concatDelayError:onNext ==" + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "concatDelayError:onError == " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "concatDelayError:onComplete == ");
        }
    });
           

這裡我們依次發送第一個事件,第二個事件,中途抛出異常,調用onComplete(),第三個事件,列印log如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

可以看到,調用onError()方法以後,其他事件概不執行了,那麼我們來看看解決辦法,改用concatDelayError():

Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("第一個事件");
            e.onNext("第二個事件");
            e.onError(new Exception("中途抛出異常"));
            e.onComplete();
        }
    });
    Observable<String> observable2 = Observable.just("第三個事件");

    //中途抛出異常列子:
    Observable.concatArrayDelayError(observable1, observable2).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "concatDelayError:onSubscribe == 訂閱");
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "concatDelayError:onNext ==" + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "concatDelayError:onError == " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "concatDelayError:onComplete == ");
        }
    });
           

會把錯誤在所有結果合并完後才執行,列印的資料如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

4.megreDelayError()

megreDelayError()方法同上,meger()合并的某個Observable(被觀察者)出現異常,會馬上停止合并,并對訂閱者回到onError()方法,而megreDelayError()會把錯誤在所有結果合并完後才執行:

//建立被觀察者-異常
    Observable<Object> errorObservable = Observable.error(new Exception("抛出異常"));

    //産生0,2,4的事件序列,每隔1秒發送事件,一共發送3次
    Observable<Object> observable1 = Observable.interval(0, 1, TimeUnit.SECONDS)
            .map(new Function<Long, Object>() {
                @Override
                public Object apply(Long aLong) throws Exception {
                    return aLong * 2;
                }
            }).take(3)/*.mergeWith(errorObservable.delay(4, TimeUnit.SECONDS))*/;

    //産生0,10,20的事件序列,每隔1秒發送事件,一共發送3次,
    Observable<Long> observable2 = Observable.interval(1, 1, TimeUnit.SECONDS)
            .map(new Function<Long, Long>() {
                @Override
                public Long apply(Long aLong) throws Exception {
                    return aLong * 10;
                }
            }).take(3);

    Observable.mergeDelayError(observable1, errorObservable, observable2)
            .subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "mergeDelayError:onSubscribe == 訂閱");
        }

        @Override
        public void onNext(Object o) {
            Log.e(TAG, "mergeDelayError:onNext ==" + o);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "mergeDelayError:onError == " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "mergeDelayError:onComplete == ");
        }
    });
           

列印效果如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

5.startWith()與startWithArray()

startWith()與startWithArray()操作符都是在源Observable(被觀察者)送出結果之前,插入指定的某些資料,注意調用順序:先調用最後加入的資料。

startWith()的構造方法:

//傳入單個資料

startWith(T item)

//傳入集合,可将資料加入一個集合中

startWith(Iterable<? extends T> items)

startWithArray()的構造方法:

//可傳入多個參數,可變長參數

startWithArray(T... items)

ArrayList<String> strings = new ArrayList<>();
        strings.add("Array:1");
        Observable.just("一", "二", "三", "四")
                .startWith(strings)//插入單個集合
                .startWith("startWith:2")//插入單個資料
                .startWithArray("startWithArray:3", "startWithArray:4")//插入多個資料
                .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "startWith:onSubscribe == 訂閱");
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "startWith:onNext 結果== " + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "startWith:onError == " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "startWith:onComplete == ");
        }
    });
           

上面例子中,在just()的前面依次加入的是:集合[Array:1],“startWith:2”,"startWithArray:3", "startWithArray:4",但是列印出來的結果順序卻剛好相反,說明Observable優先發送最後插入的資料,log如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

組合操作符簡單總結一下:

建立類型 作用 備注 使用場景
組合操作符 concat()與concatArray() 組合多個被觀察者發送資料,合并後按照順序執行

差別:組合被觀察者數量:

concat()<=4,concatArray()無限制

合并資料源

聯合處理

Merge()與MergeArray() 組合多個被觀察者發送資料,合并後按照時間順序執行

差別:組合被觀察者數量:

Merge()<=4,MergeArray()無限制

concatDelayError() 将錯誤事件延遲到是以事件執行完後才執行錯誤事件
megreDelayError()
startWith()與startWithArray() 在一個被觀察者發送資料前,追加發送一些資料 調用順序:先追加的後調用,後追加的先調用

四、合并操作符

1.zip()

zip()是把兩個Observable(被觀察者)送出的結果,嚴格按照順序對位進行合并,最後發送給訂閱者,最終合并的數量等于多個Observable數量最少的數量。

注意:下面的Observable1中的4并沒有事件與其合并列印出來,但是事件還是會發送的。

//設定需要傳入的被觀察者資料
    Observable<Integer> observable1 = Observable.just(1, 2, 3, 4);
    Observable<String> observable2 = Observable.just("A", "B", "C");

    //回調apply()方法,并在裡面自定義合并結果的邏輯
    // BiFunction<Integer, String, String>,第一個類型為observable1的參數類型,第二個類型為observable2的參數類型,第三個為合并後的參數類型
    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
        @Override
        public String apply(Integer integer, String str) throws Exception {
            return integer + str;
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "zip:onSubscribe == 訂閱");
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "zip:onNext == " + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "zip:onError == " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "zip:onComplete == ");
        }
    });
           

上面根據順序的對位合并資料為:1A,2B,3C,4沒有與其合并的事件,列印log如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

2.combineLatest()

combineLatest()是将兩個Observable(被觀察者)産生的結果進行合并,合并的結果将組成一個新的Observable發送給訂閱者。這兩個Observable中任意的一個Observable産生的結果,都與另一個Observable最後的結果,按照一定的規則進行合并。

與zip()類似,差別在于zip()根據個數1對1合并,combineLatest()根據時間點一對一合并。

//産生0,10,20的事件序列,每隔1秒發送事件,一共發送3次
    Observable<Long> observable1 = Observable.interval(0, 1, TimeUnit.SECONDS)
            .map(new Function<Long, Long>() {
                @Override
                public Long apply(Long aLong) throws Exception {
                    return aLong * 10;
                }
            }).take(3);

    //産生0,1,2,3,4的事件序列,起始值為0,一共發送4次,延遲1秒後開始發送,每隔1秒發送事件
    Observable<Long> observable2 = Observable.intervalRange(0, 4, 1, 1, TimeUnit.SECONDS)
            .map(new Function<Long, Long>() {
                @Override
                public Long apply(Long aLong) throws Exception {
                    return aLong * 1;
                }
            });

    Observable.combineLatest(observable1, observable2, new BiFunction<Long, Long, Long>() {
        @Override
        public Long apply(Long o1, Long o2) throws Exception {
            Log.e(TAG, "combineLatest:apply: o1+o2:" + o1 + "+" + o2);
            //observable1的最後的一個資料都與observable2的每一個資料相加
            return o1 + o2;
        }
    }).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "combineLatest:onSubscribe == 訂閱");
        }

        @Override
        public void onNext(Long aLong) {
            Log.e(TAG, "combineLatest:onNext 合并的結果== " + aLong);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "combineLatest:onError == " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "combineLatest:onComplete == ");
        }
    });
           

Observable1的事件序列為0,10,20;Observable2的事件序列為0,1,2,3, Observable2因為延遲了1秒發送,是以在Observable1執行1秒後即Observable1=10時,Observable2才開始執行事件,否則在Observable2未開始執行事件前,Observable2的預設值與Observable1合并事件,如果Observable1執行完事件後,以Observable1最後的值為準Observable1=20,那麼合并出來的資料為:10+0,20+0,20+1,20+2,20+3,列印log如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

3.combineLatestDelayError()

combineLatest()合并過程中一但出現異常就會立即停止後續合并事件,并且回調onError()方法,combineLatestDelayError()是把錯誤放到所有結果都合并完成之後才執行。

combineLatestDelayError()類似concatDelayError()、mergeDelayError(),這裡就不在舉例子了。

4.reduce()

reduce()把觀察者需要發送的事件聚合成一個事件并且發送。本質上都是前兩個資料聚合,再與後一個資料聚合,依次類推。

Observable.just(1, 2, 3, 4, 5).
    reduce(new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            Log.e(TAG, "reduce:accept 計算結果== " + integer + "*" + integer2);
            //按先後順序,兩個事件聚合處理後 ,将結果再與下一事件聚合處理,依次類推
            return integer * integer2;
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "reduce:accept 合并的結果== " + integer);
        }
    });
           

比如上面的1, 2, 3, 4, 5共5個事件,1與2聚合後生成新的Observable(被觀察者),新的Observable與下一個數3再聚合,依次類推,是以資料聚合完成,列印資料如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

5.collect()

将Observable(被觀察者)發送的資料收集到一個容器中(資料結構)。Observable将事件隊列分别裝入一個準備好的容器中,然後在新的Observable将該容器發送給訂閱者。

//第一個參數:聲明容器的類型,第二個參數:處理資料的邏輯,加入容器中
   Observable.just(1, 2, 3, 4, 5).collect(new Callable<ArrayList<Integer>>() {
        @Override
        public ArrayList<Integer> call() throws Exception {
            return new ArrayList<>();
        }
    }, new BiConsumer<ArrayList<Integer>, Integer>() {
        @Override
        public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
            Log.e(TAG, "reduce:collect 加入容器的資料== " + integer);
            list.add(integer);
        }
    }).subscribe(new Consumer<ArrayList<Integer>>() {
        @Override
        public void accept(ArrayList<Integer> integers) throws Exception {
            Log.e(TAG, "reduce:collect 最後結果== " + integers);
        }
    });
           

Callable方法聲明一個容器,Callable實作的方法将資料加入容器中,列印資料如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

合并操作符的簡單總結:

建立類型 作用 備注 使用場景
合并操作符 zip() 合并多個被觀察者發送的事件,生成一個新的事件序列。

嚴格按照事件序列進行合并

最後Observable的資料=多個Observable中數量最少的

合并資料源

聯合處理

combineLatest() 将兩個Observable産生的結果進行合并,合并新的Observable發送給訂閱者 當其中一個Observable發送資料,都與另一個Observable最後發送的資料結合,類似zip(),不同的是按照時間點合并
combineLatestDelayError() 将錯誤事件延遲到是以事件執行完後才執行錯誤事件
reduce() 把觀察者需要發送的事件聚合成一個事件并且發送。 前兩個被觀察者聚合成新的被觀察者,再與下一資料聚合,一次類推
collect() 将被觀察者發送的資料放到一個容器中處理

五、統計發送事件的數量

1.count()

count()方法是統計Observable(被觀察者)發送的事件的數量。

Observable.just(1, 2, 3, 4, 5)
             .count()
             .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long l) throws Exception {
            Log.e(TAG, "reduce:accept 事件數== " + l);
        }
    });
           

這個方法比較簡單,just()發送了1, 2, 3, 4, 5共5個事件,列印log如下:

RxJava2最全面、最詳細的講解(二)一、概述二、變換操作符三、組合操作符四、合并操作符五、統計發送事件的數量

至此,本文結束,如果還想了解過濾操作符和其他功能性操作符的可以參考下一篇文章:RxJava2最全面、最詳細的用法講解(三).

源碼位址:https://github.com/FollowExcellence/Rxjava_Retrofit

點關注,不迷路

好了各位,以上就是這篇文章的全部内容了,能看到這裡的人呀,都是人才。

我是suming,感謝各位的支援和認可,您的點贊、評論、收藏【一鍵三連】就是我創作的最大動力,我們下篇文章見!

如果本篇部落格有任何錯誤,請批評指教,不勝感激 !

要想成為一個優秀的安卓開發者,這裡有必須要掌握的知識架構,一步一步朝着自己的夢想前進!Keep Moving!

相關文章:

Retrofit2詳解和使用(一)

  • Retrofit2的介紹和簡單使用
OKHttp3的使用和詳解
  • OKHttp3的用法介紹和解析
OKHttp3源碼詳解
  • 從源碼角度解釋OKHttp3的關鍵流程和重要操作
RxJava2詳解(一)
  • 詳細介紹了RxJava的使用(基本建立、快速建立、延遲建立等操作符)
RxJava2詳解(二)
  • RxJava轉換、組合、合并等操作符的使用
RxJava2詳解(三)
  • 詳細介紹了RxJava的使用
RxJava2詳解(四)
  • RxJava過濾、其他操作符的使用
上述幾篇都是android開發必須掌握的,後續會完善其他部分!