对于executor thread是整个storm最为核心的代码, 因为在这个thread里面真正完成了大部分工作, 而其他的如supervisor,worker都是封装调用.
对于executor的mk-threads, 是通过mutilmethods对spout和bolt分别定义不同的逻辑
spout在emit tuple后, 会等待ack或fail, 所以这些tuple暂时不能直接从删掉, 只能先放入pending队列, 直到最终被ack或fail后, 才能被删除
首先, tuple pending的个数是有限制的, p*num-tasks
p是topology-max-spout-pending, num-tasks是spout的task数
然后, spouts需要两种情况下需要wait, nexttuple为空, 或达到maxspoutpending上限
默认的wait策略是, sleep1毫秒, 可以在topology-spout-wait-strategy上配置特有的wait strategy class
最后, 定义pending的结构, 并且pending是会设置超时的, 不然万一后面的blot发生问题, 会导致spout block
其他的基本一致, 主要数据结构为, linkedlist<hashmap<k, v>> _buckets; 最主要的操作是rotate, 删除旧bucket, 添加新bucket
但rotatingmap需要外部的计数器来触发rotate, storm是通过system_tick来触发, 下面会看到
tuple-action-fn, 处理不同stream的tuple
1.2.1 system_tick_stream_id
(.rotate pending) rotate pending列表
1.2.2 metrics_tick_stream_id
执行(metrics-tick executor-data task-datas tuple)
触发component发送builtin-metrics的data, 到metrics_stream, 最终发送到metric-bolt统计当前的component处理tuples的情况
具体逻辑, 就是创建task-info和data-points, 并send到metrics_stream
1.2.3 default, 普通tuple
对于spout而言, 作为topology的source, 收到的tuple只会是acker-ack-stream或acker-fail-stream
所以收到tuple, 取得msgid, 从pending列表中删除
最终根据steamid, 调用ack-spout-msg或fail-spout-msg
以ack-spout-msg为例, fail基本一样, 只是调用.fail而已
这是executor的主线程, 没有使用disruptor.consume-loop来实现, 是因为这里不仅仅包含对recieve tuple的处理
所以使用async-loop来直接实现
前面也了解过, async-loop的实现是新开线程执行afn, 返回为sleeptime, 然后sleep sleeptime后继续执行afn……
这里的实现比较奇特,
在afn中只是做了准备工作, 比如定义send-spout-msg, 初始化spout…
然后afn, 返回一个fn, 真正重要的工作在这个fn里面执行了, 因为sleeptime在作为函数参数的时候, 也一定会先被evaluate
比较奇葩, 为什么要这样...
1.3.1 send-spout-msg
首先生成send-spout-msg函数, 这个函数最终被emit, emitdirect调用, 用于发送spout msg
所以逻辑就是首先根据message-id判断是否需要track, 需要则利用messageid生成root-id和out-id
然后生成tuple对象(tupleimpl)
先看看messageid和tupleimpl的定义
这里的messageid和emit传入的message-id没有什么关系, 这个名字起的容易混淆
这里主要的操作就是通过generateid产生随机id, 然后通过makerootid, 将[root-id, out-id]加入map, anchorstoids
后面做的事, 使用transfer-fn将tuple发到发送queue, 然后在pending中增加item用于tracking, 并send message到acker通知它track这个message
1.3.2 spout.open, 初始化spout
很简单, 关键是实现ispoutoutputcollector, emit, emitdirect
1.3.3 setup-metrics!, metrics_tick的来源
使用schedule-recurring定期给自己发送metrics_tick tuple, 以触发builtin-metrics的定期发送
1.3.4 fn
里面做了spout thread最关键的几件事, 最终返回0, 表示async-loop的sleep时间
handle recieve-queue里面的tuple
调用nexttuple…
注意所有事情都是在一个线程里面顺序做的, 所以不能有block的逻辑
先判断tuple的stream-id, 对于metrics_tick的处理参考上面
否则, 就是普通的tuple, 用对应的task去处理
对于一个executor线程中包含多个task, 其实就是这里根据task-id选择不同的task-data
并且最终调用bolt-obj的execute, 这就是user定义的bolt逻辑
^ibolt bolt-obj (:object task-data)
(.execute bolt-obj tuple)
2.2.1 bolt-emit
类似send-spout-msg, 被emit调用, 用于发送tuple, storm的命名风格不统一
调用task-fn产生out-tasks, 以及调用transfer-fn, 将tuples发送到发送队列都比较好理解
关键中一段对于anchors-to-ids的操作, 刚开始有些费解...这个anchors-to-ids 到底干吗用的? 用于记录的dag图中, 该tuple产生的edge, 以及和root的关系 代码里面anchor表示的是源tuple, 而理解上anchor更象是一种关系, 所以有些confuse 所以上面的逻辑就是新产生edge-id, 虽然相同的out-task, 但不同的anchor会产生不同的edge-id 然后对每个anchor的root-ids, 产生map [root-id, edge-id] (上面的逻辑是异或, 因为不同anchors可能有相同的root) 最终就是得到该tuple产生edges和所有相关的roots之间的关系
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL0QWOjJGZlVTNmhDNkRWY5YmMiRDNzkTO5UWZ1kDZ3gTLzQTMycTM1AzLchDMzEDMy8CXzUzNyEzMvw1ZvxmYvwVbvNmLn9GbiRXauNmLzV2Zh1Wavw1LcpDc0RHaiojIsJye.png)
然后其中的(.updateackval a edge-id)是干吗的? 为了节省一次向acker的消息发送, 理论上, 应该在创建edge的时候发送一次消息去acker上注册一下, 然后在ack的时候再发送一次消息去acker完成ack 但是storm做了优化, 节省了在创建edge的这次消息发送 优化的做法是, 将新创建的edge-id, 缓存在父tuple的_outackval上, 因为处理完紧接着会去ack父tuple, 所以在这个时候将新创建的edge信息一起同步到acker,具体看下面的ack实现 所以这里调用updateackval去更新父tuple的_outackval(做异或), 而没有向acker发送消息
关于storm跟踪所有tuple的方法 传统的方法, 在spout的时候, 生成rootid, 之后每次emit tuple, 产生一条edgeid, 就可以记录下整个dag 然后在ack的时候, 只需要标记或删除这些edgeid, 表明已经处理完就ok. 这样的问题在于, 如果dag图比较复杂, 那么这个结构会很大, 可扩展性不好 storm采用的方法是, 不需要记录具体的每条edge, 因为实际上他并不关心有哪些edge, 他只关心每条edge是否都被ack了, 所以只需要不停的做异或, 成对的异或结果为0
2.2.1 prepare
主要在于outputcollector的实现,
其中emit和emitdirect都是直接调用bolt-emit, 很简单
重点就是ack和fail的实现
其中比较难理解的是, 发送ack消息是不是直接发送本身的edge-id, 而是(bit-xor id ack-val)
其实做了两件事, ack当前tuple和同步新的edges
因为acker拿到id和ack-val也是和acker记录的值做异或, 所以这里先直接做异或, 省得在消息中需要发送两个参数
如果有耐心看到这儿, 再附送两幅图...
本文章摘自博客园,原文发布日期:2013-08-05