Subject
Subject可以看成是一個橋梁或者代理,在某些ReactiveX實作中(如RxJava),它同時充當了Observer和Observable的角色。因為它是一個Observer,它可以訂閱一個或多個Observable;又因為它是一個Observable,它可以轉發它收到(Observe)的資料,也可以發射新的資料。
由于一個Subject訂閱一個Observable,它可以觸發這個Observable開始發射資料(如果那個Observable是”冷”的–就是說,它等待有訂閱才開始發射資料)。是以有這樣的效果,Subject可以把原來那個”冷”的Observable變成”熱”的。
Subject的種類
針對不同的場景一共有四種類型的Subject。他們并不是在所有的實作中全部都存在,而且一些實作使用其它的命名約定(例如,在RxScala中Subject被稱作PublishSubject)。
AsyncSubject
一個AsyncSubject隻在原始Observable完成後,發射來自原始Observable的最後一個值。(如果原始Observable沒有發射任何值,AsyncObject也不發射任何值)它會把這最後一個值發射給任何後續的觀察者。
然而,如果原始的Observable因為發生了錯誤而終止,AsyncSubject将不會發射任何資料,隻是簡單的向前傳遞這個錯誤通知。
BehaviorSubject
當觀察者訂閱BehaviorSubject時,它開始發射原始Observable最近發射的資料(如果此時還沒有收到任何資料,它會發射一個預設值),然後繼續發射其它任何來自原始Observable的資料。
然而,如果原始的Observable因為發生了一個錯誤而終止,BehaviorSubject将不會發射任何資料,隻是簡單的向前傳遞這個錯誤通知。
PublishSubject
PublishSubject隻會把在訂閱發生的時間點之後來自原始Observable的資料發射給觀察者。需要注意的是,PublishSubject可能會一建立完成就立刻開始發射資料(除非你可以阻止它發生),是以這裡有一個風險:在Subject被建立後到有觀察者訂閱它之前這個時間段内,一個或多個資料可能會丢失。如果要確定來自原始Observable的所有資料都被分發,你需要這樣做:或者使用Create建立那個Observable以便手動給它引入”冷”Observable的行為(當所有觀察者都已經訂閱時才開始發射資料),或者改用ReplaySubject。
如果原始的Observable因為發生了一個錯誤而終止,PublishSubject将不會發射任何資料,隻是簡單的向前傳遞這個錯誤通知。
ReplaySubject
ReplaySubject會發射所有來自原始Observable的資料給觀察者,無論它們是何時訂閱的。也有其它版本的ReplaySubject,在重放緩存增長到一定大小的時候或過了一段時間後會丢棄舊的資料(原始Observable發射的)。
如果你把ReplaySubject當作一個觀察者使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能導緻同時(非順序)調用,這會違反Observable協定,給Subject的結果增加了不确定性。
RxJava的對應類
假設你有一個Subject,你想把它傳遞給其它的代理或者暴露它的Subscriber接口,你可以調用它的asObservable方法,這個方法傳回一個Observable。具體使用方法可以參考Javadoc文檔。
串行化
如果你把
Subject
當作一個
Subscriber
使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能導緻同時(非順序)調用,這會違反Observable協定,給Subject的結果增加了不确定性。
要避免此類問題,你可以将
Subject
轉換為一個
SerializedSubject
,類似于這樣:
mySafeSubject = new SerializedSubject( myUnsafeSubject );