天天看點

RxJava----Utility 輔助操作(響應式處理)Delay

這個頁面列出了很多用于Observable的輔助操作符

  • materialize( )

     — 将Observable轉換成一個通知清單convert an Observable into a list of Notifications
  • dematerialize( )

     — 将上面的結果逆轉回一個Observable
  • timestamp( )

     — 給Observable發射的每個資料項添加一個時間戳
  • serialize( )

     — 強制Observable按次序發射資料并且要求功能是完好的
  • cache( )

     — 記住Observable發射的資料序列并發射相同的資料序列給後續的訂閱者
  • observeOn( )

     — 指定觀察者觀察Observable的排程器
  • subscribeOn( )

     — 指定Observable執行任務的排程器
  • doOnEach( )

     — 注冊一個動作,對Observable發射的每個資料項使用
  • doOnCompleted( )

     — 注冊一個動作,對正常完成的Observable使用
  • doOnError( )

     — 注冊一個動作,對發生錯誤的Observable使用
  • doOnTerminate( )

     — 注冊一個動作,對完成的Observable使用,無論是否發生錯誤
  • doOnSubscribe( )

     — 注冊一個動作,在觀察者訂閱時使用
  • doOnUnsubscribe( )

     — 注冊一個動作,在觀察者取消訂閱時使用
  • finallyDo( )

     — 注冊一個動作,在Observable完成時使用
  • delay( )

     — 延時發射Observable的結果
  • delaySubscription( )

     — 延時處理訂閱請求
  • timeInterval( )

     — 定期發射資料
  • using( )

     — 建立一個隻在Observable生命周期存在的資源
  • single( )

     — 強制傳回單個資料,否則抛出異常
  • singleOrDefault( )

     — 如果Observable完成時傳回了單個資料,就傳回它,否則傳回預設資料
  • toFuture( )

    toIterable( )

    toList( )

     — 将Observable轉換為其它對象或資料結構

=========================================================

Materialize/Dematerialize

Materialize

将資料項和事件通知都當做資料項發射,

Dematerialize

剛好相反。

RxJava----Utility 輔助操作(響應式處理)Delay

一個合法的有限的Obversable将調用它的觀察者的

onNext

方法零次或多次,然後調用觀察者的

onCompleted

onError

正好一次。

Materialize

操作符将這一系列調用,包括原來的

onNext

通知和終止通知

onCompleted

onError

都轉換為一個Observable發射的資料序列。

RxJava的

materialize

将來自原始Observable的通知轉換為

Notification

對象,然後它傳回的Observable會發射這些資料。

materialize

預設不在任何特定的排程器 (

Scheduler

) 上執行。

  • Javadoc: materialize()
RxJava----Utility 輔助操作(響應式處理)Delay

Dematerialize

操作符是

Materialize

的逆向過程,它将

Materialize

轉換的結果還原成它原本的形式。

dematerialize

反轉這個過程,将原始Observable發射的

Notification

對象還原成Observable的通知。

dematerialize

預設不在任何特定的排程器 (

Scheduler

) 上執行。

  • Javadoc: dematerialize()

Timestamp

給Observable發射的資料項附加一個時間戳

RxJava----Utility 輔助操作(響應式處理)Delay

RxJava中的實作為

timestamp

,它将一個發射T類型資料的Observable轉換為一個發射類型為

Timestamped<T>

的資料的Observable,每一項都包含資料的原始發射時間。

timestamp

預設在

immediate

排程器上執行,但是可以通過參數指定其它的排程器。

  • Javadoc: timestamp()
  • Javadoc: timestamp(Scheduler)

Serialize

強制一個Observable連續調用并保證行為正确

RxJava----Utility 輔助操作(響應式處理)Delay

一個Observable可以異步調用它的觀察者的方法,可能是從不同的線程調用。這可能會讓Observable行為不正确,它可能會在某一個

onNext

調用之前嘗試調用

onCompleted

onError

方法,或者從兩個不同的線程同時調用

onNext

方法。使用

Serialize

操作符,你可以糾正這個Observable的行為,保證它的行為是正确的且是同步的。

RxJava中的實作是

serialize

,它預設不在任何特定的排程器上執行。

  • Javadoc: serialize()

Replay

保證所有的觀察者收到相同的資料序列,即使它們在Observable開始發射資料之後才訂閱

RxJava----Utility 輔助操作(響應式處理)Delay

可連接配接的Observable (connectable Observable)與普通的Observable差不多,不過它并不會在被訂閱時開始發射資料,而是直到使用了

Connect

操作符時才會開始。用這種方法,你可以在任何時候讓一個Observable開始發射資料。

如果在将一個Observable轉換為可連接配接的Observable之前對它使用

Replay

操作符,産生的這個可連接配接Observable将總是發射完整的資料序列給任何未來的觀察者,即使那些觀察者在這個Observable開始給其它觀察者發射資料之後才訂閱。

RxJava----Utility 輔助操作(響應式處理)Delay

RxJava的實作為

replay

,它有多個接受不同參數的變體,有的可以指定

replay

的最大緩存數量,有的還可以指定排程器。

  • Javadoc: replay()
  • Javadoc: replay(int)
  • Javadoc: replay(long,TimeUnit)
  • Javadoc: replay(int,long,TimeUnit)
RxJava----Utility 輔助操作(響應式處理)Delay

有一種 

replay

傳回一個普通的Observable。它可以接受一個變換函數為參數,這個函數接受原始Observable發射的資料項為參數,傳回結果Observable要發射的一項資料。是以,這個操作符其實是

replay

變換之後的資料項。

  • Javadoc: replay(Func1)
  • Javadoc: replay(Func1,int)
  • Javadoc: replay(Func1,long,TimeUnit)
  • Javadoc: replay(Func1,int,long,TimeUnit)

ObserveOn

指定一個觀察者在哪個排程器上觀察這個Observable

RxJava----Utility 輔助操作(響應式處理)Delay

很多ReactiveX實作都使用排程器 “

Scheduler

”來管理多線程環境中Observable的轉場。你可以使用

ObserveOn

操作符指定Observable在一個特定的排程器上發送通知給觀察者 (調用觀察者的

onNext

,

onCompleted

onError

方法)。

RxJava----Utility 輔助操作(響應式處理)Delay

注意:當遇到一個異常時

ObserveOn

會立即向前傳遞這個

onError

終止通知,它不會等待慢速消費的Observable接受任何之前它已經收到但還沒有發射的資料項。這可能意味着

onError

通知會跳到(并吞掉)原始Observable發射的資料項前面,正如圖例上展示的。

SubscribeOn

操作符的作用類似,但它是用于指定Observable本身在特定的排程器上執行,它同樣會在那個排程器上給觀察者發通知。

RxJava中,要指定Observable應該在哪個排程器上調用觀察者的

onNext

onCompleted

onError

方法,你需要使用

observeOn

操作符,傳遞給它一個合适的

Scheduler

  • Javadoc: observeOn(Scheduler)

SubscribeOn

指定Observable自身在哪個排程器上執行

RxJava----Utility 輔助操作(響應式處理)Delay

很多ReactiveX實作都使用排程器 “

Scheduler

”來管理多線程環境中Observable的轉場。你可以使用

SubscribeOn

操作符指定Observable在一個特定的排程器上運轉。

ObserveOn

操作符的作用類似,但是功能很有限,它訓示Observable在一個指定的排程器上給觀察者發通知。

在某些實作中還有一個

UnsubscribeOn

操作符。

  • Javadoc: subscribeOn(Scheduler)
  • Javadoc: unsubscribeOn(Scheduler)

Do

注冊一個動作作為原始Observable生命周期事件的一種占位符

RxJava----Utility 輔助操作(響應式處理)Delay

你可以注冊回調,當Observable的某個事件發生時,Rx會在與Observable鍊關聯的正常通知集合中調用它。Rx實作了多種操作符用于達到這個目的。

RxJava實作了很多

Do

操作符的變體。

doOnEach

RxJava----Utility 輔助操作(響應式處理)Delay

doOnEach

操作符讓你可以注冊一個回調,它産生的Observable每發射一項資料就會調用它一次。你可以以

Action

的形式傳遞參數給它,這個Action接受一個

onNext

的變體

Notification

作為它的唯一參數,你也可以傳遞一個Observable給

doOnEach

,這個Observable的

onNext

會被調用,就好像它訂閱了原始的Observable一樣。

  • Javadoc: doOnEach(Action1)
  • Javadoc: doOnEach(Observer)

doOnNext

RxJava----Utility 輔助操作(響應式處理)Delay

doOnNext

操作符類似于

doOnEach(Action1)

,但是它的Action不是接受一個

Notification

參數,而是接受發射的資料項。

示例代碼

Observable.just(1, 2, 3)
          .doOnNext(new Action1<Integer>() {
          @Override
          public void call(Integer item) {
            if( item > 1 ) {
              throw new RuntimeException( "Item exceeds maximum value" );
            }
          }
        }).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
           

輸出

Next: 1
Error: Item exceeds maximum value
           

doOnSubscribe

RxJava----Utility 輔助操作(響應式處理)Delay

doOnSubscribe

操作符注冊一個動作,當觀察者訂閱它生成的Observable它就會被調用。

  • Javadoc: doOnSubscribe(Action0)

doOnUnsubscribe

RxJava----Utility 輔助操作(響應式處理)Delay

doOnUnsubscribe

操作符注冊一個動作,當觀察者取消訂閱它生成的Observable它就會被調用。

  • Javadoc: doOnUnsubscribe(Action0)

doOnCompleted

RxJava----Utility 輔助操作(響應式處理)Delay

doOnCompleted

 操作符注冊一個動作,當它産生的Observable正常終止調用

onCompleted

時會被調用。

  • Javadoc: doOnCompleted(Action0)

doOnError

RxJava----Utility 輔助操作(響應式處理)Delay

doOnError

 操作符注冊一個動作,當它産生的Observable異常終止調用

onError

時會被調用。

  • Javadoc: doOnError(Action0)

doOnTerminate

RxJava----Utility 輔助操作(響應式處理)Delay

doOnTerminate

 操作符注冊一個動作,當它産生的Observable終止之前會被調用,無論是正常還是異常終止。

  • Javadoc: doOnTerminate(Action0)

finallyDo

RxJava----Utility 輔助操作(響應式處理)Delay

finallyDo

 操作符注冊一個動作,當它産生的Observable終止之後會被調用,無論是正常還是異常終止。

  • Javadoc: finallyDo(Action0)

Delay

延遲一段指定的時間再發射來自Observable的發射物

RxJava----Utility 輔助操作(響應式處理)Delay

Delay

操作符讓原始Observable在發射每項資料之前都暫停一段指定的時間段。效果是Observable發射的資料項在時間上向前整體平移了一個增量。

RxJava的實作是 

delay

delaySubscription

RxJava----Utility 輔助操作(響應式處理)Delay

第一種

delay

接受一個定義時長的參數(包括數量和機關)。每當原始Observable發射一項資料,

delay

就啟動一個定時器,當定時器過了給定的時間段時,

delay

傳回的Observable發射相同的資料項。

注意:

delay

不會平移

onError

通知,它會立即将這個通知傳遞給訂閱者,同時丢棄任何待發射的

onNext

通知。然而它會平移一個

onCompleted

通知。

delay

預設在

computation

排程器上執行,你可以通過參數指定使用其它的排程器。

  • Javadoc: delay(long,TimeUnit)
  • Javadoc: delay()
RxJava----Utility 輔助操作(響應式處理)Delay

另一種

delay

不實用常數延時參數,它使用一個函數針對原始Observable的每一項資料傳回一個Observable,它監視傳回的這個Observable,當任何那樣的Observable終止時,

delay

傳回的Observable就發射關聯的那項資料。

這種

delay

預設不在任何特定的排程器上執行。

  • Javadoc: delay(Func1)
RxJava----Utility 輔助操作(響應式處理)Delay

這個版本的

delay

對每一項資料使用一個Observable作為原始Observable的延時定時器。

這種

delay

預設不在任何特定的排程器上執行。

  • Javadoc: delay(Func0,Func1)
RxJava----Utility 輔助操作(響應式處理)Delay

還有一個操作符

delaySubscription

讓你你可以延遲訂閱原始Observable。它結合搜一個定義延時的參數。

delaySubscription

預設在

computation

排程器上執行,你可以通過參數指定使用其它的排程器。

  • Javadoc: delaySubscription(long,TimeUnit)
  • Javadoc: delaySubscription(long,TimeUnit,Scheduler)
RxJava----Utility 輔助操作(響應式處理)Delay

還有一個版本的

delaySubscription

使用一個Obseable而不是一個固定的時長來設定訂閱延時。

這種

delaySubscription

預設不在任何特定的排程器上執行。

  • Javadoc: delaySubscription(Func0)

TimeInterval

将一個發射資料的Observable轉換為發射那些資料發射時間間隔的Observable

RxJava----Utility 輔助操作(響應式處理)Delay

TimeInterval

操作符攔截原始Observable發射的資料項,替換為發射表示相鄰發射物時間間隔的對象。

RxJava中的實作為

timeInterval

,這個操作符将原始Observable轉換為另一個Obserervable,後者發射一個标志替換前者的資料項,這個标志表示前者的兩個連續發射物之間流逝的時間長度。新的Observable的第一個發射物表示的是在觀察者訂閱原始Observable到原始Observable發射它的第一項資料之間流逝的時間長度。不存在與原始Observable發射最後一項資料和發射

onCompleted

通知之間時長對應的發射物。

timeInterval

預設在

immediate

排程器上執行,你可以通過傳參數修改。

  • Javadoc: timeInterval()
  • Javadoc: timeInterval(Scheduler)

Using

建立一個隻在Observable生命周期記憶體在的一次性資源

RxJava----Utility 輔助操作(響應式處理)Delay

Using

操作符讓你可以訓示Observable建立一個隻在它的生命周期記憶體在的資源,當Observable終止時這個資源會被自動釋放。

RxJava----Utility 輔助操作(響應式處理)Delay

using

操作符接受三個參數:

  1. 一個使用者建立一次性資源的工廠函數
  2. 一個用于建立Observable的工廠函數
  3. 一個用于釋放資源的函數

當一個觀察者訂閱

using

傳回的Observable時,

using

将會使用Observable工廠函數建立觀察者要觀察的Observable,同時使用資源工廠函數建立一個你想要建立的資源。當觀察者取消訂閱這個Observable時,或者當觀察者終止時(無論是正常終止還是因錯誤而終止),

using

使用第三個函數釋放它建立的資源。

using

預設不在任何特定的排程器上執行。

  • Javadoc: using(Func0,Func1,Action1)

First

隻發射第一項(或者滿足某個條件的第一項)資料

RxJava----Utility 輔助操作(響應式處理)Delay

如果你隻對Observable發射的第一項資料,或者滿足某個條件的第一項資料感興趣,你可以使用

First

操作符。

在某些實作中,

First

沒有實作為一個傳回Observable的過濾操作符,而是實作為一個在當時就發射原始Observable指定資料項的阻塞函數。在這些實作中,如果你想要的是一個過濾操作符,最好使用

Take(1)

或者

ElementAt(0)

在一些實作中還有一個

Single

操作符。它的行為與

First

類似,但為了確定隻發射單個值,它會等待原始Observable終止(否則,不是發射那個值,而是以一個錯誤通知終止)。你可以使用它從原始Observable擷取第一項資料,而且也確定隻發射一項資料。

在RxJava中,這個操作符被實作為

first

firstOrDefault

takeFirst

可能容易混淆,

BlockingObservable

也有名叫

first

firstOrDefault

的操作符,它們會阻塞并傳回值,不是立即傳回一個Observable。

還有幾個其它的操作符執行類似的功能。

過濾操作符

RxJava----Utility 輔助操作(響應式處理)Delay

隻發射第一個資料,使用沒有參數的

first

操作符。

示例代碼

Observable.just(1, 2, 3)
          .first()
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
           

輸出

Next: 1
Sequence complete.
           
  • Javadoc: first()

first(Func1)

RxJava----Utility 輔助操作(響應式處理)Delay

傳遞一個謂詞函數給

first

,然後發射這個函數判定為

true

的第一項資料。

  • Javadoc: first(Func1)

firstOrDefault

RxJava----Utility 輔助操作(響應式處理)Delay

firstOrDefault

first

類似,但是在Observagle沒有發射任何資料時發射一個你在參數中指定的預設值。

  • Javadoc: firstOrDefault(T)

firstOrDefault(Func1)

RxJava----Utility 輔助操作(響應式處理)Delay

傳遞一個謂詞函數給

firstOrDefault

,然後發射這個函數判定為

true

的第一項資料,如果沒有資料通過了謂詞測試就發射一個預設值。

  • Javadoc firstOrDefault(T, Func1)

takeFirst

RxJava----Utility 輔助操作(響應式處理)Delay

takeFirst

first

類似,除了這一點:如果原始Observable沒有發射任何滿足條件的資料,

first

會抛出一個

NoSuchElementException

takeFist

會傳回一個空的Observable(不調用

onNext()

但是會調用

onCompleted

)。

  • Javadoc: takeFirst(Func1)

single

RxJava----Utility 輔助操作(響應式處理)Delay

single

操作符也與

first

類似,但是如果原始Observable在完成之前不是正好發射一次資料,它會抛出一個

NoSuchElementException

  • Javadoc: single()

single(Func1)

RxJava----Utility 輔助操作(響應式處理)Delay

single

的變體接受一個謂詞函數,發射滿足條件的單個值,如果不是正好隻有一個資料項滿足條件,會以錯誤通知終止。

  • Javadoc: single(Func1)

singleOrDefault

RxJava----Utility 輔助操作(響應式處理)Delay

firstOrDefault

類似,但是如果原始Observable發射超過一個的資料,會以錯誤通知終止。

  • Javadoc: singleOrDefault(T)

singleOrDefault(T,Func1)

RxJava----Utility 輔助操作(響應式處理)Delay

firstOrDefault(T, Func1)

類似,如果沒有資料滿足條件,傳回預設值;如果有多個資料滿足條件,以錯誤通知終止。

  • Javadoc: singleOrDefault(Func1,T)

first系列的這幾個操作符預設不在任何特定的排程器上執行。

To

将Observable轉換為另一個對象或資料結構

RxJava----Utility 輔助操作(響應式處理)Delay

ReactiveX的很多語言特定實作都有一種操作符讓你可以将Observable或者Observable發射的資料序列轉換為另一個對象或資料結構。它們中的一些會阻塞直到Observable終止,然後生成一個等價的對象或資料結構;另一些傳回一個發射那個對象或資料結構的Observable。

在某些ReactiveX實作中,還有一個操作符用于将Observable轉換成阻塞式的。一個阻塞式的Ogbservable在普通的Observable的基礎上增加了幾個方法,用于操作Observable發射的資料項。

getIterator

RxJava----Utility 輔助操作(響應式處理)Delay

getIterator

操作符隻能用于

BlockingObservable

的子類,要使用它,你首先必須把原始的Observable轉換為一個

BlockingObservable

。可以使用這兩個操作符:

BlockingObservable.from

the Observable.toBlocking

這個操作符将Observable轉換為一個

Iterator

,你可以通過它疊代原始Observable發射的資料集。

  • Javadoc: BlockingObservable.getIterator()

toFuture

RxJava----Utility 輔助操作(響應式處理)Delay

toFuture

操作符也是隻能用于

BlockingObservable

。這個操作符将Observable轉換為一個傳回單個資料項的

Future

,如果原始Observable發射多個資料項,

Future

會收到一個

IllegalArgumentException

;如果原始Observable沒有發射任何資料,

Future

會收到一個

NoSuchElementException

如果你想将發射多個資料項的Observable轉換為

Future

,可以這樣用:

myObservable.toList().toBlocking().toFuture()

  • Javadoc: BlockingObservable.toFuture()

toIterable

RxJava----Utility 輔助操作(響應式處理)Delay

toFuture

操作符也是隻能用于

BlockingObservable

。這個操作符将Observable轉換為一個

Iterable

,你可以通過它疊代原始Observable發射的資料集。

  • Javadoc: BlockingObservable.toIterable()

toList

RxJava----Utility 輔助操作(響應式處理)Delay

通常,發射多項資料的Observable會為每一項資料調用

onNext

方法。你可以用

toList

操作符改變這個行為,讓Observable将多項資料組合成一個

List

,然後調用一次

onNext

方法傳遞整個清單。

如果原始Observable沒有發射任何資料就調用了

onCompleted

toList

傳回的Observable會在調用

onCompleted

之前發射一個空清單。如果原始Observable調用了

onError

toList

傳回的Observable會立即調用它的觀察者的

onError

方法。

toList

預設不在任何特定的排程器上執行。

  • Javadoc: toList()

toMap

RxJava----Utility 輔助操作(響應式處理)Delay

toMap

收集原始Observable發射的所有資料項到一個Map(預設是HashMap)然後發射這個Map。你可以提供一個用于生成Map的Key的函數,還可以提供一個函數轉換資料項到Map存儲的值(預設資料項本身就是值)。

toMap

預設不在任何特定的排程器上執行。

  • Javadoc: toMap(Func1)
  • Javadoc: toMap(Func1,Func1)
  • Javadoc: toMap(Func1,Func1,Func0)

toMultiMap

RxJava----Utility 輔助操作(響應式處理)Delay

toMultiMap

類似于

toMap

,不同的是,它生成的這個Map同時還是一個

ArrayList

(預設是這樣,你可以傳遞一個可選的工廠方法修改這個行為)。

toMultiMap

預設不在任何特定的排程器上執行。

  • Javadoc: toMultiMap(Func1)
  • Javadoc: toMultiMap(Func1,Func1)
  • Javadoc: toMultiMap(Func1,Func1,Func0)
  • Javadoc: toMultiMap(Func1,Func1,Func0,Func1)

toSortedList

RxJava----Utility 輔助操作(響應式處理)Delay

toSortedList

類似于

toList

,不同的是,它會對産生的清單排序,預設是自然升序,如果發射的資料項沒有實作

Comparable

接口,會抛出一個異常。然而,你也可以傳遞一個函數作為用于比較兩個資料項,這是

toSortedList

不會使用

Comparable

接口。

toSortedList

預設不在任何特定的排程器上執行。

  • Javadoc: toSortedList()
  • Javadoc: toSortedList(Func2)

nest

RxJava----Utility 輔助操作(響應式處理)Delay

nest

操作符有一個特殊的用途:将一個Observable轉換為一個發射這個Observable的Observable。

版權聲明:本文為CSDN部落客「weixin_33681778」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。

原文連結:https://blog.csdn.net/weixin_33681778/article/details/92347089