globalpartitioninformation (storm.kafka.trident)
记录partitionid和broker的关系
可以静态的生成globalpartitioninformation,向上面代码一样
也可以动态的从zk获取,推荐这种方式
从zk获取就会用到dynamicbrokersreader
dynamicbrokersreader
核心就是从zk上读出partition和broker的对应关系
操作zk都是使用curator框架
核心函数,
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
dynamicpartitionconnections
维护到每个broker的connection,并记录下每个broker上对应的partitions
核心数据结构,为每个broker维持一个connectioninfo
connectioninfo的定义,包含连接该broker的simpleconsumer和记录partitions的set
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
核心函数,就是register
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
关键核心逻辑,用于管理一个partiiton的读取状态
先理解下面几个变量,
kafka对于一个partition,一定是从offset从小到大按顺序读的,并且这里为了保证不读丢数据,会定期的将当前状态即offset写入zk
几个中间状态,
从kafka读到的offset,_emittedtooffset
从kafka读到的messages会放入_waitingtoemit,放入这个list,我们就认为一定会被emit,所以emittedtooffset可以认为是从kafka读到的offset
已经成功处理的offset,lastcompletedoffset
由于message是要在storm里面处理的,其中是可能fail的,所以正在处理的offset是缓存在_pending中的
如果_pending为空,那么lastcompletedoffset=_emittedtooffset
如果_pending不为空,那么lastcompletedoffset为pending list里面第一个offset,因为后面都还在等待ack
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
已经写入zk的offset,_committedto
我们需要定期将lastcompletedoffset,写入zk,否则crash后,我们不知道上次读到哪儿了
所以_committedto <= lastcompletedoffset
完整过程,
1. 初始化,
关键就是注册partition,然后初始化offset,以知道从哪里开始读
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
2. 从kafka读取messages,放到_waitingtoemit
从kafka中读到数据bytebuffermessageset,
把需要emit的msg,messageandrealoffset,放到_waitingtoemit
把没完成的offset放到pending
更新emittedtooffset
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
其中fetch message的逻辑如下,
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
3. emit msg
从_waitingtoemit中取到msg,转换成tuple,然后通过collector.emit发出去
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
可以看看转换tuple的过程,
可以看到是通过kafkaconfig.scheme.deserialize来做转换
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
所以你使用时,需要定义scheme逻辑,
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
4. 定期的commit offset
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
5. 最后关注一下,fail时的处理
首先作者没有cache message,而只是cache offset
所以fail的时候,他是无法直接replay的,在他的注释里面写了,不这样做的原因是怕内存爆掉
所以他的做法是,当一个offset fail的时候, 直接将_emittedtooffset回滚到当前fail的这个offset
下次从kafka fetch的时候会从_emittedtooffset开始读,这样做的好处就是依赖kafka做replay,问题就是会有重复问题
所以使用时,一定要考虑,是否可以接受重复问题
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
最后来看看kafkaspout
1. 初始化
关键就是初始化dynamicpartitionconnections和_coordinator
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
看看_coordinator 是干嘛的?
这很关键,因为我们一般都会开多个并发的kafkaspout,类似于high-level中的consumer group,如何保证这些并发的线程不冲突?
使用和highlevel一样的思路,一个partition只会有一个spout消费,这样就避免处理麻烦的访问互斥问题(kafka做访问互斥很麻烦,试着想想)
是根据当前spout的task数和partition数来分配,task和partitioin的对应关系的,并且为每个partition建立partitionmanager
这里首先看到totaltasks就是当前这个spout component的task size
staticcoordinator和zkcoordinator的差别就是, 从statichost还是从zk读到partition的信息,简单起见,看看staticcoordinator实现
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
其中分配的逻辑在calculatepartitionsfortask
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
2. nexttuple
逻辑写的比较tricky,其实只要从一个partition读成功一次
只所以要for,是当emitstate.no_emitted时,需要遍历后面的partition以保证读成功一次
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
定期commit的逻辑,遍历去commit每个partitionmanager
3. ack和fail
直接调用partitionmanager
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
4. declareoutputfields
所以在scheme里面需要定义,deserialize和getoutputfields
再来看下metrics,关键学习一下如何在storm里面加metrics
在spout.open里面初始化了下面两个metrics
kafkaoffset
反映出每个partition的earliesttimeoffset,latesttimeoffset,和latestemittedoffset,其中latesttimeoffset - latestemittedoffset就是spout lag
除了反映出每个partition的,还会算出所有的partitions的总数据
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
_kafkaoffsetmetric.getvalueandreset,其实只是get,不需要reset
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
kafkapartition
反映出从kafka fetch数据的情况,fetchapilatencymax,fetchapilatencymean,fetchapicallcount 和 fetchapimessagecount
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
pm.getmetricsdatamap(),
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
更新的逻辑如下,
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
我们在读取kafka时,
首先是关心,每个partition的读取状况,这个通过取得kafkaoffset metrics就可以知道
再者,我们需要replay数据,使用high-level接口的时候可以通过系统提供的工具,这里如何搞?
看下下面的代码,
第一个if,是从配置文件里面没有读到配置的情况
第二个else if,当topologyinstanceid发生变化时,并且forcefromstart为true时,就会取startoffsettime指定的offset(latest或earliest)
这个topologyinstanceid, 每次kafkaspout对象生成的时候随机产生,
string _uuid = uuid.randomuuid().tostring();
spout对象是在topology提交时,在client端生成一次的,所以如果topology停止,再重新启动,这个id一定会发生变化
所以应该是只需要把forcefromstart设为true,再重启topology,就可以实现replay
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
代码例子
storm-kafka的文档很差,最后附上使用的例子
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)