天天看點

RxJava組合操作符

概述

RxJava 中的組合函數可以同時處理多個Observables來建立我們想要的Observable。組合操作符包含如下幾種

Merge

merge()将2-9個Observables合并到一個Observable中進行發射。

Merge可能會讓合并的Observables發射的資料交錯(如果想要沒有交錯,可以使用concat操作符)。

任何一個Observable發出onError的時候,onError通知會被立即傳遞給觀察者,而且會終止合并後的Observable。

如果想讓onError發生在合并後的Observable所有的資料發射完成之後,可以使用MergeDelayError

除了傳遞多個Observable,merge還可以傳遞一個Observable清單List,數組,

甚至是一個發射Observable序列的Observable,merge将合并它們的輸出作為單個Observable的輸出:

執行個體:

public void merge() {
        Observable<Integer> odds = Observable.just(, , ).subscribeOn(Schedulers.io());
        Observable<Integer> evens = Observable.just(, , );

        Observable.merge(odds, evens)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onNext(Integer item) {
                        logger("Next: " + item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        logger("Error: " + error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        logger("Sequence complete.");
                    }
                });
    }
           

列印結果:

Next: 
Next: 
Next: 
Next: 
Next: 
Next: 
Sequence complete.
           

Zip

Zip操作符将2-9個Observable發射的資料按 順序 結合兩個或多個Observables發射的資料項,每個資料隻能組合一次,而且都是有序的。

它隻發射與發射資料項最少的那個Observable一樣多的資料。Rxjava實作了zip和zipWith兩個操作符.

具體的結合方式由 第三個參數決定。

zip的最後一個參數接受每個Observable發射的一項資料,傳回被壓縮後的資料,

它可以接受一到九個參數:一個Observable序列,或者一些發射Observable的Observables。

執行個體:

  • zipwith:
private Observable<String> createObserver(int index) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = ; i <= index; i++) {
                    logger("emitted:" + index + "-" + i);
                    subscriber.onNext(index + "-" + i);
                    try {
                        Thread.sleep();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).subscribeOn(Schedulers.newThread());
    }

    public void zipWith() {
        createObserver().zipWith(createObserver(), new Func2<String, String, String>() {
            @Override
            public String call(String s, String s2) {
                return s + "-" + s2;
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                logger("zipWith:" + s + "\n");
            }
        });
    }
           

列印結果:

emitted:2-1

emitted:3-1

zipWith:2-1-3-1

emitted:2-2

emitted:3-2

zipWith:2-2-3-2

emitted:3-3

  • zip:
public void zip() {
        Observable
                .zip(createObserver(), createObserver(), createObserver(), new Func3<String, String, String, String>() {
                    @Override
                    public String call(String s, String s2, String s3) {
                        return s + "-" + s2 + "-" + s3;
                    }
                }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                logger("zip:" + s + "\n");
            }
        });
    }
           

列印結果:

emitted:2-1

emitted:3-1

emitted:4-1

zip:2-1-3-1-4-1

emitted:2-2

emitted:3-2

emitted:4-2

zip:2-2-3-2-4-2

emitted:3-3

emitted:4-3

emitted:4-4

最終都發射出了兩個資料,因為createObserver(2)所建立的Observable隻會發射兩個資料,是以其他Observable多餘發射的資料都被丢棄了。

Join

RxJava的join()函數基于時間視窗将兩個Observables發射的資料結合在一起,

每個Observable在自己的時間視窗内都有有效的,都可以拿來組合。

Rxjava還實作了groupJoin,基本和join相同,隻是最後組合函數的參數不同。

Join(Observable,Func1,Func1,Func2) 參數說明:

  • 源Observable所要組合的目标Observable
  • 一個函數,接收從源Observable發射來的資料,并傳回一個Observable,這個Observable的生命周期決定了源Observable發射出來資料的有效期
  • 一個函數,接收從目标Observable發射來的資料,并傳回一個Observable,這個Observable的生命周期決定了目标Observable發射出來資料的有效期
  • 一個函數,接收從源Observable和目标Observable發射來的資料,并傳回最終組合完的資料。

執行個體:

private Observable<String> createJoinObserver() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = ; i < ; i++) {
                    subscriber.onNext("Right-" + i);
                    try {
                        Thread.sleep();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).subscribeOn(Schedulers.newThread());
    }

    public void join() {
        Observable.just("Left-").join(createJoinObserver(),
                integer -> Observable.timer(, TimeUnit.MILLISECONDS),
                integer -> Observable.timer(, TimeUnit.MILLISECONDS),
                (i, j) -> i + j
        ).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                logger("join:" + s + "\n");
            }
        });
    }
           

列印結果:

join:Left-Right-1 –>隔1s

join:Left-Right-2 –>隔1s

join:Left-Right-3

groupJoin(Observable,Func1,Func1,Func2):第三個函數的參數:原Observable發射來的資料,要組合的目标Observable

執行個體:

public void groupJoin() {
        Observable.just("Left-")
                .groupJoin(createJoinObserver(),
                        s -> Observable.timer(, TimeUnit.MILLISECONDS),
                        s -> Observable.timer(, TimeUnit.MILLISECONDS),
                        (s, stringObservable) -> stringObservable.map(str -> s + str))
                .subscribe(new Action1<Observable<String>>() {
                    @Override
                    public void call(Observable<String> stringObservable) {
                        stringObservable.subscribe(new Action1<String>() {
                            @Override
                            public void call(String s) {
                                logger("groupJoin:" + s + "\n");
                            }
                        });
                    }
                });
    }
           

列印結果:

groupJoin:Left-Right-1 –>隔1s

groupJoin:Left-Right-2 –>隔1s

groupJoin:Left-Right-3

兩個結果一緻。原Observable發射了一個有效期為3s的資料,目标Observable每1s發射一個有效期為2s的資料,總共4個。

但是最終的組合結果也隻有3個資料。

combineLatest

combineLatest接受二到九個Observable作為參數,或者單個Observables清單作為參數。它預設不在任何特定的排程器上執行。

RxJava的combineLatest()函數有點像zip()函數的特殊形式,zip()作用于最近未打包的兩個Observables。相反,combineLatest()作用于最近發射的資料項.

zip 中隻有當原始的Observable中的每一個都發射了一條資料時zip才發射資料。

CombineLatest則在原始的Observable中任意一個發射了資料時發射一條資料。

使用執行個體:

public void combineLatest() {
        Observable.combineLatest(createCombineLatest(), createCombineLatest(), (num1, num2) -> {
            logger("left:" + num1 + " right:" + num2);
            return num1 + num2;
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                logger("combineList:" + integer);
            }
        });
    }
           

列印結果:

left:1 right:2

CombineLatest:3

left:2 right:2

CombineLatest:4

left:2 right:4

CombineLatest:6

Rxjava實作CombineLast操作符可以讓我們直接将組裝的Observable作為參數傳值,也可以将所有的Observable裝在一個List裡面穿進去。

使用執行個體:

List<Observable<Integer>> list = new ArrayList<>();

    public void combineList() {
        for (int i = ; i < ; i++) {
            list.add(createCombineLatest(i));
        }
        Observable.combineLatest(list, args -> {
            int temp = ;
            for (Object i : args) {
                logger(i);
                temp += (Integer) i;
            }
            return temp;
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                logger("CombineLatest:" + integer);
            }
        });
    }
           

列印結果:

1

2

combineList:3

2

2

combineList:4

2

4

combineList:6

Switch

将一個發射多個Observables的Observable轉換成另一個單獨的Observable,後者發射那些Observables最近發射的資料項

switch操作符在Rxjava上的實作為switchOnNext.

用來将一個發射多個小Observable的源Observable轉化為一個Observable,

然後發射這多個小Observable所發射的資料。

當原始Observable發射了一個新的Observable時(不是這個新的Observable發射了一條資料時),它将取消訂閱之前的那個Observable。

這意味着,在後來那個Observable産生之後到它開始發射資料之前的這段時間裡,前一個Observable發射的資料将被丢棄

執行個體代碼:

private Observable<String> createSwitchObserver(int index) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = ; i < ; i++) {
                    subscriber.onNext(index + "-" + i);
                    try {
                        Thread.sleep();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).subscribeOn(Schedulers.newThread());
    }

    public void switchObserver()

    {
        Observable.switchOnNext(Observable.create(
                new Observable.OnSubscribe<Observable<String>>() {
                    @Override
                    public void call(Subscriber<? super Observable<String>> subscriber) {
                        for (int i = ; i < ; i++) {
                            subscriber.onNext(createSwitchObserver(i));
                            try {
                                Thread.sleep();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
        ))
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        logger("switch:" + s);
                    }
                });
    }
           

列印結果:

switch:1-1

switch:1-2

switch:2-1

switch:2-2

switch:2-3

switch:2-4

從列印結果看,第一個小Observable隻發射出了兩個資料,

第二個小Observable就被源Observable發射出來了,是以 第一個 接下來的兩個資料被丢棄。

StartWith

StartWith操作符會在源Observable發射的資料前面插上一些資料。

startWith可接受一個Iterable或者多個Observable作為函數的參數。

執行個體代碼:

public void startWith() {
        Observable.just(, , ).startWith(-, )
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        logger("startWith:" + integer);
                    }
                });
    }
           

列印結果:

startWith:-1

startWith:0

startWith:1

startWith:2

startWith:3

可以看到-1和0插入到發射序列的前面

使用Demo:Combining.java

參考:ReactiveX中文翻譯文檔

RxJava部分操作符介紹

RxJava開發精要6 - 組合Observables