天天看点

Storm-源码分析-Topology Submit-Worker

和其他的daemon一样, 都是通过defserverfn macro来创建worker

1.2.1. 建立worker本地的hb 

调用do-heartbeat, 将worker的hb写到本地的localstate数据库中, (.put state ls-worker-heartbeat hb false)

1.2.2. 将worker hb同步到zk, 以便nimbus可以立刻知道worker已经启动 

调用do-executor-heartbeats, 通过worker-heartbeat!将worker hb写入zk的workerbeats目录

1.2.3. 设定timer定期更新本地hb和zk hb

(schedule-recurring (:heartbeat-timer worker) 0 (conf worker-heartbeat-frequency-secs) heartbeat-fn) 

(schedule-recurring (:executor-heartbeat-timer worker) 0 (conf task-heartbeat-frequency-secs) #(do-executor-heartbeats worker :executors @executors))

mk-refresh-connections定义并返回一个匿名函数, 但是这个匿名函数, 定义了函数名this, 这个情况前面也看到, 是因为这个函数本身要在函数体内被使用. 

并且refresh-connections是需要反复被执行的, 即当每次assignment-info发生变化的时候, 就需要refresh一次 

所以这里使用timer.schedule-recurring就不合适, 因为不是以时间触发 

这里使用的是zk的callback触发机制

supervisor的mk-synchronize-supervisor, 以及worker的mk-refresh-connections, 都采用类似的机制  a. 首先需要在每次assignment改变的时候被触发, 所以都利用zk的watcher  b. 都需要将自己作为callback, 并在获取assignment时进行注册, 都使用(fn this [])  c. 因为比较耗时, 都选择后台执行callback, 但是mk-synchronize-supervisor使用的是eventmanager, mk-refresh-connections使用的是timer  两者不同, timer是基于优先级队列, 所以更灵活, 可以设置延时时间, 而eventmanager, 就是普通队列实现, fifo  另外, eventmanager利用reify来封装接口, 返回的是record, 比timer的实现要优雅些

首先, 如果没有指定callback, 以(schedule (:refresh-connections-timer worker) 0 this)为callback 

接着, (.assignment-info storm-cluster-state storm-id callback) 在获取assignment信息的时候, 设置callback, 也就是说当assignment发生变化时, 就会向refresh-connections-timer中发送一个'立即执行this’的event 

这样就可以保证, 每次assignment发生变化, timer都会在后台做refresh-connections的操作

refresh-connections的步骤

a. 找出该worker下需要往其他task发送数据的task, outbound-tasks

    worker-outbound-tasks, 找出当前work中的task属于的component, 并找出该component的目标component

    最终找出目标compoennt所对应的所有task, 作为返回   

b. 找出outbound-tasks对应的tasks->node+port, my-assignment

c. 如果outbound-tasks在同一个worker进程中, 不需要建connection, 所以排除掉, 剩下needed-assignment 

   :value –> needed-connections , :key –> needed-tasks

d. 和当前已经创建并cache的connection集合对比一下, 找出new-connections和remove-connections

e. 调用icontext.connect, (.connect ^icontext (:mq-context worker) storm-id ((:node->host assignment) node) port), 创建新的connection, 并merge到:cached-node+port->socket中

f. 使用my-assignment更新:cached-task->node+port (结合:cached-node+port->socket, 就可以得到task->socket) 

g. close所有remove-connections, 并从:cached-node+port->socket中删除

launch接收线程,将数据从server的侦听端口, 不停的放到task对应的接收队列

1.5.1 mq-context 

调用transportfactory/makecontext来创建context对象, 根据配置不同, 分别创建local或zmq的context

1.5.2 transfer-local-fn 

返回fn, 该fn会将tuple-batch里面的tuples, 按task所对应的executor发送到对应的接收队列

作用就是将alist里面的elem, 按afn(elem)作为key, 经行group, 最终返回hashmap, 以便通过key可以取出所有的elem 

对上面的例子,  bind = [short-executor pairs]  amap = grouped  grouped的一个entry是, {: short-executor pairs}  一个简化的iter map的宏, 比较难于理解 

1.5.3 msg-loader/launch-receive-thread!

a, 使用async-loop, 创建异步执行loop的线程, 并start thread 

   主要的逻辑是, bind到socket端口, 不停的recv messages 

   当接收完一批, 通过transfer-local-fn放到接收队列

b, 在async-loop中已经start thread, 完成let的时候thread已经在工作了 

   这个function的返回值, 很有意思, 其实是这个thread的close function, 并且由于闭包了该thread, 使得这个thread在close前一直存在

生成disrputor的event handler, 

将packets不停的放到drainer里面, 当batch结束时, 将drainer里面的每条message发送到对应task的connection

从下图比较清晰的可以看出worker做了哪些事情, 

1. 根据assignment变化, 调整或创建send-connection 

2. 创建executors的输入和输出queue 

3. 创建worker的接收和发送线程, receive thread和tansfer thread 

4. 根据assignments关系, 创建executors

其中线程间通信使用的是, disruptor 

而进程间通信使用的是, zmq

Storm-源码分析-Topology Submit-Worker

本文章摘自博客园,原文发布日期:2013-07-23