1.為什麼需要批處理事務
在流式計算中,我們經常需要保證 exactly-once 語義。Storm的一個Spout在發送資料後如果處理失敗,由于其ack/fail機制,我們可以得知是那一批資料處理失敗,進而重新發送資料進行處理,但是這時會有一個問題,有可能會重複處理了同一批資料,尤其在一些要求比較高的場景(比如支付場景),這樣會造成嚴重的後果,是以為了確定 exactly-once 語義,保證資料僅且被執行一次,在Storm 0.7.0之後引入了transactional topologies。
2.事務機制原理
對于隻需要處理一次的場景,為了確定 exactly-once 語義,我們可以設計為每一個tuple設定一個tid進行辨別,我們可以通過tid來判斷目前事務是否被執行過,所有比tid小的事務必須執行完畢。對于這種設計,如果我們需要連接配接資料判斷tid,那麼對于每一個tuple都必須和資料進行互動,消耗了大量的資源而且降低了處理速度;是以,我們可以以batch為處理機關,為每一批tuple設定一個tid。

雖然上面這種設定避免了資源的浪費,但是對于每一個batch都必須等待其他batch處理完畢,如下面topology:
為了提高并行度,storm采用了pipeline(管道)處理模型,将一個batch分為兩個階段,processing和commit階段。在processing階段,多個batch可以并行計算,在commit階段,必須按照強順序性執行,最後送出事務。
在使用Transactional Topologies的時候,Storm會做以下幾點:
- 管理狀态:Storm使用zookeeper來儲存事務狀态,包括一些tid以及中繼資料。
- 協調事務:Strom内部幫你管理一切事務的執行,如在任何一點是processing階段還是commit階段。
- 錯誤檢測:Storm利用acking架構高效的來處理失敗的事務,當事務失敗時會replay相應的batch,你不需要手動的進行acking或者anchoring。
- 内置的批處理API:Storm在普通的bolt之上封裝了一層API來提供對tuple事務的處理。Storm管理所有的協調工作,保證一個bolt什麼時候收到一個特定的transaction的所有tuple,同時也會清除每一個transaction産生的中間資料。
3.事務API的簡單介紹
與之前的Storm程式一樣,我們需要使用一個TopologyBuilder來設定Spout和Bolt。相應的事務topology使用TransactionalTopologyBuilder來設定。對于Spout我們可以實作ITransactionalSpout接口,在這個接口中包含兩個内部接口類,Coordinator和Emitter,ITransactionalSpout保證了相同的批處理事務必須發送相同的tid。實際上,實作ITransactionalSpout接口的Spout是一個sub-topology,如下圖所示:
Coordinator開啟一個事務準備發射一個batch的時候,進入一個事務的processing階段,會發射一個事務tuple(包括transactionAttempt和metadata)到“batch emit”流中。(其中transactionAttempt包含"transaction id"和"attempt id","transaction id"對batch進行辨別,"attempt id"唯一辨別了對于同一batch發射不同的tuple;metadata中包含目前事務可以從目前哪個point進行重放資料,存放在zookeeper中,spout可以通過kryo從ZK中序列化和反序列化中繼資料)。
Emitter以all grouping(廣播)的方式訂閱coordinator的"batch emit",負責為每一個batch發射tuple。發送的tuple都必須以transactionAttempt作為第一個field,storm會根據它來判斷發送的tuple屬于哪一個batch。
其中Coordinator隻有一個,Emitter可以通過并行度來設定。
對于普通的批處理Bolt可以實作IBatchBolt接口,對于事務Bolt可以實作BaseTransactionalBolt。在BaseTransactionalBolt接口中會繼承父類的幾個方法,如下:
void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
void execute(Tuple tuple);
void finishBatch();
在處理batch的時候,處理每一個tuple的時候都可以調用execute方法,而在整個batch處理完畢(processing階段完畢)的時候才能調用finishBatch方法。如果一個Bolt被标記為Committer,那麼隻有在commit階段才能調用finshBatch方法。storm保證該bolt之前的所有bolt執行完畢。
下面有兩個問題,Storm如何保證之前的所有Bolt都執行完畢?怎樣将一個Bolt設定為Committer?
首先第一個問題,在bolt内部,有一個CoordinatedBolt模型,在CoordinatedBolt中記錄着兩個值,有哪些task給我發送了tuple,我需要給哪些task發送tuple。等所有的tuple發送完畢之後,CoordinateBolt通過另外一個特殊的stream以emitDirect的方式告訴所有發送過它tuple的task,它發送了多少tuple給這個task,下遊task會将這個數字和自己接受到的tuple數進行對比,如果相等,則表示處理完了所有tuple。下遊的CoordinateBolt重複上述步驟,繼續通知其下遊。
對于第二個問題,如何設定一個Bolt為Committer,總共有兩種方式。可以實作ICommitter接口來辨別為Committer或者使用TransactionalTopologyBuilder 的setCommitterBolt 方法來設定一個Committer。
最後要提的是盡管Strom的批處理事務被标記為deprecated ,使用Trident架構來替代,但是了解了批處理事務的原理也是學習Trident架構的基礎和關鍵,是以,值得了解該部分内容。
歡迎加入大資料開發交流群 731423890: