天天看點

《Storm分布式實時計算模式》——3.5 Trident聚合器

本節書摘來自華章計算機《storm分布式實時計算模式》一書中的第3章,第3.5節,作者:(美)p. taylor goetz brian o’neill 更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。

和function類似,aggregator(聚合器)允許topology組合tuple。不同的是,它會替換tuple的字段和值。有三種聚合器:combineraggregator、reduceraggregator和aggregator。

3.5.1 combineraggregator

combineraggregator用來将一個集合的tuple組合到一個單獨的字段中,combiner的簽名(signature)如下所示:

《Storm分布式實時計算模式》——3.5 Trident聚合器

storm對每個tuple調用init()方法,然後重複調用combine()方法直到一個分片的資料處理完成。傳遞給combine()方法的兩個參數是局部聚合的結果,以及調用了init()傳回的值。分片會在後面的部分詳細介紹,分片實際上就是tuples組成的資料流在同一個機器上的一個子集。将tuple生成的值進行組合之後,storm發送組合結果作為一個新的字段。如果分片是空的,storm會發送zero()方法執行的傳回。

3.5.2 reduceraggregator

reduceraggregator接口有一點差別,簽名如下:

《Storm分布式實時計算模式》——3.5 Trident聚合器

storm調用init()方法來擷取原始值。然後為分片中的每一個tuple調用reduce()方法,直到分片資料處理完成。第一個參數是局部的聚合結果。這個方法的實作需要将第二個參數tuple合并到局部聚合結果中傳回。

3.5.3 aggregator

最通用的聚合操作是aggregator。簽名如下所示:

《Storm分布式實時計算模式》——3.5 Trident聚合器

aggregator接口的aggregate()方法和function接口的execute()方法類似,但是多了一個value參數。這樣aggregator就可以在處理tuple的時候累積值。注意,在aggregator接口中,aggregate()和complete()方法都有collector這個參數,通過它可以發射任意個數的tuple。在我們的topology例子中,我們利用了一個内置的count的aggregator。count的實作如下面代碼片段所示:

《Storm分布式實時計算模式》——3.5 Trident聚合器

我們在示例topology中使用了分組和計數來統計在一個城市附近一個小時内發生疾病的次數。實作代碼如下所示:

《Storm分布式實時計算模式》——3.5 Trident聚合器

回顧storm在不同機器上的資料的分片,如圖3-2所示。

《Storm分布式實時計算模式》——3.5 Trident聚合器

https://yqfile.alicdn.com/899f99dbd07c00661c7faf29135d61beb6420e4e.png

" >

groupby()方法強制資料重新分片,将特定字段值相同的tuple分組到同一個分片中。為了做到這個,storm必須将相似的tuple發送到相同的主機上。圖3-3展示了資料被groupby()重新分組後的分片情況。

《Storm分布式實時計算模式》——3.5 Trident聚合器

https://yqfile.alicdn.com/3c5b95458c4e0eda57a90ddd66123171e5f7d382.png" >

分片後,agreagte函數在每個分片資料的每個分組中運作。在我們的例子裡,根據城市、小時、疾病代碼作為分組的關鍵詞。然後count aggregator在每個分組上執行,将計數發射給下遊的消費者元件。