天天看点

storm-kafka-0.8-plus 源码解析

globalpartitioninformation (storm.kafka.trident)

记录partitionid和broker的关系

可以静态的生成globalpartitioninformation,向上面代码一样 

也可以动态的从zk获取,推荐这种方式 

从zk获取就会用到dynamicbrokersreader

dynamicbrokersreader

核心就是从zk上读出partition和broker的对应关系 

操作zk都是使用curator框架

核心函数,

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

dynamicpartitionconnections

维护到每个broker的connection,并记录下每个broker上对应的partitions

核心数据结构,为每个broker维持一个connectioninfo

connectioninfo的定义,包含连接该broker的simpleconsumer和记录partitions的set

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

核心函数,就是register

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

关键核心逻辑,用于管理一个partiiton的读取状态 

先理解下面几个变量,

kafka对于一个partition,一定是从offset从小到大按顺序读的,并且这里为了保证不读丢数据,会定期的将当前状态即offset写入zk

几个中间状态,

从kafka读到的offset,_emittedtooffset 

从kafka读到的messages会放入_waitingtoemit,放入这个list,我们就认为一定会被emit,所以emittedtooffset可以认为是从kafka读到的offset 

已经成功处理的offset,lastcompletedoffset 

由于message是要在storm里面处理的,其中是可能fail的,所以正在处理的offset是缓存在_pending中的 

如果_pending为空,那么lastcompletedoffset=_emittedtooffset 

如果_pending不为空,那么lastcompletedoffset为pending list里面第一个offset,因为后面都还在等待ack

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

已经写入zk的offset,_committedto 

我们需要定期将lastcompletedoffset,写入zk,否则crash后,我们不知道上次读到哪儿了 

所以_committedto <= lastcompletedoffset 

完整过程,

1. 初始化,

关键就是注册partition,然后初始化offset,以知道从哪里开始读

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

2. 从kafka读取messages,放到_waitingtoemit

从kafka中读到数据bytebuffermessageset, 

把需要emit的msg,messageandrealoffset,放到_waitingtoemit 

把没完成的offset放到pending 

更新emittedtooffset

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

其中fetch message的逻辑如下,

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

3. emit msg

从_waitingtoemit中取到msg,转换成tuple,然后通过collector.emit发出去 

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

可以看看转换tuple的过程, 

可以看到是通过kafkaconfig.scheme.deserialize来做转换

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

所以你使用时,需要定义scheme逻辑,

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

4. 定期的commit offset

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

5. 最后关注一下,fail时的处理

首先作者没有cache message,而只是cache offset 

所以fail的时候,他是无法直接replay的,在他的注释里面写了,不这样做的原因是怕内存爆掉

所以他的做法是,当一个offset fail的时候, 直接将_emittedtooffset回滚到当前fail的这个offset 

下次从kafka fetch的时候会从_emittedtooffset开始读,这样做的好处就是依赖kafka做replay,问题就是会有重复问题 

所以使用时,一定要考虑,是否可以接受重复问题

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

最后来看看kafkaspout

1. 初始化 

关键就是初始化dynamicpartitionconnections和_coordinator

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

看看_coordinator 是干嘛的? 

这很关键,因为我们一般都会开多个并发的kafkaspout,类似于high-level中的consumer group,如何保证这些并发的线程不冲突? 

使用和highlevel一样的思路,一个partition只会有一个spout消费,这样就避免处理麻烦的访问互斥问题(kafka做访问互斥很麻烦,试着想想) 

是根据当前spout的task数和partition数来分配,task和partitioin的对应关系的,并且为每个partition建立partitionmanager

这里首先看到totaltasks就是当前这个spout component的task size 

staticcoordinator和zkcoordinator的差别就是, 从statichost还是从zk读到partition的信息,简单起见,看看staticcoordinator实现

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

其中分配的逻辑在calculatepartitionsfortask

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

2. nexttuple

逻辑写的比较tricky,其实只要从一个partition读成功一次 

只所以要for,是当emitstate.no_emitted时,需要遍历后面的partition以保证读成功一次

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

定期commit的逻辑,遍历去commit每个partitionmanager

3. ack和fail

直接调用partitionmanager

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

4. declareoutputfields 

所以在scheme里面需要定义,deserialize和getoutputfields

再来看下metrics,关键学习一下如何在storm里面加metrics 

在spout.open里面初始化了下面两个metrics

kafkaoffset 

反映出每个partition的earliesttimeoffset,latesttimeoffset,和latestemittedoffset,其中latesttimeoffset - latestemittedoffset就是spout lag 

除了反映出每个partition的,还会算出所有的partitions的总数据

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

_kafkaoffsetmetric.getvalueandreset,其实只是get,不需要reset

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

kafkapartition 

反映出从kafka fetch数据的情况,fetchapilatencymax,fetchapilatencymean,fetchapicallcount 和 fetchapimessagecount

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

pm.getmetricsdatamap(),

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

更新的逻辑如下,

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

我们在读取kafka时,

首先是关心,每个partition的读取状况,这个通过取得kafkaoffset metrics就可以知道

再者,我们需要replay数据,使用high-level接口的时候可以通过系统提供的工具,这里如何搞?

看下下面的代码, 

第一个if,是从配置文件里面没有读到配置的情况 

第二个else if,当topologyinstanceid发生变化时,并且forcefromstart为true时,就会取startoffsettime指定的offset(latest或earliest) 

这个topologyinstanceid, 每次kafkaspout对象生成的时候随机产生, 

string _uuid = uuid.randomuuid().tostring(); 

spout对象是在topology提交时,在client端生成一次的,所以如果topology停止,再重新启动,这个id一定会发生变化

所以应该是只需要把forcefromstart设为true,再重启topology,就可以实现replay

storm-kafka-0.8-plus 源码解析
storm-kafka-0.8-plus 源码解析

代码例子

storm-kafka的文档很差,最后附上使用的例子

storm-kafka-0.8-plus 源码解析