天天看點

ObservableObservable

Observable

概述

在ReactiveX中,一個觀察者(Observer)訂閱一個可觀察對象(Observable)。觀察者對Observable發射的資料或資料序列作出響應。這種模式可以極大地簡化并發操作,因為它建立了一個處于待命狀态的觀察者哨兵,在未來某個時刻響應Observable的通知,不需要阻塞等待Observable發射資料。

這篇文章會解釋什麼是響應式程式設計模式(reactive pattern),以及什麼是可觀察對象(Observables)和觀察者(observers),其它幾篇文章會展示如何用操作符組合和改變Observable的行為。

ObservableObservable

相關參考:

  • Single - 一個特殊的Observable,隻發射單個資料。

背景知識

在很多軟體程式設計任務中,或多或少你都會期望你寫的代碼能按照編寫的順序,一次一個的順序執行和完成。但是在ReactiveX中,很多指令可能是并行執行的,之後他們的執行結果才會被觀察者捕獲,順序是不确定的。為達到這個目的,你定義一種擷取和變換資料的機制,而不是調用一個方法。在這種機制下,存在一個可觀察對象(Observable),觀察者(Observer)訂閱(Subscribe)它,當資料就緒時,之前定義的機制就會分發資料給一直處于等待狀态的觀察者哨兵。

這種方法的優點是,如果你有大量的任務要處理,它們互相之間沒有依賴關系。你可以同時開始執行它們,不用等待一個完成再開始下一個(用這種方式,你的整個任務隊列能耗費的最長時間,不會超過任務裡最耗時的那個)。

有很多術語可用于描述這種異步程式設計和設計模式,在在本文裡我們使用這些術語:一個觀察者訂閱一個可觀察對象 (An observer subscribes to an Observable)。通過調用觀察者的方法,Observable發射資料或通知給它的觀察者。

在其它的文檔和場景裡,有時我們也将Observer叫做Subscriber、Watcher、Reactor。這個模型通常被稱作Reactor模式。

建立觀察者

本文使用類似于Groovy的僞代碼舉例,但是ReactiveX有多種語言的實作。

普通的方法調用(不是某種異步方法,也不是Rx中的并行調用),流程通常是這樣的:

  1. 調用某一個方法
  2. 用一個變量儲存方法傳回的結果
  3. 使用這個變量和它的新值做些有用的事

用代碼描述就是:

// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal           

在異步模型中流程更像這樣的:

  1. 定義一個方法,它完成某些任務,然後從異步調用中傳回一個值,這個方法是觀察者的一部分
  2. 将這個異步調用本身定義為一個Observable
  3. 觀察者通過訂閱(Subscribe)操作關聯到那個Observable
  4. 繼續你的業務邏輯,等方法傳回時,Observable會發射結果,觀察者的方法會開始處理結果或結果集
// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business
           

回調方法 (onNext, onCompleted, onError)

Subscribe方法用于将觀察者連接配接到Observable,你的觀察者需要實作以下方法的一個子集:

  • onNext(T item)

    Observable調用這個方法發射資料,方法的參數就是Observable發射的資料,這個方法可能會被調用多次,取決于你的實作。

  • onError(Exception ex)

    當Observable遇到錯誤或者無法傳回期望的資料時會調用這個方法,這個調用會終止Observable,後續不會再調用onNext和onCompleted,onError方法的參數是抛出的異常。

  • onComplete

    正常終止,如果沒有遇到錯誤,Observable在最後一次調用onNext之後調用此方法。

根據Observable協定的定義,onNext可能會被調用零次或者很多次,最後會有一次onCompleted或onError調用(不會同時),傳遞資料給onNext通常被稱作發射,onCompleted和onError被稱作通知。

下面是一個更完整的例子:

def myOnNext     = { item -> /* do something useful with item */ };
def myError      = { throwable -> /* react sensibly to a failed call */ };
def myComplete   = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business
           

取消訂閱 (Unsubscribing)

在一些ReactiveX實作中,有一個特殊的觀察者接口Subscriber,它有一個unsubscribe方法。調用這個方法表示你不關心目前訂閱的Observable了,是以Observable可以選擇停止發射新的資料項(如果沒有其它觀察者訂閱)。

取消訂閱的結果會傳遞給這個Observable的操作符鍊,而且會導緻這個鍊條上的每個環節都停止發射資料項。這些并不保證會立即發生,然而,對一個Observable來說,即使沒有觀察者了,它也可以在一個while循環中繼續生成并嘗試發射資料項。

關于命名約定

ReactiveX的每種特定語言的實作都有自己的命名偏好,雖然不同的實作之間有很多共同點,但并不存在一個統一的命名标準。

而且,在某些場景中,一些名字有不同的隐含意義,或者在某些語言看來比較怪異。

例如,有一個onEvent命名模式(onNext, onCompleted, onError),在一些場景中,這些名字可能意味着事件處理器已經注冊。然而在ReactiveX裡,他們是事件處理器的名字。

Observables的”熱”和”冷”

Observable什麼時候開始發射資料序列?這取決于Observable的實作,一個”熱”的Observable可能一建立完就開始發射資料,是以所有後續訂閱它的觀察者可能從序列中間的某個位置開始接受資料(有一些資料錯過了)。一個”冷”的Observable會一直等待,直到有觀察者訂閱它才開始發射資料,是以這個觀察者可以確定會收到整個資料序列。

在一些ReactiveX實作裡,還存在一種被稱作Connectable的Observable,不管有沒有觀察者訂閱它,這種Observable都不會開始發射資料,除非Connect方法被調用。

用操作符組合Observable

對于ReactiveX來說,Observable和Observer僅僅是個開始,它們本身不過是标準觀察者模式的一些輕量級擴充,目的是為了更好的處理事件序列。

ReactiveX真正強大的地方在于它的操作符,操作符讓你可以變換、組合、操縱和處理Observable發射的資料。

Rx的操作符讓你可以用聲明式的風格組合異步操作序列,它擁有回調的所有效率優勢,同時又避免了典型的異步系統中嵌套回調的缺點。

下面是常用的操作符清單:

  1. 建立操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. 變換操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
  3. 過濾操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  4. 組合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. 錯誤處理 Catch和Retry
  6. 輔助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  7. 條件和布爾操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  8. 算術和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
  9. 轉換操作 To
  10. 連接配接操作 Connect, Publish, RefCount, Replay
  11. 反壓操作 ,用于增加特殊的流程控制政策的操作符

這些操作符并不全都是ReactiveX的核心組成部分,有一些是語言特定的實作或可選的子產品。

RxJava

在RxJava中,一個實作了Observer接口的對象可以訂閱(subscribe)一個Observable 類的執行個體。訂閱者(subscriber)對Observable發射(emit)的任何資料或資料序列作出響應。這種模式簡化了并發操作,因為它不需要阻塞等待Observable發射資料,而是建立了一個處于待命狀态的觀察者哨兵,哨兵在未來某個時刻響應Observable的通知。

繼續閱讀