天天看點

Storm專題二:Storm Trident API 使用詳解

一、概述

     storm trident中的核心資料模型就是“stream”,也就是說,storm trident處理的是stream,但是實際上stream是被成批處理的,stream被切分成一個個的batch分布到叢集中,所有應用在stream上的函數最終會應用到每個節點的batch中,實作并行計算,具體如下圖所示:

Storm專題二:Storm Trident API 使用詳解

在trident中有五種操作類型:

apply locally:本地操作,所有操作應用在本地節點資料上,不會産生網絡傳輸     

repartitioning:資料流重定向,單純的改變資料流向,不會改變資料内容,這部分會有網絡傳輸

aggragation:聚合操作,會有網絡傳輸

grouped streams上的操作

merge和join

小結:上面提到了trident實際上是通過把函數應用到每個節點的batch上的資料以實作并行,而應用的這些函數就是tridentapi,下面我們就具體介紹一下tridentapi的各種操作。  

二、trident五種操作詳解

2.1 apply locally本地操作:操作都應用在本地節點的batch上,不會産生網絡傳輸

2.1.1 functions:函數操作

     函數的作用是接收一個tuple(需指定接收tuple的哪個字段),輸出0個或多個tuples。輸出的新字段值會被追加到原始輸入tuple的後面,如果一個function不輸出tuple,那就意味這這個tuple被過濾掉了,下面舉例說明:

定義一個function:

     小結:function實際上就是對經過function函的tuple做一些操作以改變其内容。

比如我們處理一個“mystream”的資料流,它有三個字段分别是[“a”, “b”, “c”] ,資料流中tuple的内容是:

     [1,

2, 3] [4, 1, 6] [3, 0, 8]

我們運作我們的function:  

     它意思是接收輸入的每個tuple “b”字段得值,把函數結算結果做為新字段“d”追加到每個tuple後面,然後發射出去。

最終運作結果會是每個tuple有四個字段[“a”, “b”, “c”, “d”],每個tuple的内容變成了:

2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0]

    小結:我們注意到,如果一個function發射多個tuple時,每個發射的新tuple中仍會保留原來老tuple的資料。

2.1.2 filters:過濾操作

filters很簡單,接收一個tuple并決定是否保留這個tuple。舉個例子,定義一個filter:

假設我們的tuples有這個幾個字段 [“a”,

“b”, “c”]: 

     [1, 2, 3] [2, 1, 1] [2, 3, 4]

然後運作我們的filter:

則最終得到的tuple是 :

     [2,

1, 1]

     說明第一個和第三個不滿足條件,都被過濾掉了。

     小結:filter就是一個過濾器,它決定是否需要保留目前tuple。

2.1.3 partitionaggregate

    partitionaggregate的作用對每個partition中的tuple進行聚合,與前面的函數在原tuple後面追加資料不同,partitionaggregate的輸出會直接替換掉輸入的tuple,僅資料partitionaggregate中發射的tuple。下面舉例說明:

定義一個累加的partitionaggregate:

假設我們的stream包含兩個字段 [“a”,

“b”],各個partition的tuple内容是:

     ```

partition 0: [“a”, 1] [“b”, 2]

     partition 1: [“a”, 3] [“c”, 8]

     partition 2: [“e”, 1] [“d”, 9] [“d”, 10] ```

輸出的内容隻有一個字段“sum”,值是:

     ```

partition 0: [3]

     partition 1: [11]

     partition 2: [20] ```

    tridentapi提供了三個聚合器的接口:combineraggregator, reduceraggregator,

and aggregator.

我們先看一下combineraggregator接口:   

    combineraggregator接口隻傳回一個tuple,并且這個tuple也隻包含一個field。init方法會先執行,它負責預處理每一個接收到的tuple,然後再執行combine函數來計算收到的tuples直到最後一個tuple到達,當所有tuple處理完時,combineraggregator會發射zero函數的輸出,舉個例子:

定義一個combineraggregator實作來計數:  

     小結:當你使用aggregate 方法代替partitionaggregate時,combineraggregator的好處就展現出來了,因為trident會自動優化計算,在網絡傳輸tuples之前做局部聚合。

我們再看一下reduceraggregator:

     reduceraggregator通過init方法提供一個初始值,然後為每個輸入的tuple疊代這個值,最後生産處一個唯一的tuple輸出,下面舉例說明:

定義一個reduceraggregator接口實作技術器的例子:

    aggregator接口可以發射含任意數量屬性的任意資料量的tuples,并且可以在執行過程中的任何時候發射:

init:在處理資料之前被調用,它的傳回值會作為一個狀态值傳遞給aggregate和complete方法

aggregate:用來處理每一個輸入的tuple,它可以更新狀态值也可以發射tuple

complete:當所有tuple都被處理完成後被調用     

定義一個實作來完成一個計數器:

shuffle:通過随機配置設定算法來均衡tuple到各個分區

broadcast:每個tuple都被廣播到所有的分區,這種方式在drcp時非常有用,比如在每個分區上做statequery

partitionby:根據指定的字段清單進行劃分,具體做法是用指定字段清單的hash值對分區個數做取模運算,確定相同字段清單的資料被劃分到同一個分區

global:所有的tuple都被發送到一個分區,這個分區用來處理整個stream

batchglobal:一個batch中的所有tuple都被發送到同一個分區,不同的batch會去往不同的分區

partition:通過一個自定義的分區函數來進行分區,這個自定義函數實作了 backtype.storm.grouping.customstreamgrouping

2.3 aggragation聚合操作

和paritionaggregate一樣,aggregators的聚合也可以串聯起來,但是如果你把一個 combineraggregator和一個非combineraggregator串聯在一起,trident是無法完成局部聚合優化的。

另一種合并流的方式就是join。一個标準的join就像是一個sql,必須有标準的輸入,是以,join隻針對符合條件的stream。join應用在來自spout的每一個小batch中。join時候的tuple會包含: