天天看點

RxJava操作符(四)----功能操作符

RxJava各類型操作符詳解如下:

RxJava操作符彙總

RxJava操作符(一) —-建立操作符

RxJava操作符(二)—-轉換操作符

RxJava操作符(三)—-合并操作符

RxJava操作符(四)—-功能操作符

RxJava操作符(五) —-過濾操作符

RxJava操作符(六)—-條件操作符

功能操作符: 輔助被觀察者(Observable) 發送事件時實作一些功能性需求,如錯誤處理,線程排程

1、subscribe() 操作符

/**
     * ==================subscribe 操作符===========================
     *
     *  連接配接被觀察者和觀察者
     */
    public static void subscribe() {

        //建立被觀察者
        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                emitter.onNext("事件");
            }
        });

        //建立觀察者
        Observer observer = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG + "subscribe", "開始連接配接");
            }

            @Override
            public void onNext(Object o) {
                Log.d(TAG + "subscribe", "收到事件");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

        //通過subscribe 進行 被觀察者(Observable)與觀察者(Observer)的連接配接
        observable.subscribe(observer);

    }
           

輸出如下:

RxJava操作符(四)----功能操作符

2、delay() 操作符

/**
     * ==================delay 操作符=======================================
     * 
     * 延遲發送事件
     * 
     * delay有多個重載方法:
     * 
     * delay(long delay,TimeUnit unit) :指定延遲時間。 參數一:時間 ; 參數二:時間機關
     * 
     * delay(long delay, TimeUnit unit, Scheduler scheduler)  指定延遲時間&線程排程器。參數一:時間 ; 參數二:時間機關;參數三: 線程排程器
     * 
     * delay(long delay, TimeUnit unit, boolean delayError)  指定延遲時間&線程排程器。參數一:時間 ; 參數二:時間機關;參數三: 是否錯誤延遲
     * 
     * delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError)  指定延遲時間&線程排程器&錯誤延遲。參數一:時間 ; 參數二:時間機關;
     * 參數三: 線程排程器; 參數四:是否錯誤延遲(若中間發生錯誤,是否如常執行,執行完在執行onError())
     */
    public static void delay() {

        Observable
                .just(, )
                .delay(, TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "delay", String.valueOf(integer));
                    }
                });
    }
           

輸出如下:

RxJava操作符(四)----功能操作符

3、do 系列操作符

/**
     * ========================do 系列操作符 =======================================
     * 
     * 在事件發送&接收的整個周期過程中進行操作。
     * 
     * 如發送事件前的操作,發送事件後的回調請求
     * 
     * do系列操作符包含以下:
     * 
     * doOnEach() :當Observable每發送一次事件就會調用一次(包含onNext(),onError(),onComplete())
     * doOnNext(): 執行 onNext()前調用
     * doAfterNext(): 執行onNext()後調用
     * doOnComplete():執行onComplete()前調用
     * doOnError():執行 onError()前調用
     * doOnTerminate(): 執行終止(無論正常發送完畢/異常終止)
     * doFinally(): 最後執行
     * doOnSubscribe() :觀察者訂閱是調用
     * doOnUnScbscribe(): 觀察者取消訂閱時調用
     */

    public static void dos() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext();
                        emitter.onNext();
                        emitter.onNext();
                        emitter.onError(new NullPointerException());
                    }
                })
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        Log.d(TAG + "doOnEach", "doOnEach:  " + String.valueOf(integerNotification.getValue()));
                    }
                })
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "doOnNext", "doOnNext:  " + String.valueOf(integer));
                    }
                })
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "doAfterNext", "doAfterNext:  " + String.valueOf(integer));
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG + "doOnComplete", "doOnComplete");
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG + "doOnError", "doOnError");
                    }
                })
                .doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG + "doOnTerminate", "doOnTerminate");
                    }
                })
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG + "doAfterTermi", "doAfterTerminate");
                    }
                })
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d(TAG + "doOnSubscribe", "doOnSubscribe");
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG + "doFinally", "doFinally");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "收到的資料:  " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

    }
           

輸出如下:

RxJava操作符(四)----功能操作符

4、onErrorReturn() 操作符

/** ====================onErrorReturn() 操作符 ======================
     * 
     * 可以捕獲錯誤。遇到錯誤時,發送一個特殊事件,并且正常終止.注意後面的事件不會再發送
     */
    public static void onErrorReturn() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext();
                        emitter.onNext();
                        emitter.onError(new Throwable("Throwable"));
                        emitter.onNext();

                    }
                })
                .onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(Throwable throwable) throws Exception {
                        Log.e(TAG, "發生了錯誤:  " + throwable.getMessage());
                        return ;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, String.valueOf(integer));
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

    }
           

輸出如下:

RxJava操作符(四)----功能操作符

5、onExceptionResumeNext()/onErrorResumeNext() 操作符

/**
     * ====================onExceptionResumeNext()/onErrorResumeNext() 操作符 ======================
     * 
     * 遇到錯誤時發送一個新的Observable 。并且正常終止.注意原Observable後面的事件不會再發送
     * 
     * 如果捕獲Exception的話使用onExceptionResumeNext() ,捕獲錯誤的用onErrorResumeNext()
     */
    public static void onExceptionResumeNext() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext();
                emitter.onNext();
                emitter.onError(new NullPointerException("NullPointerException"));
                emitter.onNext();
            }
        }).onExceptionResumeNext(new Observable<Integer>() {
            @Override
            protected void subscribeActual(Observer<? super Integer> observer) {
                observer.onNext();
                observer.onNext();

            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, String.valueOf(integer));
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
           

輸出如下:

RxJava操作符(四)----功能操作符

6、retry() 操作符

/**
     * ====================retry() 操作符 ======================
     * 
     * 作用是:出現錯誤時,讓被觀察者重新發送資料
     * 注:若發送錯誤,則一直重新發送
     * 
     * 有幾個重載方法:
     * retry() : 出現錯誤時,讓被觀察者重新發送資料。若錯誤一直發生,則一直重新發送
     * 
     * retry(long time):與retry不同的書,若錯誤一直發生,被觀察者則一直重新發送資料,但這持續重新發送有次數限制
     * 
     * retry(Predicate predicate) :  出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料
     * 
     * retry(new BiPredicate<Integer, Throwable>):出現錯誤時,根據指定邏輯(可以捕獲重發的次數和發生的錯誤)決定是否讓被觀察者重新發送資料
     * 
     * retry(long time,Predicate predicate) : 出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料。并且有持續重發的次數限制
     */
    public static void retry() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext();
                        emitter.onNext();
                        emitter.onError(new Throwable("發生錯誤了"));
                        emitter.onNext();
                    }
                })
                .retry(new BiPredicate<Integer, Throwable>() {
                    @Override
                    public boolean test(Integer integer, Throwable throwable) throws Exception {

                        // interger 為重試次數 ,throwable 為捕獲到的異常

                        Log.e(TAG + "retry", throwable.getMessage());
                        Log.e(TAG + "integer", "重試次數: " + integer);

                        //return true : 重新發送請求(若持續遇到錯誤,就持續重新發送)
                        //return false :    不重新發送資料 并且調用觀察者的onError()方法結束

                        if (integer > )
                            return false;
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e(TAG + "retry", String.valueOf(integer));
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }
           

輸出如下:

RxJava操作符(四)----功能操作符

7、retryUntil() 操作符

/**
     * ===================retryUntil() 操作符============================
     * 
     * 發送事件遇到錯誤,指定規則是否重新發送。retry(Predicate predicate)。
     * 
     *  return true : 不重新發送請求,并且調用觀察者的onError()方法結束
     *  return false : 重新發送資料(若持續遇到錯誤,就持續重新發送)
     */
    public static void retryUntil() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext();
                        emitter.onNext();
                        emitter.onError(new Throwable("發生錯誤了"));
                        emitter.onNext();
                    }
                })
                .retryUntil(new BooleanSupplier() {
                    @Override
                    public boolean getAsBoolean() throws Exception {

                         //return true : 不重新發送請求,并且調用觀察者的onError()方法結束
                        // return false : 重新發送資料(若持續遇到錯誤,就持續重新發送)
                        return false;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e(TAG + "retryUntil", String.valueOf(integer));
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG + "retryUntil", "onComplete");
                    }
                });
    }
           

輸出如下:

RxJava操作符(四)----功能操作符

8、 retryWhen() 操作符

/**
     * ===================retryWhen() 操作符============================
     * 
     * 遇到錯誤時,将發生的錯誤傳遞給一個新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable) &  發送事件
     */
    public static void retryWhen() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext();
                        emitter.onNext();
                        emitter.onError(new Throwable("發送了錯誤"));
                        emitter.onNext();
                    }
                })
                //遇到Error時會回調
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(Throwable throwable) throws Exception {

                                //1、若傳回的Observable發送的事件 = Error ,則原始的Observable則不重新發送事件。該異常資訊可在觀察者的onError中獲得
                                //return Observable.error(throwable);

                                //2、若傳回的Observable發送的事件= Next事件(和next的内容無關),則原始的Observable重新發送事件(若持續遇到錯誤,則持續發送)
                                return Observable.just(); //僅僅是作為一個觸發重新訂閱原被觀察者的通知,什麼資料并不重要,隻有不是onComplete/onError事件
                            }
                        });

                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG + "retryWhen", String.valueOf(integer));
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG + "retryWhen", e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG + "retryWhen", "onComplete");
                    }
                });
    }
           

輸出如下:

RxJava操作符(四)----功能操作符

9、repeat() 操作符

/**
     * ===============repeat() 操作符==============
     * 
     * repeat操作符的作用是重複發射 observable的資料序列,可以使無限次也可以是指定次數.不傳時為重複無限次
     */
    public static void repeat() {

        Observable
                .just(, , )
                .repeat()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "repeat", String.valueOf(integer));
                    }
                });
    }
           

輸出如下:

RxJava操作符(四)----功能操作符

10、repeatWhen() 操作符

/**
     *  ===============repeatWhen() 操作符==============
     *
     * 将原始 Observable 停止發送事件的辨別(Complete() / Error())轉換成1個 Object 類型資料傳遞給1個新被觀察者(Observable)
     * ,以此決定是否重新訂閱 & 發送原來的 Observable
     */
    public static void repeatWhen() {

        Observable
                .just(, , )
                .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                        return  objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(Object o) throws Exception {

                                //若新被觀察者(Observable)傳回1個Complete()/  Error()事件,則不重新訂閱 & 發送原來的 Observable
                                //Observable.empty() = 發送Complete事件,但不會回調觀察者的onComplete()
                                return Observable.empty();

                                // return Observable.error(new Throwable("不再重新訂閱事件"));
                                // 傳回Error事件 = 回調onError()事件,并接收傳過去的錯誤資訊。

                                // 情況2:若新被觀察者(Observable)傳回其餘事件,則重新訂閱 & 發送原來的 Observable
                                // return Observable.just(1);
                                // 僅僅是作為1個觸發重新訂閱被觀察者的通知,發送的是什麼資料并不重要,隻要不是Complete() /  Error()事件
                            }
                        });
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }
           

輸出如下:

RxJava操作符(四)----功能操作符

11、debounce() 操作符

/**
     * ===============debounce() 操作符==============
     * <p>
     * 一定的時間内沒有操作就會發送事件(隻會發送最後一次操作的事件)。
     * <p>
     * 以下的例子: 發送5個事件,每個事件間隔1秒。但是debounce限定了2秒内沒有任何操作才會真正發送事件。是以隻有最後一次滿足條件,隻能接收到事件 5
     */
    public static void debounce() {

        Observable.intervalRange(, , , , TimeUnit.SECONDS)
                .debounce(, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "debounce", String.valueOf(aLong));
                    }
                });

    }
           

輸出如下:

RxJava操作符(四)----功能操作符

12、subscribeOn / ObserverOn 操作符

/**
     * ===============subscribeOn() 操作符==============
     * ===============observerOn() 操作符==============
     * <p>
     * <p>
     * subscribeOn : 發送事件的線程
     * observerOn: 接收事件的線程
     * <p>
     * 線程排程器:
     * Schedulers.io(): 代表io操作的線程,通常用于網絡,讀寫檔案等io密集型的操作
     * Schedulers.compucation(): 代表CPU計算密集型的操作,例如需要大量計算的操作
     * Schedulers.newThread(): 代表一個正常的新線程
     * AndroidSchedulers。mainThread(): 代表Android的主線程
     */
    public static void subscribeOn_observerOn() {

        Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("事件");
                        Log.d(TAG + "subscribeOn_ObserverOn", "發送事件:" + Thread.currentThread().getName());
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG + "subscribeOn_ObserverOn", "接收事件:   " + Thread.currentThread().getName());
                    }
                });

    }
           

輸出如下:

RxJava操作符(四)----功能操作符

上面代碼位址

RxJava各類型操作符詳解如下:

RxJava操作符彙總

RxJava操作符(一) —-建立操作符

RxJava操作符(二)—-轉換操作符

RxJava操作符(三)—-合并操作符

RxJava操作符(四)—-功能操作符

RxJava操作符(五) —-過濾操作符

RxJava操作符(六)—-條件操作符