RxJava各類型操作符詳解如下:
RxJava操作符彙總
RxJava操作符(一) —-建立操作符
RxJava操作符(二)—-轉換操作符
RxJava操作符(三)—-合并操作符
RxJava操作符(四)—-功能操作符
RxJava操作符(五) —-過濾操作符
RxJava操作符(六)—-條件操作符
功能操作符: 輔助被觀察者(Observable) 發送事件時實作一些功能性需求,如錯誤處理,線程排程
1、subscribe() 操作符
/**
* ==================subscribe 操作符===========================
*
* 連接配接被觀察者和觀察者
*/
public static void subscribe() {
//建立被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("事件");
}
});
//建立觀察者
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG + "subscribe", "開始連接配接");
}
@Override
public void onNext(Object o) {
Log.d(TAG + "subscribe", "收到事件");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//通過subscribe 進行 被觀察者(Observable)與觀察者(Observer)的連接配接
observable.subscribe(observer);
}
輸出如下:
2、delay() 操作符
/**
* ==================delay 操作符=======================================
*
* 延遲發送事件
*
* delay有多個重載方法:
*
* delay(long delay,TimeUnit unit) :指定延遲時間。 參數一:時間 ; 參數二:時間機關
*
* delay(long delay, TimeUnit unit, Scheduler scheduler) 指定延遲時間&線程排程器。參數一:時間 ; 參數二:時間機關;參數三: 線程排程器
*
* delay(long delay, TimeUnit unit, boolean delayError) 指定延遲時間&線程排程器。參數一:時間 ; 參數二:時間機關;參數三: 是否錯誤延遲
*
* delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) 指定延遲時間&線程排程器&錯誤延遲。參數一:時間 ; 參數二:時間機關;
* 參數三: 線程排程器; 參數四:是否錯誤延遲(若中間發生錯誤,是否如常執行,執行完在執行onError())
*/
public static void delay() {
Observable
.just(, )
.delay(, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "delay", String.valueOf(integer));
}
});
}
輸出如下:
3、do 系列操作符
/**
* ========================do 系列操作符 =======================================
*
* 在事件發送&接收的整個周期過程中進行操作。
*
* 如發送事件前的操作,發送事件後的回調請求
*
* do系列操作符包含以下:
*
* doOnEach() :當Observable每發送一次事件就會調用一次(包含onNext(),onError(),onComplete())
* doOnNext(): 執行 onNext()前調用
* doAfterNext(): 執行onNext()後調用
* doOnComplete():執行onComplete()前調用
* doOnError():執行 onError()前調用
* doOnTerminate(): 執行終止(無論正常發送完畢/異常終止)
* doFinally(): 最後執行
* doOnSubscribe() :觀察者訂閱是調用
* doOnUnScbscribe(): 觀察者取消訂閱時調用
*/
public static void dos() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext();
emitter.onNext();
emitter.onNext();
emitter.onError(new NullPointerException());
}
})
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG + "doOnEach", "doOnEach: " + String.valueOf(integerNotification.getValue()));
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "doOnNext", "doOnNext: " + String.valueOf(integer));
}
})
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "doAfterNext", "doAfterNext: " + String.valueOf(integer));
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doOnComplete", "doOnComplete");
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG + "doOnError", "doOnError");
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doOnTerminate", "doOnTerminate");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doAfterTermi", "doAfterTerminate");
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG + "doOnSubscribe", "doOnSubscribe");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doFinally", "doFinally");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "收到的資料: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出如下:
4、onErrorReturn() 操作符
/** ====================onErrorReturn() 操作符 ======================
*
* 可以捕獲錯誤。遇到錯誤時,發送一個特殊事件,并且正常終止.注意後面的事件不會再發送
*/
public static void onErrorReturn() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext();
emitter.onNext();
emitter.onError(new Throwable("Throwable"));
emitter.onNext();
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.e(TAG, "發生了錯誤: " + throwable.getMessage());
return ;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG, e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出如下:
5、onExceptionResumeNext()/onErrorResumeNext() 操作符
/**
* ====================onExceptionResumeNext()/onErrorResumeNext() 操作符 ======================
*
* 遇到錯誤時發送一個新的Observable 。并且正常終止.注意原Observable後面的事件不會再發送
*
* 如果捕獲Exception的話使用onExceptionResumeNext() ,捕獲錯誤的用onErrorResumeNext()
*/
public static void onExceptionResumeNext() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext();
emitter.onNext();
emitter.onError(new NullPointerException("NullPointerException"));
emitter.onNext();
}
}).onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext();
observer.onNext();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG, e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出如下:
6、retry() 操作符
/**
* ====================retry() 操作符 ======================
*
* 作用是:出現錯誤時,讓被觀察者重新發送資料
* 注:若發送錯誤,則一直重新發送
*
* 有幾個重載方法:
* retry() : 出現錯誤時,讓被觀察者重新發送資料。若錯誤一直發生,則一直重新發送
*
* retry(long time):與retry不同的書,若錯誤一直發生,被觀察者則一直重新發送資料,但這持續重新發送有次數限制
*
* retry(Predicate predicate) : 出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料
*
* retry(new BiPredicate<Integer, Throwable>):出現錯誤時,根據指定邏輯(可以捕獲重發的次數和發生的錯誤)決定是否讓被觀察者重新發送資料
*
* retry(long time,Predicate predicate) : 出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料。并且有持續重發的次數限制
*/
public static void retry() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext();
emitter.onNext();
emitter.onError(new Throwable("發生錯誤了"));
emitter.onNext();
}
})
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
// interger 為重試次數 ,throwable 為捕獲到的異常
Log.e(TAG + "retry", throwable.getMessage());
Log.e(TAG + "integer", "重試次數: " + integer);
//return true : 重新發送請求(若持續遇到錯誤,就持續重新發送)
//return false : 不重新發送資料 并且調用觀察者的onError()方法結束
if (integer > )
return false;
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG + "retry", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
輸出如下:
7、retryUntil() 操作符
/**
* ===================retryUntil() 操作符============================
*
* 發送事件遇到錯誤,指定規則是否重新發送。retry(Predicate predicate)。
*
* return true : 不重新發送請求,并且調用觀察者的onError()方法結束
* return false : 重新發送資料(若持續遇到錯誤,就持續重新發送)
*/
public static void retryUntil() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext();
emitter.onNext();
emitter.onError(new Throwable("發生錯誤了"));
emitter.onNext();
}
})
.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
//return true : 不重新發送請求,并且調用觀察者的onError()方法結束
// return false : 重新發送資料(若持續遇到錯誤,就持續重新發送)
return false;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG + "retryUntil", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "retryUntil", "onComplete");
}
});
}
輸出如下:
8、 retryWhen() 操作符
/**
* ===================retryWhen() 操作符============================
*
* 遇到錯誤時,将發生的錯誤傳遞給一個新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable) & 發送事件
*/
public static void retryWhen() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext();
emitter.onNext();
emitter.onError(new Throwable("發送了錯誤"));
emitter.onNext();
}
})
//遇到Error時會回調
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
//1、若傳回的Observable發送的事件 = Error ,則原始的Observable則不重新發送事件。該異常資訊可在觀察者的onError中獲得
//return Observable.error(throwable);
//2、若傳回的Observable發送的事件= Next事件(和next的内容無關),則原始的Observable重新發送事件(若持續遇到錯誤,則持續發送)
return Observable.just(); //僅僅是作為一個觸發重新訂閱原被觀察者的通知,什麼資料并不重要,隻有不是onComplete/onError事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "retryWhen", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG + "retryWhen", e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG + "retryWhen", "onComplete");
}
});
}
輸出如下:
9、repeat() 操作符
/**
* ===============repeat() 操作符==============
*
* repeat操作符的作用是重複發射 observable的資料序列,可以使無限次也可以是指定次數.不傳時為重複無限次
*/
public static void repeat() {
Observable
.just(, , )
.repeat()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "repeat", String.valueOf(integer));
}
});
}
輸出如下:
10、repeatWhen() 操作符
/**
* ===============repeatWhen() 操作符==============
*
* 将原始 Observable 停止發送事件的辨別(Complete() / Error())轉換成1個 Object 類型資料傳遞給1個新被觀察者(Observable)
* ,以此決定是否重新訂閱 & 發送原來的 Observable
*/
public static void repeatWhen() {
Observable
.just(, , )
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
//若新被觀察者(Observable)傳回1個Complete()/ Error()事件,則不重新訂閱 & 發送原來的 Observable
//Observable.empty() = 發送Complete事件,但不會回調觀察者的onComplete()
return Observable.empty();
// return Observable.error(new Throwable("不再重新訂閱事件"));
// 傳回Error事件 = 回調onError()事件,并接收傳過去的錯誤資訊。
// 情況2:若新被觀察者(Observable)傳回其餘事件,則重新訂閱 & 發送原來的 Observable
// return Observable.just(1);
// 僅僅是作為1個觸發重新訂閱被觀察者的通知,發送的是什麼資料并不重要,隻要不是Complete() / Error()事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
輸出如下:
11、debounce() 操作符
/**
* ===============debounce() 操作符==============
* <p>
* 一定的時間内沒有操作就會發送事件(隻會發送最後一次操作的事件)。
* <p>
* 以下的例子: 發送5個事件,每個事件間隔1秒。但是debounce限定了2秒内沒有任何操作才會真正發送事件。是以隻有最後一次滿足條件,隻能接收到事件 5
*/
public static void debounce() {
Observable.intervalRange(, , , , TimeUnit.SECONDS)
.debounce(, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "debounce", String.valueOf(aLong));
}
});
}
輸出如下:
12、subscribeOn / ObserverOn 操作符
/**
* ===============subscribeOn() 操作符==============
* ===============observerOn() 操作符==============
* <p>
* <p>
* subscribeOn : 發送事件的線程
* observerOn: 接收事件的線程
* <p>
* 線程排程器:
* Schedulers.io(): 代表io操作的線程,通常用于網絡,讀寫檔案等io密集型的操作
* Schedulers.compucation(): 代表CPU計算密集型的操作,例如需要大量計算的操作
* Schedulers.newThread(): 代表一個正常的新線程
* AndroidSchedulers。mainThread(): 代表Android的主線程
*/
public static void subscribeOn_observerOn() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("事件");
Log.d(TAG + "subscribeOn_ObserverOn", "發送事件:" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG + "subscribeOn_ObserverOn", "接收事件: " + Thread.currentThread().getName());
}
});
}
輸出如下:
上面代碼位址
RxJava各類型操作符詳解如下:
RxJava操作符彙總
RxJava操作符(一) —-建立操作符
RxJava操作符(二)—-轉換操作符
RxJava操作符(三)—-合并操作符
RxJava操作符(四)—-功能操作符
RxJava操作符(五) —-過濾操作符
RxJava操作符(六)—-條件操作符