前言
之前的幾天寫了 1,2,3,4,5,6 篇關于 RxJava 的操作符的基本用法的示範,琢磨琢磨了一下,感覺不太全,再補充點功能類的操作符,也就是常用的,但是不太在意的操作符。
正文
1、subscribe 操作符
1)、作用
訂閱,連接配接觀察者 & 被觀察者,組成訂閱關系
2)、代碼
/**
* subscribe 操作符
*/
@SuppressLint("CheckResult")
private void subscribeMethod() {
Observable.just(123, 234, 455, 677)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
logDUtils("accept:" + integer);
}
});
}
這裡就不貼 效果了,有了 subscribe 操作符,才可以讓 被觀察者與觀察者聯系到一起。
2、線程排程類 subscribeOn 和 observeOn
subscribeOn :被觀察者 的運作線程
observeOn : 觀察者 的運作線程
兩個操作符 内的參數:
Schedulers.immediate ( ) : 預設線程--------------> 目前線程 不指定是什麼具體線程;
AndroidSchedulers.mainThread ( ) :主線程
Schedulers.newThread ( ) :正常新工作線程 非主線程
Schedulers.io ( ) :i o 操作符線程 -----------------> 多用于網絡請求、讀寫檔案等 io 密集型操作
Schedulers.computation( ) : cpu 計算線程
建議檢視筆者仰慕的大佬 Season_zlc 的
給初學者的RxJava2.0教程(二)這篇文章
/**
* 線程排程類 操作符以及參數
*/
@SuppressLint("CheckResult")
private void threadMethod() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
logDUtils("create thread 1:" + Thread.currentThread().getName());
emitter.onNext("a");
logDUtils("create thread 2:" + Thread.currentThread().getName());
emitter.onNext("b");
}
//先将被觀察者 設定運作與 工作線程
}).subscribeOn(Schedulers.io())
//将 被觀察者 運作在 主線程
.subscribeOn(AndroidSchedulers.mainThread())
//将 觀察者 首先運作在 工作線程
.observeOn(Schedulers.newThread())
//将 觀察者轉換到 主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
logDUtils("accept thread:" + Thread.currentThread().getName() + " 列印資料: " + s);
}
});
}
3)、效果
注意:
從效果内 可以看出 被觀察者 對象 隻能 設定 1 次,而 觀察者所線上程 可以進行多次改變。
3、delay 操作符
将被觀察者延遲一段時間再發送事件
2)、代碼示範
/**
* delay 操作符
*/
private void delayMethod() {
Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
logDUtils("onSubscribe");
}
@Override
public void onNext(Integer integer) {
logDUtils("onNext: " + integer);
}
@Override
public void onError(Throwable e) {
logDUtils("onError" + e.getMessage());
}
@Override
public void onComplete() {
logDUtils("onComplete");
}
});
}
其他 重載方法
// 參數:時間 時間機關
public final Observable<T> delay(long delay, TimeUnit unit)
// 參數: 時間 時間機關 線程排程器
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)
// 參數: 時間 時間機關 錯誤延遲參數
public final Observable<T> delay(long delay, TimeUnit unit, boolean delayError)
// 參數:時間 時間機關 線程排程器 錯誤延遲參數
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError)
4、do 類 操作符
doOnEach:當被觀察者每發送 1 次資料事件就會調用 1 次;
doOnNext:執行 onNext 事件前調用;
doAfterNext: 執行onNext事件後調用;
doOnComplete:被觀察者正常發送事件完畢後調用;
doOnError: 被觀察者發送錯誤事件時調用;
doOnSubscribe: 觀察者訂閱時調用;
doAfterTerminate:被觀察者發送事件完 畢後調用,不管是正常終止還是非正常終止;
doFinally:最後執行
doOnUnsubscribe(): 取消訂閱執行
5、onErrorReturn 操作符
當發生錯誤時, 發送一個約定好的事件, 保證正常停止。
/**
* onErrorReturn 操作符
*/
private void onErrorReturnMethod() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("a");
emitter.onError(new Throwable());
emitter.onNext("b");
}
}).onErrorReturn(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) throws Exception {
logDUtils("發送錯誤");
return "c";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
logDUtils("onSubscribe");
}
@Override
public void onNext(String s) {
logDUtils("onNext:" + s);
}
@Override
public void onError(Throwable e) {
logDUtils("onError:" + e.getMessage());
}
@Override
public void onComplete() {
logDUtils("onComplete:");
}
});
}
當發送 onError 事件之後,後面的 b 并沒有被發送,而是直接走 onErrorReturn 操作符發送 c ,結束程式。
6、onErrorResumeNext 操作符
作用
當發生錯誤的時候 發送 一個被觀察者。 記住 與上面時有差別的哦!
7、onExceptionResumeNext 操作符
當發生 異常 的時候 發送一個被觀察者。記住這個與上面也不一樣哈!
8、 retry 操作符
當發生 錯誤 或者 異常時,重新讓被觀察者發送 一 次資料
重載方法 有 5個
public final Observable<T> retry()
// 參數 = 重試次數
public final Observable<T> retry(long times)
// 參數 = 判斷邏輯
public final Observable<T> retry(Predicate<? super Throwable> predicate)
// 參數 = 判斷邏輯(傳入目前重試次數 & 異常錯誤資訊)
public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate)
// 參數 = 設定重試次數 & 判斷邏輯
public final Observable<T> retry(long times, Predicate<? super Throwable> predicate)
9、retryUntil 操作符
出現錯誤後,判斷是否需要重新發送資料
若需要,則持續進行重試操作 類似于
public final Observable<T> retry(Predicate<? super Throwable> predicate)
方法
10、retryWhen 操作符
遇到錯誤時,将發生的錯誤傳遞給一個新的被觀察者,并決定是否需要重新訂閱 原 被觀察者 并且發送 原 事件;
11、repeat 操作符
無參 :表示 無條件重複 發送被觀察 的事件
有參:表示 重複次數
12、repeatWhen 操作符
有條件的 進行 重複發送
好了,馬上雙十一, 筒子們,準備好購物車,一起來剁手吧,額,還是再添兩本書吧。