天天看點

Storm-源碼分析-Streaming Grouping (backtype.storm.daemon.executor)

executor在發送outbounding message的時候, 需要決定發送到next component的哪些tasks 

這裡就需要用到streaming grouping,

除了direct grouping, 傳回的是grouper function, 執行該grouper function得到target tasks list 

direct grouping傳回, :direct

使用.select取出group-fields在tuple中對應的values list, 你可以使用多個fields來進行group 

使用tuple/list-hash-code, 對values list産生hash code 

對num-tasks取mod, 并使用task-getter取出對應的target-tasks

fields類, 除了存放fields的list, 還有個用于快速field讀取的index  index的生成, 很簡單, 就是記錄fields以及自然排序  使用時調用select, 給出需要哪幾個fields的value, 以及tuple  從index讀出fields的index值, 直接從tuple中讀出對應index的value (當然生成tuple的時候, 也必須安裝fields的順序生成)

fields grouping, 但是field為空, 就代表globle grouping, 所有tuple都發送到一個task

預設選取第一個task

發送到所有的tasks

沒有采用比較簡單的直接用random取值的方式(差別于none-grouping) 

因為考慮到load balance, 是以采用下面這種僞随機的實作方式

對target-tasks, 先随機shuffle, 打亂次序 

在acquire-random-range-id, 會依次讀所有的task, 這樣保證, 雖然順序是随機的, 但是每個task都會被選中一次 

當curr越界時, 清空curr, 并從新shuffle target-tasks

local tasks優先選取, 并采用shuffle的方式  

不care grouping的方式, 現在的實作就是簡單的random  

可以自定義customstreamgrouping, 關鍵就是定義choosetasks邏輯, 來實作自己的tasks choose政策

:custom-object 和:custom-serialized 的不同僅僅是, thrift-grouping是否被序列化過 

沒有就可以直接讀出object, 否則需要反序列成object

producer of the tuple decides which task of the consumer will receive this tuple. 

direct groupings can only be declared on streams that have been declared as direct streams.

這裡直接傳回:direct, 因為direct-grouping, 發送到哪個tasks, 是由producer産生tuple的時候已經決定了, 是以這裡不需要做任何grouping相關工作  

outbound-components 

一個executor隻會對應于一個component, 是以給出目前executor的component-id 

gettargets, 可以得出所有outbound components, [streamid, [target-componentid, grouping]]

調用outbound-groupings, 

最終傳回[streamid [component grouper]]的hashmap, 并指派給executor-data中的stream->component->grouper

task在最終發送message的時候, 就會通過stream->component->grouper來産生真正的target tasks list

outbound-groupings 

對每個task不為空的target component調用mk-grouper 

mk-grouper傳回的是grouper fn, 是以, 最終的傳回, [component, grouper]

繼續閱讀