和其他的daemon一样, 都是通过defserverfn macro来创建worker
1.2.1. 建立worker本地的hb
调用do-heartbeat, 将worker的hb写到本地的localstate数据库中, (.put state ls-worker-heartbeat hb false)
1.2.2. 将worker hb同步到zk, 以便nimbus可以立刻知道worker已经启动
调用do-executor-heartbeats, 通过worker-heartbeat!将worker hb写入zk的workerbeats目录
1.2.3. 设定timer定期更新本地hb和zk hb
(schedule-recurring (:heartbeat-timer worker) 0 (conf worker-heartbeat-frequency-secs) heartbeat-fn)
(schedule-recurring (:executor-heartbeat-timer worker) 0 (conf task-heartbeat-frequency-secs) #(do-executor-heartbeats worker :executors @executors))
mk-refresh-connections定义并返回一个匿名函数, 但是这个匿名函数, 定义了函数名this, 这个情况前面也看到, 是因为这个函数本身要在函数体内被使用.
并且refresh-connections是需要反复被执行的, 即当每次assignment-info发生变化的时候, 就需要refresh一次
所以这里使用timer.schedule-recurring就不合适, 因为不是以时间触发
这里使用的是zk的callback触发机制
supervisor的mk-synchronize-supervisor, 以及worker的mk-refresh-connections, 都采用类似的机制 a. 首先需要在每次assignment改变的时候被触发, 所以都利用zk的watcher b. 都需要将自己作为callback, 并在获取assignment时进行注册, 都使用(fn this []) c. 因为比较耗时, 都选择后台执行callback, 但是mk-synchronize-supervisor使用的是eventmanager, mk-refresh-connections使用的是timer 两者不同, timer是基于优先级队列, 所以更灵活, 可以设置延时时间, 而eventmanager, 就是普通队列实现, fifo 另外, eventmanager利用reify来封装接口, 返回的是record, 比timer的实现要优雅些
首先, 如果没有指定callback, 以(schedule (:refresh-connections-timer worker) 0 this)为callback
接着, (.assignment-info storm-cluster-state storm-id callback) 在获取assignment信息的时候, 设置callback, 也就是说当assignment发生变化时, 就会向refresh-connections-timer中发送一个'立即执行this’的event
这样就可以保证, 每次assignment发生变化, timer都会在后台做refresh-connections的操作
refresh-connections的步骤
a. 找出该worker下需要往其他task发送数据的task, outbound-tasks
worker-outbound-tasks, 找出当前work中的task属于的component, 并找出该component的目标component
最终找出目标compoennt所对应的所有task, 作为返回
b. 找出outbound-tasks对应的tasks->node+port, my-assignment
c. 如果outbound-tasks在同一个worker进程中, 不需要建connection, 所以排除掉, 剩下needed-assignment
:value –> needed-connections , :key –> needed-tasks
d. 和当前已经创建并cache的connection集合对比一下, 找出new-connections和remove-connections
e. 调用icontext.connect, (.connect ^icontext (:mq-context worker) storm-id ((:node->host assignment) node) port), 创建新的connection, 并merge到:cached-node+port->socket中
f. 使用my-assignment更新:cached-task->node+port (结合:cached-node+port->socket, 就可以得到task->socket)
g. close所有remove-connections, 并从:cached-node+port->socket中删除
launch接收线程,将数据从server的侦听端口, 不停的放到task对应的接收队列
1.5.1 mq-context
调用transportfactory/makecontext来创建context对象, 根据配置不同, 分别创建local或zmq的context
1.5.2 transfer-local-fn
返回fn, 该fn会将tuple-batch里面的tuples, 按task所对应的executor发送到对应的接收队列
作用就是将alist里面的elem, 按afn(elem)作为key, 经行group, 最终返回hashmap, 以便通过key可以取出所有的elem
对上面的例子, bind = [short-executor pairs] amap = grouped grouped的一个entry是, {: short-executor pairs} 一个简化的iter map的宏, 比较难于理解
1.5.3 msg-loader/launch-receive-thread!
a, 使用async-loop, 创建异步执行loop的线程, 并start thread
主要的逻辑是, bind到socket端口, 不停的recv messages
当接收完一批, 通过transfer-local-fn放到接收队列
b, 在async-loop中已经start thread, 完成let的时候thread已经在工作了
这个function的返回值, 很有意思, 其实是这个thread的close function, 并且由于闭包了该thread, 使得这个thread在close前一直存在
生成disrputor的event handler,
将packets不停的放到drainer里面, 当batch结束时, 将drainer里面的每条message发送到对应task的connection
从下图比较清晰的可以看出worker做了哪些事情,
1. 根据assignment变化, 调整或创建send-connection
2. 创建executors的输入和输出queue
3. 创建worker的接收和发送线程, receive thread和tansfer thread
4. 根据assignments关系, 创建executors
其中线程间通信使用的是, disruptor
而进程间通信使用的是, zmq
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLzUTY0EmNwYzNwY2MzITO5kjZiRDOmJzYjdTOihTNxkTLwMDOzUTMzIzLcdDMzEDMy8CXzUzNyEzMvw1ZvxmYvwVbvNmLn9GbiRXauNmLzV2Zh1Wavw1LcpDc0RHaiojIsJye.png)
本文章摘自博客园,原文发布日期:2013-07-23