天天看点

storm消费kafka实现实时计算

storm消费kafka实现实时计算

* 每个应用实例部署一个日志agent

* agent实时将日志发送到kafka

* storm实时计算日志

* storm计算结果保存到hbase

创建实时计算项目并引入storm和kafka相关的依赖

创建消费kafka的spout,直接用storm提供的KafkaSpout即可。

创建处理从kafka读取数据的Bolt,JsonBolt负责解析kafka读取到的json并发送到下个Bolt进一步处理(下一步处理的Bolt不再写,只要继承BaseRichBolt就可以对tuple处理)。

创建拓扑MyTopology,先配置好KafkaSpout的配置SpoutConfig,其中zk的地址端口和根节点,将id为KAFKA_SPOUT_ID的spout通过shuffleGrouping关联到jsonBolt对象。

本地测试时直接不带运行参数运行即可,放到集群是需带拓扑名称作为参数。

另外需要注意的是:KafkaSpout默认从上次运行停止时的位置开始继续消费,即不会从头开始消费一遍,因为KafkaSpout默认每2秒钟会提交一次kafka的offset位置到zk上,如果要每次运行都从头开始消费可以通过配置实现。

========广告时间========

<a href="http://blog.csdn.net/wangyangzhizhou/article/details/74080321">为什么写《Tomcat内核设计剖析》</a>

=========================

欢迎关注:

storm消费kafka实现实时计算