作者:晉中望
接上篇RxJava && Agera 從源碼簡要分析基本調用流程(1)我們從"1.訂閱過程"、“2.變換過程”進行分析,下篇文章我們繼續分析"3.線程切換過程"
從上文中我們知道了RxJava能夠幫助我們對資料流進行靈活的變換,以達到鍊式結構操作的目的,然而它的強大不止于此。下面我們就來看看它的又一利器,排程器<code>Scheduler</code>:就像我們所知道的,<code>Scheduler</code>是給<code>Observable</code>資料流添加多線程功能所準備的,一般我們會通過使用<code>subscribeOn()</code>、<code>observeOn()</code>方法傳入對應的<code>Scheduler</code>去指定資料流的每部分操作應該以何種方式運作在何種線程。對于我們而言,最常見的莫過于在非主線程擷取并處理資料之後在主線程更新UI這樣的場景了:

這是我們十分常見的調用方法,一氣呵成就把不同線程之間的處理都搞定了,因為是鍊式是以結構也很清晰,我們現在來看看這其中的線程切換流程。
<code>subscribeOn()</code>
當我們調用<code>subscribeOn()</code>的時候:
可以看到這裡也是調用了<code>create()</code>去生成一個<code>Observable</code>,而 <code>OperatorSubscribeOn</code>則是實作了<code>OnSubscribe</code>接口,同時将原始的<code>Observable</code>和我們需要的<code>scheduler</code>傳入:
可以看出來,這裡對<code>subscriber</code>的處理與前文中<code>OperatorMap</code>中<code>call()</code>對<code>subscriber</code>的處理很相似。在這裡我們同樣會根據傳入的<code>subscriber</code>構造出新的<code>Subscribers</code>,不過這一系列的過程大部分都是由worker通過<code>schedule()</code>去執行的,從後面<code>setProducer()</code>中對于線程的判斷,再結合<code>subscribeOn()</code>方法的目的我們能大概推測出,這個worker在一定程度上就相當于一個新線程的代理執行者,<code>schedule()</code>所實作的與Thread類中run()應該十分類似。我們現在來看看這個worker的執行過程。
首先從<code>Schedulers.io()</code>進入:
這個通過hook拿到scheduler的過程我們先不管,直接進<code>CachedThreadScheduler</code>,看它的<code>createWorker()</code>方法:
這裡的pool是一個原子變量引用<code>AtomicReference</code>,所持有的則是<code>CachedWorkerPool</code>,因而這個pool顧名思義就是用來儲存worker的緩存池啦,我們從緩存池裡拿到需要的worker并作了一層封裝成為<code>EventLoopWorker</code>:
在這裡我們終于發現目标<code>ThreadWorker</code>,它繼承自<code>NewThreadWorker</code>,之前的schedule()方法最終都會到這個<code>scheduleActual()</code>方法裡:
這裡我們看到了executor線程池,我們用<code>Schedulers.io()</code>最終實作的線程切換的本質就在這裡了。現在再結合之前的過程我們從頭梳理一下:
在<code>subscribeOn()</code>時,我們會新生成一個<code>Observable</code>,它的成員<code>onSubscribe</code>會在目标<code>Subscriber</code>訂閱時使用傳入的<code>Scheduler</code>的worker作為線程排程執行者,在對應的線程中通知原始<code>Observable</code>發送消息給這個過程中臨時生成的<code>Subscriber</code>,這個<code>Subscriber</code>又會通知到目标Subscriber,這樣就完成了<code>subscribeOn()</code>的過程。
<code>observeOn()</code>
下面我們接着來看看<code>observeOn()</code>:
我們直接看最終調用的部分,可以看到這裡又是一個<code>lift()</code>,在這裡傳入了<code>OperatorObserveOn</code>,它與<code>OperatorSubscribeOn</code>不同,是一個<code>Operator</code>(<code>Operator</code>的功能我們上文中已經講過就不贅述了),它構造出了新的觀察者<code>ObserveOnSubscriber</code>并實作了<code>Action0</code>接口:
可以看出來,這裡<code>ObserveOnSubscriber</code>所有的發送給目标<code>Subscriber child</code>的消息都被切換到了<code>recursiveScheduler</code>的線程作處理,也就達到了将線程切回的目的。
總結<code>observeOn()</code>整體流程如下:
對比<code>subscribeOn()</code>和<code>observeOn()</code>這兩個過程,我們不難發現兩者的差別:<code>subscribeOn()</code>将初始<code>Observable</code>的訂閱事件整體都切換到了另一個線程;而<code>observeOn()</code>則是将初始<code>Observable</code>發送的消息切換到另一個線程通知到目标Subscriber。前者把 “訂閱 + 發送” 的切換了一個線程,後者把 “發送” 切換了一個線程。是以,我們的代碼中所實作的功能其實是:
這樣就能很容易實作耗時任務在子線程操作,在主線程作更新操作等這些常見場景的功能啦。
Subject
Subject在Rx系列是一個比較特殊的角色,它繼承了Observable的同時也實作了Observer接口,也就是說它既可作為觀察者,也可作為被觀察者,他一般被用來作為連接配接多個不同Observable、Observer之間的紐帶。可能你會奇怪,我們不是已經有了像<code>map()</code>、<code>flatMap()</code>這類的操作符去變化 Observable資料流了嗎,為什麼還要引入Subject這個東西呢?這是因為Subject所承擔的工作并非是針對Observable資料流内容的轉換連接配接,而是資料流本身在Observable、Observer之間的排程。光這麼說可能還是很模糊,我們舉個《RxJava Essentials》中的例子:
我們通過<code>create()</code>建立了一個PublishSubject,觀察者成功訂閱了這個subject,然而這個subject卻沒有任何資料要發送,我們隻是知道他未來會發送的會是String值而已。之後,當我們調用<code>subject.onNext()</code>時,消息才被發送,Observer的<code>onNext()</code>被觸發調用,輸出了"Hello World"。
這裡我們注意到,當訂閱事件發生時,我們的subject是沒有産生資料流的,直到它發射了"Hello World",資料流才開始運轉,試想我們如果将訂閱過程和<code>subject.onNext()</code>調換一下位置,那麼Observer就一定不會接受到"Hello World"了(這不是廢話嗎- -|||),因而這也在根本上反映了Observable的冷熱差別。
一般而言,我們的Observable都屬于Cold Observables,就像看視訊,每次點開新視訊我們都要從頭開始播放;而<code>Subject</code>則預設屬于Hot Observables,就像看直播,視訊資料永遠都是新的。
基于這種屬性,<code>Subject</code>自然擁有了對接收到的資料流進行選擇排程等的能力了,是以,我們對于<code>Subject</code>的使用也就通常基于如下的思路:
在前面的例子裡我們用到的是PublishSubject,它隻會把在訂閱發生的時間點之後來自原始Observable的資料發射給觀察者。等一下,這功能聽起來是不是有些似曾相識呢?
沒錯,就是EventBus和Otto。(RxJava的出現慢慢讓Otto退出了舞台,現在Otto的Repo已經是Deprecated狀态了,而EventBus依舊堅挺)基于RxJava的觀察訂閱取消的能力和PublishSubject的功能,我們十分容易就能寫出實作了最基本功能的簡易事件總線架構:
當然Subject還有其他如<code>BehaviorSubject</code>、<code>ReplaySubject</code>、<code>AsyncSubject</code>等類型,大家可以去看官方文檔,寫得十分詳細,這裡就不介紹了。
前面相信最近這段日子裡,提到RxJava,大家就會想到Google最近剛剛開源的Agera。Agera作為專門為Android打造的Reactive Programming架構,難免會被拿來與RxJava做對比。本文前面RxJava的主體流程分析已近尾聲,現在我們再來看看Agera這東東又是怎麼一回事。
首先先上結論:
Agera最初是為了Google Play Movies而開發的一個内部架構,現在開源出來了,它雖然是在RxJava之後才出現,但是完全獨立于RxJava,與它沒有任何關系(隻不過開源的時間十分微妙罷了233333)。 與RxJava比起來,Agera更加專注于Android的生命周期,而RxJava則更加純粹地面向Java平台而非Android。
也許你可能會問:“那麼RxAndroid呢,不是還有它嗎?”事實上,RxAndroid早在1.0版本的時候就進行了很大的重構,很多子產品被拆分到其他的項目中去了,同時也删除了部分代碼,僅存下來的部分多是和Android線程相關的部分,比如<code>AndroidSchedulers</code>、<code>MainThreadSubscription</code>等。鑒于這種情況,我們暫且不去關注RxAndroid,先把目光放在Agera上。
同樣也是基于觀察者模式,Agera和RxJava的角色分類大緻相似,在Agera中,主要角色有兩個:<code>Observable</code>(被觀察者)、<code>Updatable</code>(觀察者)。
是的,相較于RxJava中的<code>Observable</code>,Agera中的<code>Observable</code>隻是一個簡單的接口,也沒有範性的存在,Updatable亦是如此,這樣我們要如何做到消息的傳遞呢?這就需要另外一個接口了:
終于看到了泛型T,我們的消息的傳遞能力就是依賴于此接口了。是以我們将這個接口和基礎的<code>Observable</code>結合一下:
這裡的<code>Repository<T></code>在一定程度上就是我們想要的RxJava中的<code>Observable<T></code>啦。類似地,Repository<T>也有兩種類型的實作:
Direct - 所包含的資料總是可用的或者是可被同步計算出來的;一個Direct的Repository總是處于活躍(active)狀态下
Deferred - 所包含的資料是異步計算或拉去所得;一個Deffered的Repository直到有Updatable被添加進來之前都會是非活躍(inactive)狀态下
是不是感到似曾相識呢?沒錯,Repository也是有冷熱區分的,不過我們現在暫且不去關注這一點。回到上面接着看,既然現在發資料的角色有了,那麼我們要如何接收資料呢?答案就是<code>Receiver</code>:
相信看到這裡,大家應該也隐約感覺到了:在Agera的世界裡,資料的傳輸與事件的傳遞是互相隔離開的,這是目前Agera與Rx系列的最大學質差別。Agera所使用的是一種push event, pull data的模型,這意味着event并不會攜帶任何data,<code>Updatable</code>在需要更新時,它自己會承擔起從資料源拉取資料的任務。這樣,提供資料的責任就從<code>Observable</code>中拆分了出來交給了<code>Repository</code>,讓其自身能夠專注于發送一些簡單的事件如按鈕點選、一次下拉重新整理的觸發等等。
那麼,這樣的實作有什麼好處呢?
當這兩種處理分發邏輯分離開時,<code>Updatable</code>就不必觀察到來自<code>Repository</code>的完整資料變化的曆史,畢竟在大多數場景下,尤其是更新UI的場景下,最新的資料往往才是有用的資料。
但是我就是需要看到變化的曆史資料,怎麼辦?
不用擔心,這裡我們再請出一個角色<code>Reservoir</code>:
顧名思義,<code>Reservoir</code>就是我們用來存儲變化中的資料的地方,它繼承了<code>Receiver</code>、<code>Repository</code>,也就相當于同時具有了接收資料,發送資料的能力。通過檢視其具體實作我們可以知道它的本質操作都是使用内部的Queue實作的:通過accept()接收到資料後入列,通過<code>get()</code>拿到資料後出列。若一個<code>Updatable</code>觀察了此<code>Reservoir</code>,其隊列中發生排程變化後即将出列的下一個資料如果是可用的(非空),就會通知該Updatable,進一步拉取這個資料發送給<code>Receiver</code>。
現在,我們已經大概了解了這幾個角色的功能屬性了,接下來我們來看一段官方示例代碼:
是不是有些雲裡霧裡的感覺呢?多虧有注釋,我們大概能夠猜出到底上面都做了什麼:使用需要的圖檔規格作為參數拼接到url中,拉取對應的圖檔并用ImageView顯示出來。我們結合API來看看整個過程:
Repositories.repositoryWithInitialValue(Result.<Bitmap>absent())
建立一個可運作(抑或說執行)的repository。
初始化傳入值是Result,它用來概括一些諸如<code>apply()</code>、<code>merge()</code>的操作的結果的不可變對象,并且存在兩種狀态<code>succeeded()</code>、<code>failed()</code>。
傳回REventSource
observe()
用于添加新的Observable作為更新我們的圖檔的Event source,本例中不需要。
傳回<code>RFrequency</code>
onUpdatesPerLoop()
在每一個Looper Thread loop中若有來自多個Event Source的update()處理時,隻需開啟一個資料處理流。
傳回RFlow
getFrom(new Supplier(…))
忽略輸入值,使用來自給定Supplier的新擷取的資料作為輸出值。
goTo(executor)
切換到給定的executor繼續資料處理流。
attemptTransform(function())
使用給定的function()變換輸入值,若變換失敗,則終止資料流;若成功,則取新的變換後的值作為目前流指令的輸出。
傳回RTermination
orSkip()
若前面的操作檢查為失敗,就跳過剩下的資料處理流,并且不會通知所有已添加的Updatable。
thenTransform(function())
與attemptTransform(function())相似,差別在于當必要時會發出通知。
傳回RConfig
onDeactivation(SEND_INTERRUPT)
用于明确repository不再active時的行為。
compile()
執行這個repository。
傳回Repository
整體流程乍看起來并沒有什麼特别的地方,但是真正的玄機其實藏在執行每一步的傳回值裡:
初始的<code>REventSource<T, T></code>代表着事件源的開端,它從傳入值接收了<code>T initialValue</code>,這裡的<T, T>中,第一個T是目前repository的資料的類型,第二個T則是資料處理流開端的時候的資料的類型。
之後,當observe()調用後,我們傳入事件源給<code>REventSource</code>,相當于設定好了需要的事件源和對應的開端,這裡傳回的是<code>RFrequency<T, T></code>,它繼承自<code>REventSource</code>,為其添加了事件源的發送頻率的屬性。
之後,我們來到了<code>onUpdatesPerLoop()</code>,這裡明确了所開啟的資料流的個數(也就是前面所講的頻率)後,傳回了RFlow,這裡也就意味着我們的資料流正式生成了。同時,這裡也是流式調用的起點。
拿到我們的RFlow之後,我們就可以為其提供資料源了,也就是前面說的<code>Supplier</code>,于是調用<code>getFrom()</code>,這樣我們的資料流也就真正意義擁有了資料“幹貨”。
有了資料之後我們就可以按具體需要進行資料轉換了,這裡我們可以直接使用<code>transform()</code>,傳回RFlow,以便進一步進行流式調用;也可以調用attemptTransform()來對可能出現的異常進行處理,比如orSkip()、orEnd()之後繼續進行流式調用。
經過一系列的流式調用之後,我們終于對資料處理完成啦,現在我們可以選擇先對成型的資料在做一次最後的包裝<code>thenTransform()</code>,或是與另一個Supplier合并<code>thenMergeIn()</code>等。這些處理之後,我們的傳回值也就轉為了RConfig,進入了最終配置和repository聲明結束的狀态。
在最終的這個配置過程中,我們調用了<code>onDeactivation()</code>,為這個repository明确了最終進入非活躍狀态時的行為,如果不需要其他多餘的配置的話,我們就可以進入最終的<code>compile()</code>方法了。當我們調用<code>compile()</code>時,就會按照前面所走過的所有流程與配置去執行并生成這個repository。到此,我們的repository才真正被建立了出來。
以上就是repository從無到有的全過程。當repository誕生後,我們也就可以傳輸需要的資料啦。再回到上面的示例代碼:
我們在<code>onResume()</code>、<code>onPause()</code>這兩個生命周期下分别添加、移除了Updatable。相較于RxJava中通過Subscription去取消訂閱的做法,Agera的這種寫法顯然更為清晰也更為整潔。我們的Activity實作了Updatable和Receiver接口,直接看其實作方法:
可以看到這裡<code>repository</code>将資料發送給了<code>receiver</code>,也就是自己,在對應的accept()方法中接收到我們想要的bitmap後,這張圖檔也就顯示出來了,示例代碼中的完整流程也就結束了。
總結一下上述過程:
首先<code>Repositories.repositoryWithInitialValue()</code>生成原點REventSource。
配置完Observable之後進入RFrequency狀态,接着配置資料流的流數。
前面配置完成後,資料流RFlow生成,之後通過<code>getFrom()</code>、<code>mergeIn()</code>、transform()等方法可進一步進行流式調用;也可以使用<code>attemptXXX()</code>方法代替原方法,後面接着調用<code>orSkip()</code>、<code>orEnd()</code>進行<code>error handling</code>處理。當使用<code>attemptXXX()</code>方法時,資料流狀态會變為RTermination,它代表此時的狀态已具有終結資料流的能力,是否終結資料流要根據failed check觸發,結合後面跟着調用的<code>orSkip()</code>、<code>orEnd()</code>,我們的資料流會從<code>RTermination</code>再次切換為<code>RFlow</code>,以便進行後面的流式調用。
經過前面一系列的流式處理,我們需要結束資料流時,可以選擇調用<code>thenXXX()</code>方法,對資料流進行最終的處理,處理之後,資料流狀态會變為 RConfig;也可以為此行為添加error handling處理,選擇<code>thenAttemptXXX()</code>方法,後面同樣接上<code>orSkip()</code>、<code>orEnd()</code>即可,最終資料流也會轉為Rconfig狀态。
此時,我們可以在結束前按需要選擇對資料流進行最後的配置,例如:調用<code>onDeactivation()</code>配置從“訂閱”到“取消訂閱”的過程是否需要繼續執行資料流等等。
一切都部署完畢後,我們<code>compile()</code>這個RConfig,得到最終的成型的Repository,它具有添加Updatable、發送資料通知Receiver的能力。
我們根據需要添加<code>Updatable</code>,<code>repository</code>在資料流處理完成後會通過<code>update()</code>發送event通知<code>Updatable</code>。
Updatable收到通知後則會拉取repository的成果資料,并将資料通過accept()發送給Receiver。完成 Push event, pull data 的流程。
以上就是一次Agera的流式調用的内部基本流程。可以看到,除了 Push event, pull data 這一特點、goLazy的加載模式(本文未介紹)等,依托于較為精簡的方法,Agera的流式調用過程同樣也能夠做到過程清晰,并且上手難度相較于RxJava也要簡單一些,開源作者是Google的團隊也讓一些G粉對其好感度提升不少。不過Agera在本文撰寫時則是 agera-1.0.0-rc2,未來的版本還有很多不确定因素,相比之下Rx系列發展了這麼久,架構已經相對成熟。究竟用Agera還是RxJava,大家按自己的喜好選擇吧。
新人處女作,文章中難免會有錯誤遺漏以及表述不清晰的地方,希望大家多多批評指正,謝謝!
參考&拓展:
RxJava Wiki
Agera Wiki
給 Android 開發者的 RxJava 詳解
Google Agera vs. ReactiveX
When Iron Man becomes reactive
Top 7 Tips for RxJava on Android
How to Keep your RxJava Subscribers from Leaking
RxJava – the production line
文章來源公衆号:QQ空間終端開發團隊(qzonemobiledev)
相關推薦