天天看点

Storm-源码分析-Topology Submit-Executor

在worker中通过executor/mk-executor worker e, 创建每个executor

executor会把需要发送的tuple缓存到batch-transfer->worker queue中 

参考下面的comments, 为了避免component block (大量的tuple没有被及时处理), 额外创建了overflow buffer, 只有当这个buffer也满了, 才停止nexttuple(对于spout executor比较需要overflow buffer)

返回fn, fn用于将[task, tuple]放到overflow-buffer或者batch-transfer->worker queue中

注意, 这是executor->transfer-fn, 不同于worker->transfer-fn, 名字起的不好, 会混淆 

executor的transfer-fn将tuple缓存到executor的batch-transfer->worker, 而worker->transfer-fn将tuple发送到worker的transfer queue

<a href="http://www.cnblogs.com/fxjwind/p/3223110.html">storm-源码分析-stats (backtype.storm.stats)</a>

根据conf里面的sampling-rate创建一个sampler

这里创建的是even-sampler,

even-sampler, 返回的是个fn ,并且通过with-meta添加metadata({:rate freq}) 

所以, 通过(:rate (meta sampler)), 可以从sampler的meta里面取出rate值

sampler就是fn, 每次调用都会返回(= curr target) 

curr从start开始递增, 在达到target之前, 调用fn都是返回false 

当curr等于target时, 调用fn返回true 

当curr大于target时, 从新随机生成target, 将curr清零

所以sampler实际产生的效果, 就是不停的调用sampler, 会随机出现若干次false和一次true (在freq的范围内) 

从而达到sampler的效果, 只有是true的时候才取样

其实对于简单的sampler, 比如rate是20%, 可以简单的每跳过4个取一个, 但是这样可能的问题是, 取样的规律性太强, 如果数据恰好符合你的规律, 比如5倍数的数据相同, 就会有问题 

所以这里为了增加随机性, 采用这样的实现 

并且这里对闭包和metadata的应用, 值得借鉴

(task/mk-task executor-data t)

<a href="http://www.cnblogs.com/fxjwind/p/3227653.html">storm-源码分析-topology submit-task</a>

从batch-transfer-queue取出messages, 没有到达batchend时, 放到cached-emit中的arraylist中 

当达到batchend时, 使用transfer-fn将messages发送到transfer-queue (spout应该没有发送给自己的tuple吧)

worker, transfer-fn 将task分为local和remote  对于local的, 使用local-transfer将messages发送到对应的recieve-queue里面  而对于remote的, 使用disruptor/publish发送到transfer-queue里面

try…catch mk-threads函数, 如果发生异常将error写到zk, 以便其他的daemon能及时知道

handlers (with-error-reaction report-error-and-die 

                   (mk-threads executor-data task-datas))

本文章摘自博客园,原文发布日期:2013-08-05