在worker中通过executor/mk-executor worker e, 创建每个executor
executor会把需要发送的tuple缓存到batch-transfer->worker queue中
参考下面的comments, 为了避免component block (大量的tuple没有被及时处理), 额外创建了overflow buffer, 只有当这个buffer也满了, 才停止nexttuple(对于spout executor比较需要overflow buffer)
返回fn, fn用于将[task, tuple]放到overflow-buffer或者batch-transfer->worker queue中
注意, 这是executor->transfer-fn, 不同于worker->transfer-fn, 名字起的不好, 会混淆
executor的transfer-fn将tuple缓存到executor的batch-transfer->worker, 而worker->transfer-fn将tuple发送到worker的transfer queue
<a href="http://www.cnblogs.com/fxjwind/p/3223110.html">storm-源码分析-stats (backtype.storm.stats)</a>
根据conf里面的sampling-rate创建一个sampler
这里创建的是even-sampler,
even-sampler, 返回的是个fn ,并且通过with-meta添加metadata({:rate freq})
所以, 通过(:rate (meta sampler)), 可以从sampler的meta里面取出rate值
sampler就是fn, 每次调用都会返回(= curr target)
curr从start开始递增, 在达到target之前, 调用fn都是返回false
当curr等于target时, 调用fn返回true
当curr大于target时, 从新随机生成target, 将curr清零
所以sampler实际产生的效果, 就是不停的调用sampler, 会随机出现若干次false和一次true (在freq的范围内)
从而达到sampler的效果, 只有是true的时候才取样
其实对于简单的sampler, 比如rate是20%, 可以简单的每跳过4个取一个, 但是这样可能的问题是, 取样的规律性太强, 如果数据恰好符合你的规律, 比如5倍数的数据相同, 就会有问题
所以这里为了增加随机性, 采用这样的实现
并且这里对闭包和metadata的应用, 值得借鉴
(task/mk-task executor-data t)
<a href="http://www.cnblogs.com/fxjwind/p/3227653.html">storm-源码分析-topology submit-task</a>
从batch-transfer-queue取出messages, 没有到达batchend时, 放到cached-emit中的arraylist中
当达到batchend时, 使用transfer-fn将messages发送到transfer-queue (spout应该没有发送给自己的tuple吧)
worker, transfer-fn 将task分为local和remote 对于local的, 使用local-transfer将messages发送到对应的recieve-queue里面 而对于remote的, 使用disruptor/publish发送到transfer-queue里面
try…catch mk-threads函数, 如果发生异常将error写到zk, 以便其他的daemon能及时知道
handlers (with-error-reaction report-error-and-die
(mk-threads executor-data task-datas))
本文章摘自博客园,原文发布日期:2013-08-05