目錄
Android RxJava操作符詳解 - 建立操作符

1、基本建立:create()
完整建立1個被觀察者對象(Observable):
// 1. 通過creat()建立被觀察者對象
Observable.create(new ObservableOnSubscribe<Integer>() {
// 2. 在複寫的subscribe()裡定義需要發送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
} // 至此,一個被觀察者對象(Observable)就建立完畢
}).subscribe(new Observer<Integer>() {
// 以下步驟僅為展示一個完整demo,可以忽略
// 3. 通過通過訂閱(subscribe)連接配接觀察者和被觀察者
// 4. 建立觀察者 & 定義響應事件的行為
@Override
public void onSubscribe(Disposable d) {
// Log.d(TAG, "開始采用subscribe連接配接");
}
// 預設最先調用複寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
}
輸出結果: 接收到了事件1 接收到了事件2 接收到了事件3 對Complete事件作出相應
2、快速建立&發送事件
just():
快速建立1個被觀察者對象(Observable)直接發送并傳入事件(最多發送10個參數):
// 1. 建立時傳入整型1、2、3、4
// 在建立後就會發送這些對象,相當于執行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
Observable.just(1, 2, 3,4)
// 至此,一個Observable對象建立完畢,以下步驟僅為展示一個完整demo,可以忽略
// 2. 通過通過訂閱(subscribe)連接配接觀察者和被觀察者
// 3. 建立觀察者 & 定義響應事件的行為
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接配接");
}
// 預設最先調用複寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
}
輸出結果同上。
fromArray():
快速建立被觀察者對象(Observable)直接傳入數組資料,将數組中的資料傳入到被觀察者(Observable)中:
// 1. 設定需要傳入的數組
Integer[] items = { 0, 1, 2, 3, 4 };
// 2. 建立被觀察者對象(Observable)時傳入數組
// 在建立後就會将該數組轉換成Observable & 發送該對象中的所有資料
Observable.fromArray(items)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接配接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
}
// 注:
// 可發送10個以上參數
// 若直接傳遞一個list集合進去,否則會直接把list當做一個資料元素發送
/*
* 數組周遊
**/
// 1. 設定需要傳入的數組
Integer[] items = { 0, 1, 2, 3, 4 };
// 2. 建立被觀察者對象(Observable)時傳入數組
// 在建立後就會将該數組轉換成Observable & 發送該對象中的所有資料
Observable.fromArray(items)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "數組周遊");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "數組中的元素 = "+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "周遊結束");
}
});
輸出結果: 接收到了事件0 接收到了事件1 接收到了事件2 接收到了事件3 接收到了事件4 對Complete時間做出回應 數組周遊 數組中的元素 = 0 數組中的元素 = 1 數組中的元素 = 2 數組中的元素 = 3 數組中的元素 = 4 周遊結束
fromIterable()
快速建立被觀察者對象(Observable)直接傳入集合List資料,将數組中的資料傳入到被觀察者(Observable)中:
/*
* 快速發送集合
**/
// 1. 設定一個集合
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
// 2. 通過fromIterable()将集合中的對象 / 資料發送出去
Observable.fromIterable(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接配接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
/*
* 集合周遊
**/
// 1. 設定一個集合
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
// 2. 通過fromIterable()将集合中的對象 / 資料發送出去
Observable.fromIterable(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "集合周遊");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "集合中的資料元素 = "+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "周遊結束");
}
});
另外
// 下列方法一般用于測試使用
<-- empty() -->
// 該方法建立的被觀察者對象發送事件的特點:僅發送Complete事件,直接通知完成
Observable observable1=Observable.empty();
// 即觀察者接收後會直接調用onCompleted()
<-- error() -->
// 該方法建立的被觀察者對象發送事件的特點:僅發送Error事件,直接通知異常
// 可自定義異常
Observable observable2=Observable.error(new RuntimeException())
// 即觀察者接收後會直接調用onError()
<-- never() -->
// 該方法建立的被觀察者對象發送事件的特點:不發送任何事件
Observable observable3=Observable.never();
// 即觀察者接收後什麼都不調用
3、延時建立
需求場景
- 定時操作:在經過了x秒後,需要自動執行y操作
- 周期性操作:每隔x秒後,需要自動執行y操作
defer():
-
作用
直到有觀察者(
)訂閱時,才動态建立被觀察者對象(Observer
) & 發送事件Observable
- 應用場景
動态建立被觀察者對象(
Observable
) & 擷取最新的
Observable
對象資料
<-- 1. 第1次對i指派 ->>
Integer i = 10;
// 2. 通過defer 定義被觀察者對象
// 注:此時被觀察者對象還沒建立
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
<-- 2. 第2次對i指派 ->>
i = 15;
<-- 3. 觀察者開始訂閱 ->>
// 注:此時,才會調用defer()建立被觀察者對象(Observable)
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接配接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到的整數是"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
輸出結果: 因為是在訂閱時才建立,是以i值會取第2次的指派15
timer():
- 作用
- 快速建立1個被觀察者對象(
)Observable
- 發送事件的特點:延遲指定時間後,發送1個數值0(
類型)Long
- 應用場景
延遲指定事件,發送一個0,一般用于檢測
// 該例子 = 延遲2s後,發送一個long類型數值
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接配接");
}
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
// 注:timer操作符預設運作在一個新線程上
// 也可自定義線程排程器(第3個參數):timer(long,TimeUnit,Scheduler)
interval():
作用
- 快速建立1個被觀察者對象(
)Observable
- 發送事件的特點:每隔指定時間 就發送 事件
- 發送的事件序列 = 從0開始、無限遞增1的的整數序列
// 參數說明:
// 參數1 = 第1次延遲時間;
// 參數2 = 間隔時間數字;
// 參數3 = 時間機關;
Observable.interval(3,1,TimeUnit.SECONDS)
// 該例子發送的事件序列特點:延遲3s後發送事件,每隔1秒産生1個數字(從0開始遞增1,無限個)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接配接");
}
// 預設最先調用複寫的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
// 注:interval預設在computation排程器上執行
// 也可自定義指定線程排程器(第3個參數):interval(long,TimeUnit,Scheduler)
測試結果:延遲3s後發送事件,之後每隔1s産生一個數字發送
intervalRange():
作用
- 快速建立1個被觀察者對象(
)Observable
- 發送事件的特點:每隔指定時間 就發送 事件,可指定發送的資料的數量
3. 發送的事件序列 = 從0開始、無限遞增1的的整數序列
4. 作用類似于
interval()
,但可指定發送的資料的數量
// 參數說明:
// 參數1 = 事件序列起始點;
// 參數2 = 事件數量;
// 參數3 = 第1次事件延遲發送時間;
// 參數4 = 間隔時間數字;
// 參數5 = 時間機關
Observable.intervalRange(3,10,2, 1, TimeUnit.SECONDS)
// 該例子發送的事件序列特點:
// 1. 從3開始,一共發送10個事件;
// 2. 第1次延遲2s發送,之後每隔2秒産生1個數字(從0開始遞增1,無限個)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接配接");
}
// 預設最先調用複寫的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
range()
- 作用
- 快速建立1個被觀察者對象(
)Observable
- 發送事件的特點:連續發送 1個事件序列,可指定範圍
- 快速建立1個被觀察者對象(
a. 發送的事件序列 = 從0開始、無限遞增1的的整數序列
b. 作用類似于
,但差別在于:無延遲發送事件
intervalRange()
// 參數說明:
// 參數1 = 事件序列起始點;
// 參數2 = 事件數量;
// 注:若設定為負數,則會抛出異常
Observable.range(3,10)
// 該例子發送的事件序列特點:從3開始發送,每次發送事件遞增1,一共發送10個事件
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接配接");
}
// 預設最先調用複寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
輸出結果同上。
rangeLong()
- 作用:類似于
,差別在于該方法支援資料類型 =range()
Long
-
具體使用
與
類似,此處不作過多描述range()