天天看点

Storm-源码分析-Topology Submit-Executor-mk-threads

对于executor thread是整个storm最为核心的代码, 因为在这个thread里面真正完成了大部分工作, 而其他的如supervisor,worker都是封装调用.

对于executor的mk-threads, 是通过mutilmethods对spout和bolt分别定义不同的逻辑

spout在emit tuple后, 会等待ack或fail, 所以这些tuple暂时不能直接从删掉, 只能先放入pending队列, 直到最终被ack或fail后, 才能被删除

首先, tuple pending的个数是有限制的, p*num-tasks 

p是topology-max-spout-pending, num-tasks是spout的task数

然后, spouts需要两种情况下需要wait, nexttuple为空, 或达到maxspoutpending上限

默认的wait策略是, sleep1毫秒, 可以在topology-spout-wait-strategy上配置特有的wait strategy class

最后, 定义pending的结构, 并且pending是会设置超时的, 不然万一后面的blot发生问题, 会导致spout block

其他的基本一致, 主要数据结构为, linkedlist<hashmap<k, v>> _buckets; 最主要的操作是rotate, 删除旧bucket, 添加新bucket

但rotatingmap需要外部的计数器来触发rotate, storm是通过system_tick来触发, 下面会看到

tuple-action-fn, 处理不同stream的tuple

1.2.1 system_tick_stream_id

(.rotate pending) rotate pending列表

1.2.2 metrics_tick_stream_id

执行(metrics-tick executor-data task-datas tuple)

触发component发送builtin-metrics的data, 到metrics_stream, 最终发送到metric-bolt统计当前的component处理tuples的情况

具体逻辑, 就是创建task-info和data-points, 并send到metrics_stream

1.2.3 default, 普通tuple

对于spout而言, 作为topology的source, 收到的tuple只会是acker-ack-stream或acker-fail-stream 

所以收到tuple, 取得msgid, 从pending列表中删除 

最终根据steamid, 调用ack-spout-msg或fail-spout-msg

以ack-spout-msg为例, fail基本一样, 只是调用.fail而已

这是executor的主线程, 没有使用disruptor.consume-loop来实现, 是因为这里不仅仅包含对recieve tuple的处理 

所以使用async-loop来直接实现 

前面也了解过, async-loop的实现是新开线程执行afn, 返回为sleeptime, 然后sleep sleeptime后继续执行afn…… 

这里的实现比较奇特, 

在afn中只是做了准备工作, 比如定义send-spout-msg, 初始化spout… 

然后afn, 返回一个fn, 真正重要的工作在这个fn里面执行了, 因为sleeptime在作为函数参数的时候, 也一定会先被evaluate 

比较奇葩, 为什么要这样...

1.3.1 send-spout-msg

首先生成send-spout-msg函数, 这个函数最终被emit, emitdirect调用, 用于发送spout msg 

所以逻辑就是首先根据message-id判断是否需要track, 需要则利用messageid生成root-id和out-id 

然后生成tuple对象(tupleimpl) 

先看看messageid和tupleimpl的定义

这里的messageid和emit传入的message-id没有什么关系, 这个名字起的容易混淆 

这里主要的操作就是通过generateid产生随机id, 然后通过makerootid, 将[root-id, out-id]加入map, anchorstoids

后面做的事, 使用transfer-fn将tuple发到发送queue, 然后在pending中增加item用于tracking, 并send message到acker通知它track这个message

1.3.2 spout.open, 初始化spout

很简单, 关键是实现ispoutoutputcollector, emit, emitdirect

1.3.3 setup-metrics!, metrics_tick的来源

使用schedule-recurring定期给自己发送metrics_tick tuple, 以触发builtin-metrics的定期发送

1.3.4 fn

里面做了spout thread最关键的几件事, 最终返回0, 表示async-loop的sleep时间 

handle recieve-queue里面的tuple 

调用nexttuple… 

注意所有事情都是在一个线程里面顺序做的, 所以不能有block的逻辑

先判断tuple的stream-id, 对于metrics_tick的处理参考上面

否则, 就是普通的tuple, 用对应的task去处理 

对于一个executor线程中包含多个task, 其实就是这里根据task-id选择不同的task-data 

并且最终调用bolt-obj的execute, 这就是user定义的bolt逻辑

^ibolt bolt-obj (:object task-data)

(.execute bolt-obj tuple)

2.2.1 bolt-emit

类似send-spout-msg, 被emit调用, 用于发送tuple, storm的命名风格不统一 

调用task-fn产生out-tasks, 以及调用transfer-fn, 将tuples发送到发送队列都比较好理解

关键中一段对于anchors-to-ids的操作, 刚开始有些费解...这个anchors-to-ids 到底干吗用的? 用于记录的dag图中, 该tuple产生的edge, 以及和root的关系  代码里面anchor表示的是源tuple, 而理解上anchor更象是一种关系, 所以有些confuse   所以上面的逻辑就是新产生edge-id, 虽然相同的out-task, 但不同的anchor会产生不同的edge-id  然后对每个anchor的root-ids, 产生map [root-id, edge-id] (上面的逻辑是异或, 因为不同anchors可能有相同的root)  最终就是得到该tuple产生edges和所有相关的roots之间的关系
Storm-源码分析-Topology Submit-Executor-mk-threads
然后其中的(.updateackval a edge-id)是干吗的?  为了节省一次向acker的消息发送, 理论上, 应该在创建edge的时候发送一次消息去acker上注册一下, 然后在ack的时候再发送一次消息去acker完成ack  但是storm做了优化, 节省了在创建edge的这次消息发送  优化的做法是,  将新创建的edge-id, 缓存在父tuple的_outackval上, 因为处理完紧接着会去ack父tuple, 所以在这个时候将新创建的edge信息一起同步到acker,具体看下面的ack实现  所以这里调用updateackval去更新父tuple的_outackval(做异或), 而没有向acker发送消息
关于storm跟踪所有tuple的方法  传统的方法, 在spout的时候, 生成rootid, 之后每次emit tuple, 产生一条edgeid, 就可以记录下整个dag  然后在ack的时候, 只需要标记或删除这些edgeid, 表明已经处理完就ok.  这样的问题在于, 如果dag图比较复杂, 那么这个结构会很大, 可扩展性不好  storm采用的方法是, 不需要记录具体的每条edge, 因为实际上他并不关心有哪些edge, 他只关心每条edge是否都被ack了, 所以只需要不停的做异或, 成对的异或结果为0

2.2.1 prepare

主要在于outputcollector的实现,

其中emit和emitdirect都是直接调用bolt-emit, 很简单

重点就是ack和fail的实现

其中比较难理解的是, 发送ack消息是不是直接发送本身的edge-id, 而是(bit-xor id ack-val) 

其实做了两件事, ack当前tuple和同步新的edges 

因为acker拿到id和ack-val也是和acker记录的值做异或, 所以这里先直接做异或, 省得在消息中需要发送两个参数

如果有耐心看到这儿, 再附送两幅图...

Storm-源码分析-Topology Submit-Executor-mk-threads
Storm-源码分析-Topology Submit-Executor-mk-threads

本文章摘自博客园,原文发布日期:2013-08-05