目录
Bolt消息处理者
生命周期
开发Bolt组件
Topology拓扑
结构
运行模式
示例
Bolt消息处理者
Bolt在Storm中是一个被动的角色,它把元组作为输入,然后产生新的元组作为输出。Bolt可以执行过滤、函数操作、合并、写数据库等操作(还可以简单地传递消息流,复杂的消息流往往需要很多步骤,因此需要很多Bolt来处理)。
生命周期
首先,客户端创建Bolt,然后将其序列化为拓扑,并提交给集群中的主机。
之后,集群启动Worker进程,反序列化Bolt,调用prepare方法开始处理元组。
接下来,Bolt处理Tuple,Bolt处理一个输入Tuple,发射0个或者多个Tuple,然后调用ack通知Storm自己已经处理过这个Tuple了(Storm提供了一个IBasicBolt自动调用ack)。Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。
在创建Bolt对象时,通过构造方法初始化成员变量,当Bolt被提交到集群时,这些成员变量也会被序列化,所以通过反序列化,可以获取到这些成员变量。
开发Bolt组件
开发Bolt组件的一个简单的例子:
package storm;
import java.util.Map;
import java.util.stream.Collector;
import org.apache.storm.netty.util.internal.SystemPropertyUtil;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
/*
* 此消息处理者用于接收Spout发来的Tuple,并将Tuple的值做打印输出
*/
public class NumberBolt extends BaseRichBolt{
private OutputCollector collector;
/*
* Bolt的主要方法是execute,它以一个Tuple作为输入
* Bolt使用OutputCollector来发射Tuple
* Bolt必须为它处理的每一个Tuple调用OutputCollector的ack方法
* 以通知Storm该Tuple被处理完成了,从而通知该Tuple的发射者Spout
*/
@Override
public void execute(Tuple input) {
int randomNum = input.getIntegerByField("number");
System.out.println(randomNum);
}
/*
* prepare方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。
* OutputCollector是线程安全的,并且随时都可以调用它
* 在Bolt中,Tuple的发送可以再Prepare、execute、cleanup等方法中进行
* 但是一般都是在execute中进行
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/*
* 用于声名当前Bolt发送的Tuple中包含的字段,和Spout中的类似
* Bolt可以发射多条消息流,使用OutputFieldsDeclarer.declareStream方法来定义流
* 之后使用OutputCollector.emit来选择要发射的流
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("moreThan", new Fields("It"));
declarer.declareStream("lessThan", new Fields("It"));
}
}
Topology拓扑
Storm的Topology是指类似于网络拓扑图的一种虚拟结构。Storm拓扑类似于MapReduce任务,一个关键的区别是MapReduce任务运行一段时间后最终会完成,而Storm拓扑一直运行(知道杀死它)。
结构
一个拓扑是由Spout和Bolt组成的图,Spout和Bolt之间通过刘分组连接起来,见下图。
Topology是由Spout、Bolt、数据载体Tuple等构成的一定规则的网络拓扑图。Storm提供了TopologyBuilder类来创建Topology。
运行模式
Topology运行模式有两种:
本地模式,分布式模式
这两种模式的接口区别很大,使用场景不同
示例
package storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
public class NumberTopology {
public static void main(String[] args){
//用于设置Topology相关的环境参数
Config config = new Config();
//Storm的运行有两种模式:本地模式和集群模式
//本地模式:LocalCluster
//集群模式:StormSubmitter submitter = new StormSubmitter();
LocalCluster cluster = new LocalCluster();
//实例化Spout、bolt和Topologybuilder
TopologyBuilder builder = new TopologyBuilder();
NumberSpout numberSpout = new NumberSpout();
NumberBolt numberBolt = new NumberBolt();
builder.setSpout("number_spout", numberSpout);
builder.setBolt("number_bolt", numberBolt).shuffleGrouping("number_spout");
StormTopology topology = builder.createTopology();
cluster.submitTopology("topology", config, topology);
}
}
结果输出如下图: