天天看点

Storm学习笔记(三)——Storm组件详解之Bolt、TopologyBolt消息处理者Topology拓扑

目录

Bolt消息处理者

生命周期

开发Bolt组件

Topology拓扑

结构

运行模式

示例

Bolt消息处理者

Bolt在Storm中是一个被动的角色,它把元组作为输入,然后产生新的元组作为输出。Bolt可以执行过滤、函数操作、合并、写数据库等操作(还可以简单地传递消息流,复杂的消息流往往需要很多步骤,因此需要很多Bolt来处理)。

生命周期

首先,客户端创建Bolt,然后将其序列化为拓扑,并提交给集群中的主机。

之后,集群启动Worker进程,反序列化Bolt,调用prepare方法开始处理元组。

接下来,Bolt处理Tuple,Bolt处理一个输入Tuple,发射0个或者多个Tuple,然后调用ack通知Storm自己已经处理过这个Tuple了(Storm提供了一个IBasicBolt自动调用ack)。Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。

Storm学习笔记(三)——Storm组件详解之Bolt、TopologyBolt消息处理者Topology拓扑

在创建Bolt对象时,通过构造方法初始化成员变量,当Bolt被提交到集群时,这些成员变量也会被序列化,所以通过反序列化,可以获取到这些成员变量。

开发Bolt组件

开发Bolt组件的一个简单的例子:

package storm;

import java.util.Map;
import java.util.stream.Collector;

import org.apache.storm.netty.util.internal.SystemPropertyUtil;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/*
 * 此消息处理者用于接收Spout发来的Tuple,并将Tuple的值做打印输出
 */
public class NumberBolt extends BaseRichBolt{

	private OutputCollector collector;
	
	/*
	 * Bolt的主要方法是execute,它以一个Tuple作为输入
	 * Bolt使用OutputCollector来发射Tuple
	 * Bolt必须为它处理的每一个Tuple调用OutputCollector的ack方法
	 * 以通知Storm该Tuple被处理完成了,从而通知该Tuple的发射者Spout
	 */
	@Override
	public void execute(Tuple input) {
		int randomNum = input.getIntegerByField("number");
		System.out.println(randomNum);
	}

	/*
	 * prepare方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。
	 * OutputCollector是线程安全的,并且随时都可以调用它
	 * 在Bolt中,Tuple的发送可以再Prepare、execute、cleanup等方法中进行
	 * 但是一般都是在execute中进行
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		
	}

	/*
	 * 用于声名当前Bolt发送的Tuple中包含的字段,和Spout中的类似
	 * Bolt可以发射多条消息流,使用OutputFieldsDeclarer.declareStream方法来定义流
	 * 之后使用OutputCollector.emit来选择要发射的流
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream("moreThan", new Fields("It"));
		declarer.declareStream("lessThan", new Fields("It"));
	}

}
           

Topology拓扑

Storm的Topology是指类似于网络拓扑图的一种虚拟结构。Storm拓扑类似于MapReduce任务,一个关键的区别是MapReduce任务运行一段时间后最终会完成,而Storm拓扑一直运行(知道杀死它)。

结构

一个拓扑是由Spout和Bolt组成的图,Spout和Bolt之间通过刘分组连接起来,见下图。

Storm学习笔记(三)——Storm组件详解之Bolt、TopologyBolt消息处理者Topology拓扑

Topology是由Spout、Bolt、数据载体Tuple等构成的一定规则的网络拓扑图。Storm提供了TopologyBuilder类来创建Topology。

运行模式

Topology运行模式有两种:

本地模式,分布式模式

这两种模式的接口区别很大,使用场景不同

示例

package storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

public class NumberTopology {
	public static void main(String[] args){
		//用于设置Topology相关的环境参数
		Config config = new Config();
		
		//Storm的运行有两种模式:本地模式和集群模式
		//本地模式:LocalCluster
		//集群模式:StormSubmitter submitter = new StormSubmitter();
		LocalCluster cluster = new LocalCluster();
		
		//实例化Spout、bolt和Topologybuilder
		TopologyBuilder builder = new TopologyBuilder();
		NumberSpout numberSpout = new NumberSpout();
		NumberBolt numberBolt = new NumberBolt();
		
		builder.setSpout("number_spout", numberSpout);
		builder.setBolt("number_bolt", numberBolt).shuffleGrouping("number_spout");
		
		StormTopology topology = builder.createTopology();
		cluster.submitTopology("topology", config, topology);
	}
}
           

结果输出如下图:

Storm学习笔记(三)——Storm组件详解之Bolt、TopologyBolt消息处理者Topology拓扑