這個頁面列出了很多用于Observable的輔助操作符
-
— 将Observable轉換成一個通知清單convert an Observable into a list of Notificationsmaterialize( )
-
— 将上面的結果逆轉回一個Observabledematerialize( )
-
— 給Observable發射的每個資料項添加一個時間戳timestamp( )
-
— 強制Observable按次序發射資料并且要求功能是完好的serialize( )
-
— 記住Observable發射的資料序列并發射相同的資料序列給後續的訂閱者cache( )
-
— 指定觀察者觀察Observable的排程器observeOn( )
-
— 指定Observable執行任務的排程器subscribeOn( )
-
— 注冊一個動作,對Observable發射的每個資料項使用doOnEach( )
-
— 注冊一個動作,對正常完成的Observable使用doOnCompleted( )
-
— 注冊一個動作,對發生錯誤的Observable使用doOnError( )
-
— 注冊一個動作,對完成的Observable使用,無論是否發生錯誤doOnTerminate( )
-
— 注冊一個動作,在觀察者訂閱時使用doOnSubscribe( )
-
— 注冊一個動作,在觀察者取消訂閱時使用doOnUnsubscribe( )
-
— 注冊一個動作,在Observable完成時使用finallyDo( )
-
— 延時發射Observable的結果delay( )
-
— 延時處理訂閱請求delaySubscription( )
-
— 定期發射資料timeInterval( )
-
— 建立一個隻在Observable生命周期存在的資源using( )
-
— 強制傳回單個資料,否則抛出異常single( )
-
— 如果Observable完成時傳回了單個資料,就傳回它,否則傳回預設資料singleOrDefault( )
-
,toFuture( )
,toIterable( )
— 将Observable轉換為其它對象或資料結構toList( )
=========================================================
Materialize/Dematerialize
Materialize
将資料項和事件通知都當做資料項發射,
Dematerialize
剛好相反。

一個合法的有限的Obversable将調用它的觀察者的
onNext
方法零次或多次,然後調用觀察者的
onCompleted
或
onError
正好一次。
Materialize
操作符将這一系列調用,包括原來的
onNext
通知和終止通知
onCompleted
或
onError
都轉換為一個Observable發射的資料序列。
RxJava的
materialize
将來自原始Observable的通知轉換為
Notification
對象,然後它傳回的Observable會發射這些資料。
materialize
預設不在任何特定的排程器 (
Scheduler
) 上執行。
- Javadoc: materialize()
Dematerialize
操作符是
Materialize
的逆向過程,它将
Materialize
轉換的結果還原成它原本的形式。
dematerialize
反轉這個過程,将原始Observable發射的
Notification
對象還原成Observable的通知。
dematerialize
預設不在任何特定的排程器 (
Scheduler
) 上執行。
- Javadoc: dematerialize()
Timestamp
給Observable發射的資料項附加一個時間戳
RxJava中的實作為
timestamp
,它将一個發射T類型資料的Observable轉換為一個發射類型為
Timestamped<T>
的資料的Observable,每一項都包含資料的原始發射時間。
timestamp
預設在
immediate
排程器上執行,但是可以通過參數指定其它的排程器。
- Javadoc: timestamp()
- Javadoc: timestamp(Scheduler)
Serialize
強制一個Observable連續調用并保證行為正确
一個Observable可以異步調用它的觀察者的方法,可能是從不同的線程調用。這可能會讓Observable行為不正确,它可能會在某一個
onNext
調用之前嘗試調用
onCompleted
或
onError
方法,或者從兩個不同的線程同時調用
onNext
方法。使用
Serialize
操作符,你可以糾正這個Observable的行為,保證它的行為是正确的且是同步的。
RxJava中的實作是
serialize
,它預設不在任何特定的排程器上執行。
- Javadoc: serialize()
Replay
保證所有的觀察者收到相同的資料序列,即使它們在Observable開始發射資料之後才訂閱
可連接配接的Observable (connectable Observable)與普通的Observable差不多,不過它并不會在被訂閱時開始發射資料,而是直到使用了
Connect
操作符時才會開始。用這種方法,你可以在任何時候讓一個Observable開始發射資料。
如果在将一個Observable轉換為可連接配接的Observable之前對它使用
Replay
操作符,産生的這個可連接配接Observable将總是發射完整的資料序列給任何未來的觀察者,即使那些觀察者在這個Observable開始給其它觀察者發射資料之後才訂閱。
RxJava的實作為
replay
,它有多個接受不同參數的變體,有的可以指定
replay
的最大緩存數量,有的還可以指定排程器。
- Javadoc: replay()
- Javadoc: replay(int)
- Javadoc: replay(long,TimeUnit)
- Javadoc: replay(int,long,TimeUnit)
有一種
replay
傳回一個普通的Observable。它可以接受一個變換函數為參數,這個函數接受原始Observable發射的資料項為參數,傳回結果Observable要發射的一項資料。是以,這個操作符其實是
replay
變換之後的資料項。
- Javadoc: replay(Func1)
- Javadoc: replay(Func1,int)
- Javadoc: replay(Func1,long,TimeUnit)
- Javadoc: replay(Func1,int,long,TimeUnit)
ObserveOn
指定一個觀察者在哪個排程器上觀察這個Observable
很多ReactiveX實作都使用排程器 “
Scheduler
”來管理多線程環境中Observable的轉場。你可以使用
ObserveOn
操作符指定Observable在一個特定的排程器上發送通知給觀察者 (調用觀察者的
onNext
,
onCompleted
,
onError
方法)。
注意:當遇到一個異常時
ObserveOn
會立即向前傳遞這個
onError
終止通知,它不會等待慢速消費的Observable接受任何之前它已經收到但還沒有發射的資料項。這可能意味着
onError
通知會跳到(并吞掉)原始Observable發射的資料項前面,正如圖例上展示的。
SubscribeOn
操作符的作用類似,但它是用于指定Observable本身在特定的排程器上執行,它同樣會在那個排程器上給觀察者發通知。
RxJava中,要指定Observable應該在哪個排程器上調用觀察者的
onNext
,
onCompleted
,
onError
方法,你需要使用
observeOn
操作符,傳遞給它一個合适的
Scheduler
。
- Javadoc: observeOn(Scheduler)
SubscribeOn
指定Observable自身在哪個排程器上執行
很多ReactiveX實作都使用排程器 “
Scheduler
”來管理多線程環境中Observable的轉場。你可以使用
SubscribeOn
操作符指定Observable在一個特定的排程器上運轉。
ObserveOn
操作符的作用類似,但是功能很有限,它訓示Observable在一個指定的排程器上給觀察者發通知。
在某些實作中還有一個
UnsubscribeOn
操作符。
- Javadoc: subscribeOn(Scheduler)
- Javadoc: unsubscribeOn(Scheduler)
Do
注冊一個動作作為原始Observable生命周期事件的一種占位符
你可以注冊回調,當Observable的某個事件發生時,Rx會在與Observable鍊關聯的正常通知集合中調用它。Rx實作了多種操作符用于達到這個目的。
RxJava實作了很多
Do
操作符的變體。
doOnEach
doOnEach
操作符讓你可以注冊一個回調,它産生的Observable每發射一項資料就會調用它一次。你可以以
Action
的形式傳遞參數給它,這個Action接受一個
onNext
的變體
Notification
作為它的唯一參數,你也可以傳遞一個Observable給
doOnEach
,這個Observable的
onNext
會被調用,就好像它訂閱了原始的Observable一樣。
- Javadoc: doOnEach(Action1)
- Javadoc: doOnEach(Observer)
doOnNext
doOnNext
操作符類似于
doOnEach(Action1)
,但是它的Action不是接受一個
Notification
參數,而是接受發射的資料項。
示例代碼
Observable.just(1, 2, 3)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer item) {
if( item > 1 ) {
throw new RuntimeException( "Item exceeds maximum value" );
}
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
輸出
Next: 1
Error: Item exceeds maximum value
doOnSubscribe
doOnSubscribe
操作符注冊一個動作,當觀察者訂閱它生成的Observable它就會被調用。
- Javadoc: doOnSubscribe(Action0)
doOnUnsubscribe
doOnUnsubscribe
操作符注冊一個動作,當觀察者取消訂閱它生成的Observable它就會被調用。
- Javadoc: doOnUnsubscribe(Action0)
doOnCompleted
doOnCompleted
操作符注冊一個動作,當它産生的Observable正常終止調用
onCompleted
時會被調用。
- Javadoc: doOnCompleted(Action0)
doOnError
doOnError
操作符注冊一個動作,當它産生的Observable異常終止調用
onError
時會被調用。
- Javadoc: doOnError(Action0)
doOnTerminate
doOnTerminate
操作符注冊一個動作,當它産生的Observable終止之前會被調用,無論是正常還是異常終止。
- Javadoc: doOnTerminate(Action0)
finallyDo
finallyDo
操作符注冊一個動作,當它産生的Observable終止之後會被調用,無論是正常還是異常終止。
- Javadoc: finallyDo(Action0)
Delay
延遲一段指定的時間再發射來自Observable的發射物
Delay
操作符讓原始Observable在發射每項資料之前都暫停一段指定的時間段。效果是Observable發射的資料項在時間上向前整體平移了一個增量。
RxJava的實作是
delay
和
delaySubscription
。
第一種
delay
接受一個定義時長的參數(包括數量和機關)。每當原始Observable發射一項資料,
delay
就啟動一個定時器,當定時器過了給定的時間段時,
delay
傳回的Observable發射相同的資料項。
注意:
delay
不會平移
onError
通知,它會立即将這個通知傳遞給訂閱者,同時丢棄任何待發射的
onNext
通知。然而它會平移一個
onCompleted
通知。
delay
預設在
computation
排程器上執行,你可以通過參數指定使用其它的排程器。
- Javadoc: delay(long,TimeUnit)
- Javadoc: delay()
另一種
delay
不實用常數延時參數,它使用一個函數針對原始Observable的每一項資料傳回一個Observable,它監視傳回的這個Observable,當任何那樣的Observable終止時,
delay
傳回的Observable就發射關聯的那項資料。
這種
delay
預設不在任何特定的排程器上執行。
- Javadoc: delay(Func1)
這個版本的
delay
對每一項資料使用一個Observable作為原始Observable的延時定時器。
這種
delay
預設不在任何特定的排程器上執行。
- Javadoc: delay(Func0,Func1)
還有一個操作符
delaySubscription
讓你你可以延遲訂閱原始Observable。它結合搜一個定義延時的參數。
delaySubscription
預設在
computation
排程器上執行,你可以通過參數指定使用其它的排程器。
- Javadoc: delaySubscription(long,TimeUnit)
- Javadoc: delaySubscription(long,TimeUnit,Scheduler)
還有一個版本的
delaySubscription
使用一個Obseable而不是一個固定的時長來設定訂閱延時。
這種
delaySubscription
預設不在任何特定的排程器上執行。
- Javadoc: delaySubscription(Func0)
TimeInterval
将一個發射資料的Observable轉換為發射那些資料發射時間間隔的Observable
TimeInterval
操作符攔截原始Observable發射的資料項,替換為發射表示相鄰發射物時間間隔的對象。
RxJava中的實作為
timeInterval
,這個操作符将原始Observable轉換為另一個Obserervable,後者發射一個标志替換前者的資料項,這個标志表示前者的兩個連續發射物之間流逝的時間長度。新的Observable的第一個發射物表示的是在觀察者訂閱原始Observable到原始Observable發射它的第一項資料之間流逝的時間長度。不存在與原始Observable發射最後一項資料和發射
onCompleted
通知之間時長對應的發射物。
timeInterval
預設在
immediate
排程器上執行,你可以通過傳參數修改。
- Javadoc: timeInterval()
- Javadoc: timeInterval(Scheduler)
Using
建立一個隻在Observable生命周期記憶體在的一次性資源
Using
操作符讓你可以訓示Observable建立一個隻在它的生命周期記憶體在的資源,當Observable終止時這個資源會被自動釋放。
using
操作符接受三個參數:
- 一個使用者建立一次性資源的工廠函數
- 一個用于建立Observable的工廠函數
- 一個用于釋放資源的函數
當一個觀察者訂閱
using
傳回的Observable時,
using
将會使用Observable工廠函數建立觀察者要觀察的Observable,同時使用資源工廠函數建立一個你想要建立的資源。當觀察者取消訂閱這個Observable時,或者當觀察者終止時(無論是正常終止還是因錯誤而終止),
using
使用第三個函數釋放它建立的資源。
using
預設不在任何特定的排程器上執行。
- Javadoc: using(Func0,Func1,Action1)
First
隻發射第一項(或者滿足某個條件的第一項)資料
如果你隻對Observable發射的第一項資料,或者滿足某個條件的第一項資料感興趣,你可以使用
First
操作符。
在某些實作中,
First
沒有實作為一個傳回Observable的過濾操作符,而是實作為一個在當時就發射原始Observable指定資料項的阻塞函數。在這些實作中,如果你想要的是一個過濾操作符,最好使用
Take(1)
或者
ElementAt(0)
。
在一些實作中還有一個
Single
操作符。它的行為與
First
類似,但為了確定隻發射單個值,它會等待原始Observable終止(否則,不是發射那個值,而是以一個錯誤通知終止)。你可以使用它從原始Observable擷取第一項資料,而且也確定隻發射一項資料。
在RxJava中,這個操作符被實作為
first
,
firstOrDefault
和
takeFirst
。
可能容易混淆,
BlockingObservable
也有名叫
first
和
firstOrDefault
的操作符,它們會阻塞并傳回值,不是立即傳回一個Observable。
還有幾個其它的操作符執行類似的功能。
過濾操作符
隻發射第一個資料,使用沒有參數的
first
操作符。
示例代碼
Observable.just(1, 2, 3)
.first()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
輸出
Next: 1
Sequence complete.
- Javadoc: first()
first(Func1)
傳遞一個謂詞函數給
first
,然後發射這個函數判定為
true
的第一項資料。
- Javadoc: first(Func1)
firstOrDefault
firstOrDefault
與
first
類似,但是在Observagle沒有發射任何資料時發射一個你在參數中指定的預設值。
- Javadoc: firstOrDefault(T)
firstOrDefault(Func1)
傳遞一個謂詞函數給
firstOrDefault
,然後發射這個函數判定為
true
的第一項資料,如果沒有資料通過了謂詞測試就發射一個預設值。
- Javadoc firstOrDefault(T, Func1)
takeFirst
takeFirst
與
first
類似,除了這一點:如果原始Observable沒有發射任何滿足條件的資料,
first
會抛出一個
NoSuchElementException
,
takeFist
會傳回一個空的Observable(不調用
onNext()
但是會調用
onCompleted
)。
- Javadoc: takeFirst(Func1)
single
single
操作符也與
first
類似,但是如果原始Observable在完成之前不是正好發射一次資料,它會抛出一個
NoSuchElementException
。
- Javadoc: single()
single(Func1)
single
的變體接受一個謂詞函數,發射滿足條件的單個值,如果不是正好隻有一個資料項滿足條件,會以錯誤通知終止。
- Javadoc: single(Func1)
singleOrDefault
和
firstOrDefault
類似,但是如果原始Observable發射超過一個的資料,會以錯誤通知終止。
- Javadoc: singleOrDefault(T)
singleOrDefault(T,Func1)
和
firstOrDefault(T, Func1)
類似,如果沒有資料滿足條件,傳回預設值;如果有多個資料滿足條件,以錯誤通知終止。
- Javadoc: singleOrDefault(Func1,T)
first系列的這幾個操作符預設不在任何特定的排程器上執行。
To
将Observable轉換為另一個對象或資料結構
ReactiveX的很多語言特定實作都有一種操作符讓你可以将Observable或者Observable發射的資料序列轉換為另一個對象或資料結構。它們中的一些會阻塞直到Observable終止,然後生成一個等價的對象或資料結構;另一些傳回一個發射那個對象或資料結構的Observable。
在某些ReactiveX實作中,還有一個操作符用于将Observable轉換成阻塞式的。一個阻塞式的Ogbservable在普通的Observable的基礎上增加了幾個方法,用于操作Observable發射的資料項。
getIterator
getIterator
操作符隻能用于
BlockingObservable
的子類,要使用它,你首先必須把原始的Observable轉換為一個
BlockingObservable
。可以使用這兩個操作符:
BlockingObservable.from
或
the Observable.toBlocking
。
這個操作符将Observable轉換為一個
Iterator
,你可以通過它疊代原始Observable發射的資料集。
- Javadoc: BlockingObservable.getIterator()
toFuture
toFuture
操作符也是隻能用于
BlockingObservable
。這個操作符将Observable轉換為一個傳回單個資料項的
Future
,如果原始Observable發射多個資料項,
Future
會收到一個
IllegalArgumentException
;如果原始Observable沒有發射任何資料,
Future
會收到一個
NoSuchElementException
。
如果你想将發射多個資料項的Observable轉換為
Future
,可以這樣用:
myObservable.toList().toBlocking().toFuture()
。
- Javadoc: BlockingObservable.toFuture()
toIterable
toFuture
操作符也是隻能用于
BlockingObservable
。這個操作符将Observable轉換為一個
Iterable
,你可以通過它疊代原始Observable發射的資料集。
- Javadoc: BlockingObservable.toIterable()
toList
通常,發射多項資料的Observable會為每一項資料調用
onNext
方法。你可以用
toList
操作符改變這個行為,讓Observable将多項資料組合成一個
List
,然後調用一次
onNext
方法傳遞整個清單。
如果原始Observable沒有發射任何資料就調用了
onCompleted
,
toList
傳回的Observable會在調用
onCompleted
之前發射一個空清單。如果原始Observable調用了
onError
,
toList
傳回的Observable會立即調用它的觀察者的
onError
方法。
toList
預設不在任何特定的排程器上執行。
- Javadoc: toList()
toMap
toMap
收集原始Observable發射的所有資料項到一個Map(預設是HashMap)然後發射這個Map。你可以提供一個用于生成Map的Key的函數,還可以提供一個函數轉換資料項到Map存儲的值(預設資料項本身就是值)。
toMap
預設不在任何特定的排程器上執行。
- Javadoc: toMap(Func1)
- Javadoc: toMap(Func1,Func1)
- Javadoc: toMap(Func1,Func1,Func0)
toMultiMap
toMultiMap
類似于
toMap
,不同的是,它生成的這個Map同時還是一個
ArrayList
(預設是這樣,你可以傳遞一個可選的工廠方法修改這個行為)。
toMultiMap
預設不在任何特定的排程器上執行。
- Javadoc: toMultiMap(Func1)
- Javadoc: toMultiMap(Func1,Func1)
- Javadoc: toMultiMap(Func1,Func1,Func0)
- Javadoc: toMultiMap(Func1,Func1,Func0,Func1)
toSortedList
toSortedList
類似于
toList
,不同的是,它會對産生的清單排序,預設是自然升序,如果發射的資料項沒有實作
Comparable
接口,會抛出一個異常。然而,你也可以傳遞一個函數作為用于比較兩個資料項,這是
toSortedList
不會使用
Comparable
接口。
toSortedList
預設不在任何特定的排程器上執行。
- Javadoc: toSortedList()
- Javadoc: toSortedList(Func2)
nest
nest
操作符有一個特殊的用途:将一個Observable轉換為一個發射這個Observable的Observable。
版權聲明:本文為CSDN部落客「weixin_33681778」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。
原文連結:https://blog.csdn.net/weixin_33681778/article/details/92347089