天天看点

Storm源码结构 (来源Storm Github Wiki)

本文译自storm github wiki:

structure of the codebase,有助于深入了解storm的设计和源码学习。本人也是参照这个进行学习的,觉得在理解storm设计的过程中起到了重要作用,所以也帖一份放在自己博客里。以下的模块分析里没有包括storm 0.9.0增加的netty模块,对应的代码包在storm github下的storm-netty文件夹内,内容比较简单,关于这块的release

note可以参考storm 0.9.0 released netty transport,这里有一篇storm

的新消息传输机制也可以参考(这篇文章的博客里有不少分析storm的文章,博主本人貌似是storm的committer,好像在阿里工作)。此外,在storm github wiki pages里也可以看到不少需要的基础内容。

storm的源码共分为三个不同的层次。

首先,storm在设计之初就考虑到了兼容多语言开发。nimbus是一个thrift服务,topologies被定义为thrift结构体。thrift的运用使得storm可以被任意开发语言使用。

其次,storm的所有接口都是java语言来定义的。因此,尽管storm中的很多功能实现都是clojure代码(说实话,第一次看clojure代码的时候,第一感觉是这乱七八糟的都是些什么啊!那么多的括号又是什么节奏!),但是使用这些功能都必须通过java api。这意味着storm的所有特性对于java来讲都是可用的。

第三,storm的很大一部分实现都是clojure代码。从代码行来看,差不多是一半java代码,一半clojure代码。但是由于clojure在表达能力上更为见长,因此,实际上绝大多数逻辑的实现都是clojure来做的。

接下来的小节里将会逐个详细解释这三个层次。

要理解storm的代码结构,首先需要看的是storm.thrift文件。(在storm-core/src下)

storm使用了从这里folk出来的thrift版本来自动生成代码。这个thrift版本实际上是将所有的java packages都重命名为"org.apache.thrift7"之后的thrift 7。除此之外,它与thrfit 7是完全一样的。之所以单独出这样一个thrift版本一是考虑到thrift缺少向后兼容,而是为了避免包名冲突以满足一些用户在他们自己的topologies中用到其他版本的thrift。

一个topology中的任何一个spout或bolt都会被用户指定一个唯一标识,称为"component id"。当描述1个bolt接收其他哪些spout或bolt的输出时需要用到这个"component id"。stormtopology结构中保存了1个map来保存"component

id"到"component"的映射关系,这个映射关系包含所有的component类型(即所有的spout、bolt)。

thrift对spout或bolt的定义是相同的,因此我们只需要看一下bolt的thrift定义。它包含了1个"componentobject"结构和1个"componentcommon"结构。

"componentobject"即是bolt的实现实体。它可以是以下三个类型之一:

1个序列化的java对象(这个对象实现ibolt接口)

1个"shellcomponent"对象,意味着bolt是由其他语言实现的。如果以这种方式来定义1个bolt,storm将会实例化1个shellbolt对象来负责处理基于jvm的worker进程与非jvm的component(即该bolt)实现体之间的通讯。

1个"javaobject"结构,这个结构告诉storm实例化这个bolt所需要的classname和构造函数参数。这一点在你想用非jvm语言来定义topology时比较有用。这样,在你使用非jvm语言来定义topology时就可以做到既使用基于jvm的spout或bolt,同时又不需要创建并序列化它们的java对象。

"componentcommon"定义了这个component的其他所有属性。包括:

这个component发射什么stream以及stream的元数据(是否是direct stream,stream中field的声明)

这个component接收什么stream(被定义在1个component_id到stream_id的map里,在stream做分组时用到)

这个component的并行度

这个component的配置项configuration

注意,在spout的结构中同样有"componentcommon"字段,因此,spout也是可以被声明接收其他的stream输入。然而,storm java api并没有提供一种方式指定spout接收什么stream,同时如果你在这里指定1个spout的输入声明,在提交这个topology时将会出现报错信息。之所以这样设计,是因为spout的输入声明不是让用户自己来使用的,而是storm内部使用的。storm会在内部自动向topology添加stream和bolt来构造acking

framework,其中的两个stream就是从acker bolt发出给topology中的所有spout节点的。只要1个tuple树被检测到完成了或失败了,acker就会通过这两个stream分别发出"ack"或"fail"消息。将用户提交的topology转换成运行时的topology的代码可参见这里。

storm的接口定义都是java接口。主要的接口如下:

irichbolt

irichspout

topologybuilder

这样定义这些接口的主要意图在于:

以java语言来定义接口

基于此接口,可以做到在不同的场合,提供出各自最适合的默认实现基类

这一策略的实际运用可以参考baserichspout类

spout和bolt就是按照以上接口描述的方式被序列化到topology的thrift定义结构中。

值得一提的一个细节是,ibolt、ispout与irichbolt、irichspout这两对接口是有区别的。它们主要区别是在"rich"版本里增加了"declareoutputfields"方法。这样设计的原因是所有的输出stream的输出field声明都必须是在thrift结构里的(这样就可以做到使用任何编程语言来声明了),但是用户又希望能够在自己的class中来声明stream输出field信息。为解决这个问题,"topologybuilder"在构造thrift结构时就是通过调用"declareoutputfields"方法来得到输出field的声明,然后将其转换纳入thrift结构。这个转换操作可以从"topologybuilder"代码中的这一段里看到。

通过将storm所有的接口都由java语言来定义确保了storm的所有功能对于java来讲都是可使用的。同时,java接口的使用也使得java用户在使用storm时体验更好。

应该说,storm主要是由clojure语言实现的。尽管从代码行数上看一半是java一半是clojure,但其实里面绝大多数的逻辑实现都是clojure。有两个值得一提的例外就是drpc和支持事务的topology,它们二者都纯java实现的。这样做的主要目的是来展示如何基于storm,实现storm之上更高层次的抽象。drpc和支持事务的topology的实现分别位于backtype.storm.coordination和backtype.storm.transactional包里。

这里总结了一份主要的java包和clojure命名空间的内容列表:

backtype.storm.coordination: 实现了drpc和事务性topology里用到的基于storm的批处理功能。这个包里最重要得类是coordinatedbolt

backtype.storm.drpc: drpc的更高层次抽象的具体实现

backtype.storm.generated: 自动生成的thrift代码(利用这里folk出来的thrift版本生成的,主要是把org.apache.thrift包重命名成org.apache.thrift7来避免与其他thrift版本的冲突)

backtype.storm.grouping: 包含了用户实现自定义stream分组类时需要用到的接口

backtype.storm.hooks: 定义了处理storm各种事件的钩子接口,例如当task发射tuple时、当tuple被ack时。关于钩子的手册详见这里

backtype.storm.serialization: storm序列化/反序列化tuple的实现。在kryo之上构建。

backtype.storm.spout: spout及相关接口的定义(例如"spoutoutputcollector")。也包括了"shellspout"来实现非jvm语言定义spout的协议。

backtype.storm.task: bolt及相关接口的定义(例如"outputcollector")。也包括了"shellbolt"来实现非jvm语言定义bolt的协议。最后,"topologycontext"也是在这里定义的,用来在运行时供spout和bolt使用以获取topology的执行信息。

backtype.storm.testing: 包括了storm单元测试中用到的各种测试bolt及工具。

backtype.storm.topology: 在thrift结构之上的java层,用以提供一个纯java api来使用storm(用户不需要了解thrift的细节)。"topologybuilder"及不同spout和bolt的基类们也在这里定义。稍高一层次的接口"ibasicbolt"也在这里定义,它会使得创建某些特定类型的bolt会更加简洁。

backtype.storm.transactional: 包括了事务性topology的实现。

backtype.storm.tuple: 包括storm中tuple数据模型的实现。

backtype.storm.utils: 包含了storm源码中用到的数据结构及各种工具类。

backtype.storm.bootstrap: 包括了1个有用的宏来引入源码中用到的所有类及命名空间。

backtype.storm.clojure: 包括了利用clojure为storm定义的特定领域语言(dsl)。

backtype.storm.cluster: storm守护进程中用到的zookeeper逻辑都封装在这个文件中。这部分代码提供了api来将整个集群的运行状态映射到zookeeper的"文件系统"上(例如哪里运行着怎样的task,每个task运行的是哪个spout/bolt)。

backtype.storm.command.*: 这些命名空间包括了各种"storm xxx"开头的客户端命令行的命令实现。这些实现都很简短。

backtype.storm.config: clojure中config的读取/解析实现。同时也包括了工具函数来告诉nimbus、supervisor等守护进程在各种情况下应该使用哪些本地目录。例如:"master-inbox"函数会返回本地目录告诉nimbus应该将上传给它的jar包保存到哪里。

backtype.storm.daemon.acker: "acker" bolt的实现。这是storm确保数据被完全处理的关键组成部分。

backtype.storm.daemon.common: storm守护进程用到的公共函数,例如根据topology的名字获取其id,将1个用户定义的topology映射到真正运行的topology(真正运行的topology是在用户定义的topology基础上添加了ack stream及acker bolt,参见system-topology!函数),同时包括了各种心跳及storm中其他数据结构的定义。

backtype.storm.daemon.drpc: 包括了drpc服务器的实现,用来与drpc topology一起使用。

backtype.storm.daemon.nimbus: 包括了nimbus的实现。

backtype.storm.daemon.supervisor: 包括了supervisor的实现。

backtype.storm.daemon.task: 包括了spout或bolt的task实例实现。包括了处理消息路由、序列化、为ui提供的统计集合及spout、bolt执行动作的实现。

backtype.storm.daemon.worker: 包括了worker进程(1个worker包含很多的task)的实现。包括了消息传输和task启动的实现。

backtype.storm.event: 包括了1个简单的异步函数的执行器。nimbus和supervisor很多场合都用到了异步函数执行器来避免资源竞争。

backtype.storm.log: 定义了用来输出log信息给log4j的函数。

backtype.storm.messaging.*: 定义了1个高一层次的接口来实现点对点的消息通讯。工作在本地模式时storm会使用内存中的java队列来模拟消息传递。工作在集群模式时,消息传递使用的是zeromq。通用的接口在protocol.clj中定义。

backtype.storm.stats: 实现了向zookeeper中写入ui使用的统计信息时如何进行汇总。实现了不同粒度的聚合。

backtype.storm.testing: 包括了测试storm topology的工具。包括时间仿真,运行一组固定数量的tuple然后获得输出快照的"complete-topology","tracker topology"可以在集群"空闲"时做更细粒度的控制操作,以及其他工具。

backtype.storm.thrift: 包括了自动生成的thrift api的clojure封装以使得使用thrift结构更加便利。

backtype.storm.timer: 实现了1个后台定时器来延迟执行函数或者定时轮询执行。storm不能使用java里的timer类,因为为了单测nimbus和supervisor,必须要与时间仿真集成起来使用。

backtype.storm.ui.*: storm ui的实现。完全独立于其他的代码,通过nimbus的thrift api来获取需要的数据。

backtype.storm.util: 包括了storm代码中用到的通用工具函数。

backtype.storm.zookeeper: 包括了clojure对zookeeper api的封装,同时也提供了一些高一层次的操作例如:"mkdirs"、"delete-recursive"

(全文完)