天天看点

Storm-源码分析- Messaging (backtype.storm.messaging)

先定义两个接口和一个类 

taskmessage类本身比较好理解, 抽象storm的message格式 

对于icontext, 注释也说了, 定义messaging plugin, 通过什么渠道去发送message, storm这里设计成可替换的 

默认定义storm实现了local和zmq两种plugin, 当然你可以实现更多的 

local应该是用于local mode, 而zmq用于distributed mode

icontext接口主要是用于创建iconnection, 体现对socket的管理, 分别通过bind和connect定义服务器端和客户端的connection  

iconnection接口主要用于定义, 真正收发message的逻辑

最终通过transportfactory, 根据config.storm_messaging_transport的配置, 利用java的reflection动态的创建不同类型的context

taskmessage如其名, 包含task和message字段, 以说明发送给哪个task的message 

并且定义了序列化和反序列化的函数

可以详细看看local和zmq的plugin的实现

在local模式下使用的message plugin 

实现比较简单, 所有都基于queues-map来实现, 这里的queue直接使用linkedblockingqueue, 因为local用于测试, 不用考虑高效性 

所有的接收队列或发送队列都通过add-queue!加到queues-map里面(stormid+port作为key) 

那么所有的recv和send, 都是基于queue的操作

这里使用deftype, 而不是defrecord, 即connection和context本身不需要对字典的支持 

并且在icontext的实现中, 使用到了可变field, 据说是比较难用对的高级特性 

我个人的理解, 是因为deftype和defrecord一样, 没有闭包的效果, 而只有field(对象成员)可以随时被接口函数访问, 所以有些场景下需要field的mutable, 比如这里的queues-map 

之前类似的场景都是用reify实现的, 这里给出用deftype实现的版本 

在distributed mode时, storm使用zmq作为进程间和instrance间通信