天天看點

《Storm分布式實時計算模式》——3.3 Trident spout

本節書摘來自華章計算機《storm分布式實時計算模式》一書中的第3章,第3.3節,作者:(美)p. taylor goetz brian o’neill 更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。

讓我們先來看topology中的spout。和storm相比,trident引入了“資料批次”(batch)的概念。不像storm的spout,trident spout必須成批地發送tuple。

每個batch會配置設定一個唯一的事務辨別符。spout基于約定決定batch的組成方式。spout有三種約定:非事務型(non-transactional)、事務型(transactional)、非透明型(opaque)。

非事務型spout對batch的組成部分不提供保障,并且可能出現重複。兩個不同的batch可能含有相同的tuple。事務型spout保證batch是非重複的,并且batch總是包含相同的tuple。非透明型spout保證資料是非重複的,但不能保證batch的内容是不變的。

表3-1描述了這些特性。

《Storm分布式實時計算模式》——3.3 Trident spout

spout接口如下面代碼片段所示:

《Storm分布式實時計算模式》——3.3 Trident spout

在trident中,spout沒有真的發射tuple,而是把這項工作分解給了batchcoordinator和emitter方法。emitter負責發送tuple,batchcoordinator負責管理批次和中繼資料,emitter需要依靠中繼資料來恰當地進行批次的資料重放。tridentspout函數僅僅是簡單地提供了到batchcoordinator和emitter的通路方法,并且聲明發射的tuple包括哪些字段。下面列出了示例中的diagnosiseventspout方法:

《Storm分布式實時計算模式》——3.3 Trident spout

https://yqfile.alicdn.com/5982a1410c9ced7b857f57df5b027a5f2acbd30b.png

" >

如上述代碼中的getoutputfields()方法所示,在我們的執行個體topology中,spout發射一個字段event,值是一個diagnosisevent類。

batchcoordinator類實作下述接口:

《Storm分布式實時計算模式》——3.3 Trident spout

https://yqfile.alicdn.com/543b295a73f41c1b15d2ebf08d7fe93c925607da.png

batchcoordinator是一個泛型類。這個泛型類是重放一個batch所需要的中繼資料。在本例中,spout發送随機事件,是以中繼資料可以忽略。實際系統中,中繼資料可能包含組成了這個batch的消息或者對象的辨別符。通過這個資訊,非透明型和事務型spout可以實作約定,確定batch的内容不出現重複,在事務型spout中,batch的内容不會出現變化。

batchcoordinator類作為一個storm bolt運作在一個單線程中。storm會在zookeeper中持久化存儲這個中繼資料。當事務處理完成時會通知到對應的coordinator。

在我們的例子中,沒有做特定的協調操作,下面就是diagnosiseventspout類中使用的協調操作:

《Storm分布式實時計算模式》——3.3 Trident spout
《Storm分布式實時計算模式》——3.3 Trident spout

trident spout的第二個組成部分是emitter。在storm裡,spout使用collector來發送tuple,emmiter函數在trident spout中執行這種功能。唯一的差別是,使用tridentcollector類,發送出去的tuple是通過batchcoordinator類初始化的一批資料。

emitter方法的接口格式如下所示:

《Storm分布式實時計算模式》——3.3 Trident spout

如前面代碼所示,emitter函數隻有一個功能,将tuple打包發射出去。為了實作這個功能,函數接收的參數包括batch(由coordinator生成的)的中繼資料、事務的資訊和emitter用來發送tuple的collector。diagnosiseventemitter類的代碼如下所示:

《Storm分布式實時計算模式》——3.3 Trident spout

https://yqfile.alicdn.com/41005299c40f1557e89f4faa0a3025c6b13f2f5d.png

《Storm分布式實時計算模式》——3.3 Trident spout

發送的工作在emitbatch()中進行。例子中,我們随機配置設定一個經度和緯度,大體保持在美國範圍内,使用system.currenttimemillis()方法生成診斷的時間戳。

實際場景中,icd-9-cm的範圍在000到999之間。針對本示例,我們僅使用320到327之間的診斷代碼。這些代碼如下所示:

《Storm分布式實時計算模式》——3.3 Trident spout

這些診斷代碼随機配置設定給事件。

在這個例子裡,我們使用對象來封裝診斷事件。為簡化起見,我們将事件的每個組成部分作為tuple的一個獨立字段。這裡,對象封裝還是使用tuple字段進行封裝,需要權衡。通常會限制tuple的字段在易于管理的數量之内,但為了資料流控制或tuple的分組政策,将資料放在tuple的字段裡還是有意義的。

在我們的例子中,diagnosisevent類表示topology處理的關鍵資料。對象的代碼如下所示:

《Storm分布式實時計算模式》——3.3 Trident spout
《Storm分布式實時計算模式》——3.3 Trident spout

https://yqfile.alicdn.com/5a3c17d0c95fa54fc745c49201f50749647949fb.png" >

這個對象是一個簡單的javabean。時間戳使用long變量存儲,存儲的是紀元時間的秒數。經度和緯度使用dobule存儲。diagnosiscode類使用string,以防系統可能需要處理非icd-9資料,比如有字母的代碼。

至此,topology已經可以發射事件了。在實際場景中,我們可能将topology內建到一個醫療請求處理系統或者一個電子健康記錄系統來進行實踐演練。