目錄
- 一、RxJava的基礎使用
-
- 1. 定義
- 2. 作用
- 3. 特點
- 4.Rxjava原理介紹
- 5. 基本使用
-
- 5.1.1 使用步驟
- 5.1.2 步驟詳解
- 二、RxJava操作符
-
- 1、建立操作符 :建立被觀察者(Observable)對象&發送事件
-
- 1)Create() 操作符
- 2)Just() 操作符
- 3)fromIterable() 操作符
- 4)timer()操作符
- 5)fromArray() 操作符
- 6)interval() 定時器
- 7)intervalRange() 操作符
- 8)Range() 操作符
- 2、轉換操作符:變換被觀察者(Observable)發送的事件。将Observable發送的資料按照一定的規則做一些變換,然後再将變換的資料發射出去。變換的操作符有map,flatMap,concatMap,switchMap,buffer,groupBy等等。
-
- 1)map()操作符
- 2)flatMap()操作符
- 3)concatMap() 操作符
- 4)buffer() 操作符
- 3、 合并操作符:組合多個被觀察者(Observable)&合并需要發送的事件。包含:concatMap(),concat(),merge(),mergeArray(),concateArray(),reduce(),collect(),startWith(),zip(),count()等
-
- 1)merge(),concat ()操作符
- 2)concatDelayError()/mergeDelayError() 操作符
- 3)zip 操作符
- 4)combineLatest 操作符
- 5)reduce ()操作符
- 6)collect() 操作符
- 7)startWith()/startWithArray() 操作符
- 8)count() 操作符
- 4、功能操作符:輔助被觀察者(Observable) 發送事件時實作一些功能性需求,如錯誤處理,線程排程。
-
- 1) subscribe() 操作符
- 2) delay() 操作符
- 3) do 系列操作符
- 4) onErrorReturn() 操作符
- 5) onExceptionResumeNext()/onErrorResumeNext() 操作符
- 6) retry() 操作符
- 7) retryUntil() 操作符
- 8) retryWhen() 操作符
- 9) repeat() 操作符
- 10)repeatWhen() 操作符
- 11)debounce() 操作符
- 12)subscribeOn / ObserverOn 操作符
- 5、過濾操作符:用于将Observable發送的資料進行過濾和選擇。讓Observable傳回我們所需要的資料。過濾操作符有buffer(),filter(),skip(),take(),skipLast(),takeLast(),throttleFirst(),distainctUntilChange()。
-
- 1)filter() 操作符
- 2)distinct() 操作符
- 3)distinctUntilChanged() 操作符
- 4)skip(),skipLast()操作符
- 5)take() 操作符
- 6)takeLast() 操作符
- 7)elementAt() 操作符
- 8)elementAtOrError()操作符
- 9)ignoreElements() 操作符
- 10)ofType() 操作符
- 11)throttleFirst()/throttleLast() 操作符
- 12)sample()
- 13)firstElement()/lastElement()
- 6、條件 / 布爾操作符:通過設定函數,判斷被觀察者(Observable)發送的事件是否符合條件
一、RxJava的基礎使用
1. 定義
RxJava 在 GitHub 的介紹:
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻譯:RxJava 是一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程式的庫
總結:RxJava 是一個 基于事件流、實作異步操作的庫
2. 作用
實作異步操作
類似于 Android中的 AsyncTask 、Handler作用
3. 特點
由于 RxJava的使用方式是:基于事件流的鍊式調用,是以使得 RxJava:
- 邏輯簡潔
- 實作優雅
-
使用簡單
更重要的是,随着程式邏輯的複雜性提高,它依然能夠保持簡潔 & 優雅
4.Rxjava原理介紹
- Rxjava原理 基于 一種擴充的觀察者模式
- Rxjava的擴充觀察者模式中有4個角色:
角色 | 作用 | 類比 |
---|---|---|
被觀察者(Observable) | 産生事件 | 顧客 |
觀察者(Observer) | 接收事件,并給出響應動作 | 廚房 |
訂閱(Subscribe) | 連接配接 被觀察者 & 觀察者 | 服務員 |
事件(Event) | 被觀察者 & 觀察者 溝通的載體 | 菜式 |
具體原理
結合 顧客到飯店吃飯 的生活例子了解:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL1smeNl3YU10dNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLwIjM2UDNzETM0ETNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
即RxJava原理可總結為:被觀察者 (Observable) 通過 訂閱(Subscribe) 按順序發送事件 給觀察者 (Observer), 觀察者(Observer) 按順序接收事件 & 作出對應的響應動作。具體如下圖:
5. 基本使用
5.1.1 使用步驟
5.1.2 步驟詳解
加入依賴:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
具體實作:
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 步驟1:建立被觀察者 Observable & 生産事件
// 即 顧客入飯店 - 坐下餐桌 - 點菜
// 1. 建立被觀察者 Observable 對象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 2. 在複寫的subscribe()裡定義需要發送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 通過 ObservableEmitter類對象産生事件并通知觀察者
// ObservableEmitter類介紹
// a. 定義:事件發射器
// b. 作用:定義需要發送的事件 & 向觀察者發送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
// 步驟2:建立觀察者 Observer 并 定義響應事件行為
// 即 開廚房 - 确定對應菜式
Observer<Integer> observer = new Observer<Integer>() {
// 通過複寫對應方法來 響應 被觀察者
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接配接");
}
// 預設最先調用複寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
};
// 步驟3:通過訂閱(subscribe)連接配接觀察者和被觀察者
// 即 顧客找到服務員 - 點菜 - 服務員下單到廚房 - 廚房烹調
observable.subscribe(observer);
基于事件流的鍊式調用方式:
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// RxJava的流式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 建立被觀察者 & 生産事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 通過通過訂閱(subscribe)連接配接觀察者和被觀察者
// 3. 建立觀察者 & 定義響應事件的行為
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接配接");
}
// 預設最先調用複寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
}
}
二、RxJava操作符
1、建立操作符 :建立被觀察者(Observable)對象&發送事件
1)Create() 操作符
/**
*
* =======================基本用法Create====================================
*
* Observable 被觀察者
*
* ObservableOnSubscribe 觀察者與被觀察者的橋接(事件發射器)
*
* ObServer 觀察者
*
* 被觀察者 --> 觀察者與被觀察者的橋接 --> 觀察者
*
* 被觀察者.create(觀察者與被觀察者的橋接).subscribe(觀察者)
*
*/
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
emitter.onNext(String.valueOf(i));
}
emitter.onComplete();
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
}
輸出如下:
2)Just() 操作符
/**
* ====================just 操作符 ====================
*
* 此操作符的作用是将傳入的資料依次發送出去.最多可以傳10個參數
*
* 以下代碼會依次把 1-10的字元串發送出去。執行10此觀察者的onNext方法,最後預設執行onComplete方法
*/
public static void just() {
Observable
.just("1", "2", "3", "4", "5",
"6", "7", "8", "9", "10")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "just", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "just", "complete");
}
});
}
輸出如下:
3)fromIterable() 操作符
/**
* ====================fromIterable 操作符 ====================
*
* 此操作符的作用是将傳入的數組集合按腳标依次發送出去
*
* 以下代碼會依次把 0-9的字元串發送出去。執行10此觀察者的onNext方法,最後預設執行onComplete方法
*/
public static void fromIterable() {
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(String.valueOf(i));
}
Observable
.fromIterable(list)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "fromIterable", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "fromIterable", "complete");
}
});
}
輸出如下:
4)timer()操作符
/**
* ==========================timer操作符 ==============================
*
* 延遲指定時間發送一個0數值(Long類型)
*
* timer操作符主要運作在一個新線程中,也可以自定義線程排程器(第三個參數)
*/
public static void timer() {
Observable
.timer(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "timer", String.valueOf(aLong));
}
});
}
輸出如下:
5)fromArray() 操作符
/**
* ====================fromArray 操作符============================
*
* 對一個數組集合進行觀察,把數組一次性發給觀察者,隻會執行一次觀察者的onNext,最後預設執行onComplete方法
*/
public static void fromArray() {
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(String.valueOf(i));
}
Observable
.fromArray(list)
.subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<String> strings) {
Log.d(TAG + "fromArray", strings.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "fromArray", "complete");
}
});
}
輸出如下:
6)interval() 定時器
/**
* ====================interval 定時器====================
*
* 這個相當于定時器,用它可以取代CountDownTimer。它會按照設定的間隔時間,每次發送一個事件,發送的事件序列:預設從0開始,無限遞增的整數序列
*
* 以下代碼輸出: 0 ----(5秒後)-----1-----(5秒後)------2---------(5秒後)--------3-------(5秒後)-----.......
*/
public static void interval() {
Observable
.interval(5, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "interval", String.valueOf(aLong));//從0開始輸出
}
});
}
輸出如下:
7)intervalRange() 操作符
/**
* intervalRange 操作符
* <p>
* 作用和interval相同,但可以指定發送資料的數量
*/
public static void intervalRange() {
/**
* 參數1: 起始發送值
* 參數2:發送數量
* 參數3:首次發送延遲事件
* 參數4:每次發送事件間隔
* 參數5:時間機關
*
*/
Observable
.intervalRange(2, 10, 3, 1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "intervalRan", String.valueOf(aLong));//從2開始輸出
}
});
}
輸出如下:
8)Range() 操作符
/**
* Range 操作符
* <p>
* 作用發送指定範圍的序列,可指定範圍.作用類似intervalRange,但不同的是range是無延遲發送
*/
public static void range() {
Observable
.range(2, 6)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "interval", String.valueOf(integer));//從2開始輸出
}
});
}
輸出如下:
2、轉換操作符:變換被觀察者(Observable)發送的事件。将Observable發送的資料按照一定的規則做一些變換,然後再将變換的資料發射出去。變換的操作符有map,flatMap,concatMap,switchMap,buffer,groupBy等等。
1)map()操作符
/**
* ======================map============================
*
* map操作符,可以說是的被觀察者轉換器。 通過指定一個Funcation對象,将被觀察者(Observable)轉換成新的被觀察者(Observable)對象并發射,觀察者會收到新的被觀察者并處理
*
*
* 本來發射的資料是 數字1,然後觀察者接收到的是 “ 這是新的觀察資料===: 1”
*
* 流程: 被觀察者.create(事件發射器).map(轉換器).subscribe(觀察者)
*/
public static void map() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "這是新的觀察資料===:" + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG + "map", s);
}
});
}
輸出如下:
2)flatMap()操作符
/**
* ======================flatMap============================
*
* flatMap操作符, 将Observable每一次發射的事件都轉換成一個Observable,也就是說把Observable的發射事件集合轉換成Observable集合。
* 然後觀察者Observer最終觀察的是Observable集合。但是觀察者不能保證接收到這Observable集合發送事件的順序。
*
* 是不是很抽象? 先來看看這一個流程: 觀察者.create(事件發射器).flatMap(轉換器).subscribe(觀察者)
*
* 再來看看例子 : 下面的代碼,一開始Observable通過發射器的onNext發送了0-9這10個事件發送出去,正常來說Observer接收到就是 0 - 9 這10個資料
* 然而中間經過了flatMap的轉換。這 10個事件都分别在Funcation轉換器上新的Observable。而最終觀察者接收的就是這10個新的Observable的發送事件。
*/
public static void flatMap() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
list.add(String.valueOf(0));
list.add(String.valueOf(1));
return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
}
})
.subscribe(
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "flatMap", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "flatMap", "complete");
}
});
}
輸出如下:
3)concatMap() 操作符
/**
* ======================concatMap============================
*
* 與上面的flatMap作用基本一樣,與flatMap唯一不同的是concat能保證Observer接收到Observable集合發送事件的順序
*/
public static void concatMap() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
})
.concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
list.add(String.valueOf(0));
list.add(String.valueOf(1));
return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
}
})
.subscribe(
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "flatMap", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "flatMap", "complete");
}
});
}
輸出如下:
4)buffer() 操作符
/**
* ========================buffer 操作符 ======================================
*
* 把發射資料按照一定間隔分成若幹段。按每段的資料轉換成新的Observable,這個Observable把一段資料一次性發射出去。
* 可以簡單地了解為把一組資料分成若幹小組發射出去,而不是單個單個地發射出去
*/
public static void buffer() {
Observable
.just(1, 2, 3, 4, 5, 6)
.buffer(2)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> integers) {
for (Integer integer : integers) {
Log.d(TAG + "buffer", String.valueOf(integer));
}
Log.d(TAG + "buffer", "============================");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "buffer", "onComplete");
}
});
}
輸出如下:
3、 合并操作符:組合多個被觀察者(Observable)&合并需要發送的事件。包含:concatMap(),concat(),merge(),mergeArray(),concateArray(),reduce(),collect(),startWith(),zip(),count()等
1)merge(),concat ()操作符
/**
* ========================merge,concat 操作符 ======================================
*
* merge操作符是把多個Observable合并成一個進行發射。merge可能會讓合并到Observable的資料順序發生錯亂(組合被觀察者數量<=4個)(并行無序)
* mergeArray操作符和merge作用一樣,但不同的是組合被觀察者數量>4個)(并行無序)
*
* concat操作符也是把多個Observable合并成一個進行發射。但concat則保證合并的每個Observable的事件按順序發射出去。(組合被觀察者數量<=4個)(串行有序)
* concatArray操作符和concat作用一樣,但不同的是組合被觀察者數量>4個)(串行有序)
*/
public static void merge() {
Observable observable1 = Observable.just(1, 2, 3);
Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");
Observable
.merge(observable1, observable2).delay(1, TimeUnit.SECONDS)
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.d(TAG + "merge", o.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "merge", "onComplete");
}
});
}
2)concatDelayError()/mergeDelayError() 操作符
/**
* ========================concatDelayError()/mergeDelayError() 操作符 ======================================
*
* 這兩個操作符的作用是: 使用concat()和merge()操作符時,若其中一個被觀察者發送onError事件,則會馬上終止其它被觀察者繼續發送事件。是以呐,這時使用concatError()/
* mergeDelayError()事件可以使onError事件推遲到其它被觀察者發送事件結束後在再觸發
*/
public static void concatDelayError() {
Observable
.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new NullPointerException());
emitter.onNext(3);
emitter.onNext(4);
}
}), Observable.just(5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "cDelayError", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG + "cDelayError", "onError");
}
@Override
public void onComplete() {
Log.d(TAG + "cDelayError", "onComplete");
}
});
}
3)zip 操作符
/**
* ========================zip 操作符 ======================================
*
* 把多個Observable合并後,并且把這些Observable的資料進行轉換再發射出去。轉換之後的資料數目由最短資料長度的那個Observable決定。發射完最終會自動調用觀察者的onComplete方法()
*
* 如以下代碼: 資料長度為4的observable1和資料長度為3的observable2進行合并轉換後,觀察者隻接收到3個資料
*/
public static void zip() {
Observable observable1 = Observable.just(1, 2, 3, 4);
Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");
Observable
.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return s + integer;
}
})
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.d(TAG + "zip", o.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "merge", "onComplete");
}
});
}
4)combineLatest 操作符
/**
* ========================combineLatest 操作符 ======================================
*
* 當兩個Observable 中的任何一個發送了資料,将先發送了資料的Observable的最新(最後)一個資料和另一個Observable發送的每個資料結合,最終基于該結合的結果發送資料
*
* 與zip()的差別: zip()是按個數合并,即1對1合并;而combineLatest()是基于時間合并,,即在同一時間點上合并
*/
/**
*
* ======================combineLatestDelayError =================================
*
* 作用類似于concatDelayError() / mergeDelayError(),用于錯誤處理
public static void combineLatest() {
Observable
.combineLatest(Observable.just(1, 2, 3)
, Observable.intervalRange(1, 4, 2, 1, TimeUnit.SECONDS)
, new BiFunction<Integer, Long, String>() {
@Override
public String apply(Integer integer, Long aLong) throws Exception {
return "合并後的資料為:" + integer + aLong;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "combineLatest", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "combineLatest", "onComplete");
}
});
}
5)reduce ()操作符
/**
* ======================reduce 操作符=================================
*
* 把被觀察者需要發送的資料按照指定規則聚合成一個資料發送
*
* 聚合的規則需要我們編寫,内部流程是前兩個資料按照我們的規則合并後,再與後面的資料按規則合并,依次類推。這樣說有點抽象,看下面的例子。
*/
public static void reduce() {
Observable
.just(1, 2, 3, 4, 5)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG + "reduce", "本次合并的過程是: " + integer + "+" + integer2);
return integer + integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "reduce", "最終計算的結果是 : " + integer);
}
});
}
6)collect() 操作符
/**
* ========================collect 操作符=================================
*
* 作用是把 Observable(被觀察者)發送的事件收集到一個資料結構中
*/
public static void collect() {
Observable
.just(1, 2, 3, 4, 5)
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
integers.add(integer);
}
})
.subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> integers) throws Exception {
Log.d(TAG + "collect", integers.toString());
}
});
}
7)startWith()/startWithArray() 操作符
/**
* ========================startWith/startWithArray 操作符=================================
*
* 在一個被觀察者發送時間前,追加發送一些資料/一個新的被觀察者
*/
public static void startWith() {
Observable.just(7, 8, 9)
.startWith(6) //在發送序列去追加單個資料
.startWithArray(4, 5) //在發送序列去追加多個資料
.startWith(Observable.just(1, 2, 3)) //在發送序列去追加單個被觀察者
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "startWith", String.valueOf(integer));
}
});
}
8)count() 操作符
/**
* ========================count 操作符=================================
*
* 統計被觀察者發送事件數量
*/
public static void count() {
Observable
.just(1, 2, 3, 4)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "count", "發送事件的數量 : " + aLong);
}
});
}
4、功能操作符:輔助被觀察者(Observable) 發送事件時實作一些功能性需求,如錯誤處理,線程排程。
1) subscribe() 操作符
/**
* ==================subscribe 操作符===========================
*
* 連接配接被觀察者和觀察者
*/
public static void subscribe() {
//建立被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("事件");
}
});
//建立觀察者
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG + "subscribe", "開始連接配接");
}
@Override
public void onNext(Object o) {
Log.d(TAG + "subscribe", "收到事件");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//通過subscribe 進行 被觀察者(Observable)與觀察者(Observer)的連接配接
observable.subscribe(observer);
}
2) delay() 操作符
/**
* ==================delay 操作符=======================================
*
* 延遲發送事件
*
* delay有多個重載方法:
*
* delay(long delay,TimeUnit unit) :指定延遲時間。 參數一:時間 ; 參數二:時間機關
*
* delay(long delay, TimeUnit unit, Scheduler scheduler) 指定延遲時間&線程排程器。參數一:時間 ; 參數二:時間機關;參數三: 線程排程器
*
* delay(long delay, TimeUnit unit, boolean delayError) 指定延遲時間&線程排程器。參數一:時間 ; 參數二:時間機關;參數三: 是否錯誤延遲
*
* delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) 指定延遲時間&線程排程器&錯誤延遲。參數一:時間 ; 參數二:時間機關;
* 參數三: 線程排程器; 參數四:是否錯誤延遲(若中間發生錯誤,是否如常執行,執行完在執行onError())
*/
public static void delay() {
Observable
.just(1, 2)
.delay(10, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "delay", String.valueOf(integer));
}
});
}
3) do 系列操作符
/**
* ========================do 系列操作符 =======================================
*
* 在事件發送&接收的整個周期過程中進行操作。
*
* 如發送事件前的操作,發送事件後的回調請求
*
* do系列操作符包含以下:
*
* doOnEach() :當Observable每發送一次事件就會調用一次(包含onNext(),onError(),onComplete())
* doOnNext(): 執行 onNext()前調用
* doAfterNext(): 執行onNext()後調用
* doOnComplete():執行onComplete()前調用
* doOnError():執行 onError()前調用
* doOnTerminate(): 執行終止(無論正常發送完畢/異常終止)
* doFinally(): 最後執行
* doOnSubscribe() :觀察者訂閱是調用
* doOnUnScbscribe(): 觀察者取消訂閱時調用
*/
public static void dos() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException());
}
})
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG + "doOnEach", "doOnEach: " + String.valueOf(integerNotification.getValue()));
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "doOnNext", "doOnNext: " + String.valueOf(integer));
}
})
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "doAfterNext", "doAfterNext: " + String.valueOf(integer));
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doOnComplete", "doOnComplete");
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG + "doOnError", "doOnError");
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doOnTerminate", "doOnTerminate");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doAfterTermi", "doAfterTerminate");
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG + "doOnSubscribe", "doOnSubscribe");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doFinally", "doFinally");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "收到的資料: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
4) onErrorReturn() 操作符
/** ====================onErrorReturn() 操作符 ======================
*
* 可以捕獲錯誤。遇到錯誤時,發送一個特殊事件,并且正常終止.注意後面的事件不會再發送
*/
public static void onErrorReturn() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("Throwable"));
emitter.onNext(3);
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.e(TAG, "發生了錯誤: " + throwable.getMessage());
return 404;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG, e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
5) onExceptionResumeNext()/onErrorResumeNext() 操作符
/**
* ====================onExceptionResumeNext()/onErrorResumeNext() 操作符 ======================
*
* 遇到錯誤時發送一個新的Observable 。并且正常終止.注意原Observable後面的事件不會再發送
*
* 如果捕獲Exception的話使用onExceptionResumeNext() ,捕獲錯誤的用onErrorResumeNext()
*/
public static void onExceptionResumeNext() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new NullPointerException("NullPointerException"));
emitter.onNext(3);
}
}).onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(4);
observer.onNext(5);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG, e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
6) retry() 操作符
/**
* ====================retry() 操作符 ======================
*
* 作用是:出現錯誤時,讓被觀察者重新發送資料
* 注:若發送錯誤,則一直重新發送
*
* 有幾個重載方法:
* retry() : 出現錯誤時,讓被觀察者重新發送資料。若錯誤一直發生,則一直重新發送
*
* retry(long time):與retry不同的書,若錯誤一直發生,被觀察者則一直重新發送資料,但這持續重新發送有次數限制
*
* retry(Predicate predicate) : 出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料
*
* retry(new BiPredicate<Integer, Throwable>):出現錯誤時,根據指定邏輯(可以捕獲重發的次數和發生的錯誤)決定是否讓被觀察者重新發送資料
*
* retry(long time,Predicate predicate) : 出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料。并且有持續重發的次數限制
*/
public static void retry() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("發生錯誤了"));
emitter.onNext(3);
}
})
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
// interger 為重試次數 ,throwable 為捕獲到的異常
Log.e(TAG + "retry", throwable.getMessage());
Log.e(TAG + "integer", "重試次數: " + integer);
//return true : 重新發送請求(若持續遇到錯誤,就持續重新發送)
//return false : 不重新發送資料 并且調用觀察者的onError()方法結束
if (integer > 2)
return false;
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG + "retry", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
7) retryUntil() 操作符
/**
* ===================retryUntil() 操作符============================
*
* 發送事件遇到錯誤,指定規則是否重新發送。retry(Predicate predicate)。
*
* return true : 不重新發送請求,并且調用觀察者的onError()方法結束
* return false : 重新發送資料(若持續遇到錯誤,就持續重新發送)
*/
public static void retryUntil() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("發生錯誤了"));
emitter.onNext(3);
}
})
.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
//return true : 不重新發送請求,并且調用觀察者的onError()方法結束
// return false : 重新發送資料(若持續遇到錯誤,就持續重新發送)
return false;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG + "retryUntil", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "retryUntil", "onComplete");
}
});
}
8) retryWhen() 操作符
/**
* ===================retryWhen() 操作符============================
*
* 遇到錯誤時,将發生的錯誤傳遞給一個新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable) & 發送事件
*/
public static void retryWhen() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("發送了錯誤"));
emitter.onNext(3);
}
})
//遇到Error時會回調
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
//1、若傳回的Observable發送的事件 = Error ,則原始的Observable則不重新發送事件。該異常資訊可在觀察者的onError中獲得
//return Observable.error(throwable);
//2、若傳回的Observable發送的事件= Next事件(和next的内容無關),則原始的Observable重新發送事件(若持續遇到錯誤,則持續發送)
return Observable.just(5); //僅僅是作為一個觸發重新訂閱原被觀察者的通知,什麼資料并不重要,隻有不是onComplete/onError事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "retryWhen", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG + "retryWhen", e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG + "retryWhen", "onComplete");
}
});
}
9) repeat() 操作符
/**
* ===============repeat() 操作符==============
*
* repeat操作符的作用是重複發射 observable的資料序列,可以使無限次也可以是指定次數.不傳時為重複無限次
*/
public static void repeat() {
Observable
.just(1, 2, 3)
.repeat(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "repeat", String.valueOf(integer));
}
});
}
10)repeatWhen() 操作符
/**
* ===============repeatWhen() 操作符==============
*
* 将原始 Observable 停止發送事件的辨別(Complete() / Error())轉換成1個 Object 類型資料傳遞給1個新被觀察者(Observable)
* ,以此決定是否重新訂閱 & 發送原來的 Observable
*/
public static void repeatWhen() {
Observable
.just(1, 2, 4)
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
//若新被觀察者(Observable)傳回1個Complete()/ Error()事件,則不重新訂閱 & 發送原來的 Observable
//Observable.empty() = 發送Complete事件,但不會回調觀察者的onComplete()
return Observable.empty();
// return Observable.error(new Throwable("不再重新訂閱事件"));
// 傳回Error事件 = 回調onError()事件,并接收傳過去的錯誤資訊。
// 情況2:若新被觀察者(Observable)傳回其餘事件,則重新訂閱 & 發送原來的 Observable
// return Observable.just(1);
// 僅僅是作為1個觸發重新訂閱被觀察者的通知,發送的是什麼資料并不重要,隻要不是Complete() / Error()事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
11)debounce() 操作符
/**
* ===============debounce() 操作符==============
* <p>
* 一定的時間内沒有操作就會發送事件(隻會發送最後一次操作的事件)。
* <p>
* 以下的例子: 發送5個事件,每個事件間隔1秒。但是debounce限定了2秒内沒有任何操作才會真正發送事件。是以隻有最後一次滿足條件,隻能接收到事件 5
*/
public static void debounce() {
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.debounce(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "debounce", String.valueOf(aLong));
}
});
}
12)subscribeOn / ObserverOn 操作符
/**
* ===============subscribeOn() 操作符==============
* ===============observerOn() 操作符==============
* <p>
* <p>
* subscribeOn : 發送事件的線程
* observerOn: 接收事件的線程
* <p>
* 線程排程器:
* Schedulers.io(): 代表io操作的線程,通常用于網絡,讀寫檔案等io密集型的操作
* Schedulers.compucation(): 代表CPU計算密集型的操作,例如需要大量計算的操作
* Schedulers.newThread(): 代表一個正常的新線程
* AndroidSchedulers。mainThread(): 代表Android的主線程
*/
public static void subscribeOn_observerOn() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("事件");
Log.d(TAG + "subscribeOn_ObserverOn", "發送事件:" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG + "subscribeOn_ObserverOn", "接收事件: " + Thread.currentThread().getName());
}
});
}
5、過濾操作符:用于将Observable發送的資料進行過濾和選擇。讓Observable傳回我們所需要的資料。過濾操作符有buffer(),filter(),skip(),take(),skipLast(),takeLast(),throttleFirst(),distainctUntilChange()。
1)filter() 操作符
/**
* ========================filter() 操作符 ======================================
*
* 對被觀察者發送的事件做過濾操作。隻有符合篩選條件的事件才會被觀察者所接收。
*
* return true : 繼續發送
*
* return false : 不發送
*/
public static void filter() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 3; i++) {
emitter.onNext(i);
}
}
})
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
//return true : 繼續發送
//return false : 不發送
return integer != 2; // 本例子過濾了 等于2的情況
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "filter", String.valueOf(integer));
}
});
}
2)distinct() 操作符
/**
* ========================distinct 操作符 ======================================
*
* 簡單地說就是去重。發射的資料包含重複的,會将重複的篩選掉。也就是,它隻允許還沒有被發射過的資料通過,被觀察者接收。發射完資料會自動調用onComplete()方法
*
* y以下代碼:觀察者隻會接收到 : 1,2,3,5 四個數值
*/
public static void distinct() {
Observable
.just(1, 2, 3, 2, 3, 5)
.distinct()
.subscribe(
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "distinct", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "distinct", "onComplete");
}
});
}
3)distinctUntilChanged() 操作符
/**
* ========================distinctUntilChanged 操作符 ======================================
*
* 過濾掉連續重複的事件
*/
public static void distinctUntilChanged() {
Observable.just(1, 2, 3, 1, 2, 3, 3, 4, 4)
.distinctUntilChanged()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, String.valueOf(integer));
}
});
}
4)skip(),skipLast()操作符
/**
* ========================skip,skipLast======================================
*
* skip 操作符是把Observable發射的資料過濾點掉前n項。而take操作符隻能接收前n項。另外還有skipLast和takeLast則是從後往前進行過濾.接收完會調用onComplete()方法
*
*/
/**
* 以下代碼輸出: 3,4,5
*/
public static void skip() {
Observable
.just(1, 2, 3, 4, 5, 6, 7)
.skip(2) //過濾前2項
.skipLast(3) //過濾後3項
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "skip", "根據順序過濾" + String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "skip", "onComplete");
}
});
//-------------------------------------------------------------------------------------------------
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skip(1, TimeUnit.SECONDS) //過濾地1s發送的資料
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG + "skip", "根據事件過濾" + String.valueOf(aLong));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
5)take() 操作符
/**
* ====================== take 操作符===========================
* 隻能接收兩個事件
*/
public static void take() {
Observable
.just(1, 2, 3, 4, 5, 6, 7)
.take(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "skip", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "skip", "onComplete");
}
});
}
6)takeLast() 操作符
/**
* =======================takeLast() 操作符 ======================
*
* 隻能接收被觀察者發送的最後幾個事件
*/
public static void takeLast() {
Observable
.just(1, 2, 3, 4, 5, 6, 7)
.takeLast(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "skip", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "skip", "onComplete");
}
});
}
7)elementAt() 操作符
/**
* ========================elementAt() 操作符 ======================================
*
* 隻發射第n項資料.
* 一個參數和兩個參數時:
* elementAt(第n項)
* elementAt(第n項,第N項不存在時預設值)
*
* n為負數時,報IndexOUtOfBoundExection。為正數但超過發射資料長度不會報異常會使用預設值代替
*/
public static void elementAt() {
Observable.range(1, 5)
.elementAt(6, 10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "elementAt", String.valueOf(integer));
}
});
}
8)elementAtOrError()操作符
/**
* ==============elementAtOrError()===================================================
*
* 在elementAtError()的基礎上,當出現越界情況(當擷取位置的索引>事件序列的長度),即抛出異常
*/
public static void elementAtOrError() {
Observable.range(1, 5)
.elementAtOrError(6)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "elementAtOrErr", String.valueOf(integer));
}
});
}
9)ignoreElements() 操作符
/**
* ========================ignoreElements() 操作符 ======================================
*
* 不管發射的資料.隻希望在它完成時和遇到錯誤時收到通知
*/
public static void ignoreElements() {
Observable
.range(0, 10)
.ignoreElements()
.subscribe(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "ignoreEles", "完成了");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG + "ignoreEles", "出錯了");
}
});
}
10)ofType() 操作符
/**
* ========================ofType() 操作符 ======================================
*
* 通過資料的類型過濾資料,隻發送指定類型資料。
*
* 以下代碼觀察者隻接收到: 1,2
*/
public static void ofType() {
Observable.just("哈哈", 1, "嘻嘻", 2, 3.5)
.ofType(Integer.class)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "ofType", String.valueOf(integer));
}
});
}
11)throttleFirst()/throttleLast() 操作符
/**
* ========================throttleFirst()/throttleLast() 操作符 ======================================
*
* throttleFirst() 在某段時間内,隻發送該段事件第一次事件
*
* throttleLast() 在某段時間内,隻發送該段事件最後一次事件
*/
public static void throttleFirst() {
Observable.interval(300, TimeUnit.MILLISECONDS) //每個0.3秒發送一個事件
.throttleFirst(1, TimeUnit.SECONDS) //隻接收每秒内發送的第一個資料
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "throttleFirst", String.valueOf(aLong));
}
});
}
12)sample()
/**
* ===================sample()=================================
*
* 在某段時間内,隻發送該段時間内最新(最後)1次事件
*
* 與throttleLast類似
*/
public static void sample() {
Observable.interval(300, TimeUnit.MILLISECONDS) //每個0.3秒發送一個事件
.sample(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "sample", String.valueOf(aLong));
}
});
}
13)firstElement()/lastElement()
/**
* ========================firstElement()/lastElement()==========================
*
* 選取第一個元素/最後一個元素
*/
public static void firstElement() {
Observable.just(1, 2, 3)
.firstElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "firstElement", String.valueOf(integer));
}
});
}
public static void lastElement() {
Observable
.just(1, 2, 3)
.lastElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "lastElement", String.valueOf(integer));
}
});
}