天天看点

Storm-源码分析-Streaming Grouping (backtype.storm.daemon.executor)

executor在发送outbounding message的时候, 需要决定发送到next component的哪些tasks 

这里就需要用到streaming grouping,

除了direct grouping, 返回的是grouper function, 执行该grouper function得到target tasks list 

direct grouping返回, :direct

使用.select取出group-fields在tuple中对应的values list, 你可以使用多个fields来进行group 

使用tuple/list-hash-code, 对values list产生hash code 

对num-tasks取mod, 并使用task-getter取出对应的target-tasks

fields类, 除了存放fields的list, 还有个用于快速field读取的index  index的生成, 很简单, 就是记录fields以及自然排序  使用时调用select, 给出需要哪几个fields的value, 以及tuple  从index读出fields的index值, 直接从tuple中读出对应index的value (当然生成tuple的时候, 也必须安装fields的顺序生成)

fields grouping, 但是field为空, 就代表globle grouping, 所有tuple都发送到一个task

默认选取第一个task

发送到所有的tasks

没有采用比较简单的直接用random取值的方式(区别于none-grouping) 

因为考虑到load balance, 所以采用下面这种伪随机的实现方式

对target-tasks, 先随机shuffle, 打乱次序 

在acquire-random-range-id, 会依次读所有的task, 这样保证, 虽然顺序是随机的, 但是每个task都会被选中一次 

当curr越界时, 清空curr, 并从新shuffle target-tasks

local tasks优先选取, 并采用shuffle的方式  

不care grouping的方式, 现在的实现就是简单的random  

可以自定义customstreamgrouping, 关键就是定义choosetasks逻辑, 来实现自己的tasks choose策略

:custom-object 和:custom-serialized 的不同仅仅是, thrift-grouping是否被序列化过 

没有就可以直接读出object, 否则需要反序列成object

producer of the tuple decides which task of the consumer will receive this tuple. 

direct groupings can only be declared on streams that have been declared as direct streams.

这里直接返回:direct, 因为direct-grouping, 发送到哪个tasks, 是由producer产生tuple的时候已经决定了, 所以这里不需要做任何grouping相关工作  

outbound-components 

一个executor只会对应于一个component, 所以给出当前executor的component-id 

gettargets, 可以得出所有outbound components, [streamid, [target-componentid, grouping]]

调用outbound-groupings, 

最终返回[streamid [component grouper]]的hashmap, 并赋值给executor-data中的stream->component->grouper

task在最终发送message的时候, 就会通过stream->component->grouper来产生真正的target tasks list

outbound-groupings 

对每个task不为空的target component调用mk-grouper 

mk-grouper返回的是grouper fn, 所以, 最终的返回, [component, grouper]

继续阅读