天天看點

RxJava過濾操作符

概述

過濾操作符用于過濾和選擇Observable發射的資料序列,讓Observable隻傳回滿足我們條件的資料。

Debounce

Debounce會過濾掉發射速率過快的資料項,相當于限流,但是需要注意的是debounce過濾掉的資料會被丢棄掉。

如果在一個指定的時間間隔過去了仍舊沒有發射一個,那麼它将發射最後的那個。

RxJava将這個操作符實作為throttleWithTimeout和debounce.

簡單粗暴的說法:當N個結點發生的時間太靠近(即發生的時間差小于設定的值T),debounce就會自動過濾掉前N-1個結點。

場景:比如EidtText輸入聯想,可以使用debounce減少頻繁的網絡請求。避免每輸入(删除)一個字就做一次聯想。

和switchMap結合使用效果更佳,一個用于取消上次請求,一個用于節流。

throttleWithTimeOut

通過時間來限流,源Observable每次發射出來一個資料後就會進行計時,

如果在設定好的時間結束前源Observable有新的資料發射出來,這個資料就會被丢棄,同時重新開始計時。

如果每次都是在計時結束前發射資料,那麼這個限流就會走向極端:隻會發射最後一個資料。

預設在computation排程器上執行

執行個體:

public void throttleWithTimeout() {
        Subscription subscribe = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = ; i < ; i++) {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(i);
                    }
                    int sleep = ;
                    if (i %  == ) {
                        sleep = ;
                    }
                    try {
                        Thread.sleep(sleep);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.computation())
                .throttleWithTimeout(, TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(i -> logger("throttleWithTimeout:" + i));
        addSubscription(subscribe);
           

列印結果:

throttleWithTimeout:0

throttleWithTimeout:3

throttleWithTimeout:6

throttleWithTimeout:9

結果分析:每隔100毫秒發射一個資料,當要發射的資料是3的倍數的時候,下一個資料就延遲到300毫秒再發射

即:0 -300ms-> 1 -100ms-> 2 -100ms-> 3 ..

設定過濾時間為200ms,則1,2都被過濾丢棄。

deounce

不僅可以使用時間來進行過濾,還可以根據一個函數來進行限流。

這個函數的傳回值是一個臨時Observable,

如果源Observable在發射一個新的資料的時候,

上一個資料根據函數所生成的臨時Observable還沒有結束,那麼上一個資料就會被過濾掉。

值得注意的是,如果源Observable産生的最後一個結果後在規定的時間間隔内調用了onCompleted,

那麼通過debounce操作符也會把這個結果送出給訂閱者。

public void debounce() {
        Observable.just(, , , , , , , , ).debounce(integer -> {
            return Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                //如果%2==0,則發射資料并調用了onCompleted結束,則不會被丢棄
                    if (integer %  ==  && !subscriber.isUnsubscribed()) {
                        subscriber.onNext(integer);
                        subscriber.onCompleted();
                    }
                }
            });
        })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        logger("debounce:" + integer);
                    }
                });
    }
           

列印結果:

debounce:2

debounce:4

debounce:6

debounce:8

debounce:9

由結果可知,9的列印證明預設調用了onCompleted

使用案例: 使用RxJava 提升使用者體驗

Distinct

Distinct操作符的用處就是用來去重,隻允許還沒有發射過的資料項通過

distinctUntilChanged和這個函數功能類似,是去掉連續重複的資料

執行個體:

public void distinct(){
        Observable.just(, , , , , )
                .distinct()
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onNext(Integer item) {
                        logger("Next: " + item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        logger("Error: " + error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        logger("Sequence complete.");
                    }
                });
    }
           

列印結果:

Next: 1

Next: 2

Next: 3

Sequence complete.

public void distinctUntilChangedObserver(){
        Observable.just(, , , , , , , , )
                .distinctUntilChanged()
                .subscribe(integer -> logger("UntilChanged:"+integer));
    }
           

列印結果:

UntilChanged: 1

UntilChanged: 2

UntilChanged: 3

UntilChanged: 1

UntilChanged: 2

UntilChanged: 3

ElementAt

從字面意思來看,ElementAt隻會傳回指定位置的資料。其相關方法有elementAtOrDefault(int,T),可以允許預設值

執行個體:

public void elementAt(){
        Observable.just(, , , , , ).elementAt()
                .subscribe(i -> logger("elementAt:" + i));
    }
           

列印結果:

elementAt:2

Filter

允許傳入一個Func,通過的資料才會被發射。

特殊形式ofType(Class):Observable隻傳回指定類型的資料。

執行個體:

public void filter() {
        Observable.just(, , , , )
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer item) {
                        return (item < );
                    }
                }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                logger("Next: " + item);
            }

            @Override
            public void onError(Throwable error) {
                logger("Error: " + error.getMessage());
            }

            @Override
            public void onCompleted() {
                logger("Sequence complete.");
            }
        });
    }
           

列印結果:

Next: 1

Next: 2

Next: 3

Sequence complete.

First、Last

First傳回滿足條件的第一條資料.被實作為first,firstOrDefault和takeFirst。

Last操作符隻傳回最後一條滿足條件的資料,被實作為last和lastOrDefault。

如果擷取不到資料,First和Last會抛出NoSuchElementException異常

takeFist會傳回一個空的Observable(不調用onNext()但是會調用onCompleted)。

First和Last 都沒有實作為一個傳回Observable的過濾操作符,

而是一個在當時就發射原始Observable指定資料項的阻塞函數。如果需要的是過濾操作符,

可以使用Take(1)、ElementAt(0)或者TakeLast(1),TakeLast(Func)

如果不想立即傳回Observable,而是需要阻塞并傳回值,可以使用BlockingObservable,

通過Observable.toBlocking或者BlockingObservable.from方法來轉化。

執行個體:

public void first() {
        BlockingObservable<Integer> integerBlockingObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = ; i < ; i++) {
                    try {
                        Thread.sleep();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (!subscriber.isUnsubscribed()) {
                        logger("onNext:" + i);
                        subscriber.onNext(i);
                    }
                }
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            }
        }).toBlocking();


        Integer first = integerBlockingObservable.first(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > ;
            }
        });

        logger(first);
    }
           

2s後列印了:<– 阻塞了,知道大于3的資料發射出來

onNext:0

onNext:1

onNext:2

onNext:3

onNext:4

4

takeLast執行個體:

public void takeLast() {
        Observable.just(, , , , , , ).takeLast()
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onNext(Integer item) {
                        logger("Next: " + item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        logger("Error: " + error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        logger("Sequence complete.");
                    }
                });
    }
           

列印結果:

Next: 6

Next: 7

Sequence complete.

Skip、Take

Skip操作符将源Observable發射的資料過濾掉前n項,而Take操作符則隻取前n項

相關操作符:TakeLast:發射Observable發射的最後N項資料,

takeLastBuffer:最後N項資料收集到list再發射

SkipLast:忽略Observable’發射的後N項資料,隻保留前面的資料。

skipLast操作符送出滿足條件的結果給訂閱者存在延遲效果

執行個體:

public void skip(){
        Observable.just(, , , , , ).skip().subscribe(i -> logger("Skip:" + i));
    }

    public void take(){
        Observable.just(, , , , , ).take().subscribe(i -> logger("Take:" + i));
    }
           

列印結果:

Skip:2

Skip:3

Skip:4

Skip:5

Take:0

Take:1

Sample、ThrottleFirst

Sample操作符會定時地發射源Observable最近發射的資料,其他的都會被過濾掉。

RxJava将這個操作符實作為sample和throttleLast。

而ThrottleFirst操作符則會定期發射這個時間段裡源Observable發射的第一個資料

這兩個操作符都在computation排程器上執行。

執行個體:

private Observable<Integer> createObserver() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = ; i < ; i++) {
                    try {
                        Thread.sleep();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        });
    }

    public void sample() {
            createObserver().sample(, TimeUnit.MILLISECONDS)
            .subscribe(i -> logger("sample:" + i));
        }

    public void throttleFirst() {
            createObserver().throttleFirst(, TimeUnit.MILLISECONDS)
            .subscribe(i -> logger("throttleFirst:" + i));
        }
           

列印結果:

sample:3

sample:8

sample:13

sample:18

throttleFirst:0

throttleFirst:5

throttleFirst:10

throttleFirst:15

其中sample操作符會每隔5個數字發射出一個資料來,

而throttleFirst則會每隔5個資料發射第一個資料。

ThrottleFirst 與RxBinding結合

@OnClick(R.id.btn_click)
    public void btnClick(){
        RxView.clicks(btnClick)
                .throttleFirst(, TimeUnit.SECONDS)
                .subscribe(new Action1<Void>() {
                    @Override
                    public void call(Void aVoid) {
                        Toast.makeText(RxBindingButtonClick.this,"Click",Toast.LENGTH_SHORT).show();
                    }
                });
    }
           
在1s内隻響應一次點選,避免了重複點選的問題

ignoreElements

ignoreElements操作符忽略所有源Observable産生的結果,隻把Observable的onCompleted和onError事件通知給訂閱者。

ignoreElements操作符适用于不太關心Observable産生的結果,隻是在Observable結束時(onCompleted)或者出現錯誤時能夠收到通知。

執行個體:

public void ignoreElements(){
        Observable.just(,,,,,,,).ignoreElements()
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onNext(Integer item) {
                        logger("Next: " + item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        logger("Error: " + error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        logger("Sequence complete.");
                    }
                });
    }
           

列印結果:

Sequence complete.

使用Demo:Filtering.java

參考:

ReactiveX文檔中文翻譯

RxJava操作符(三)Filtering

Android RxJava使用介紹(三) RxJava的操作符