目錄
Bolt消息處理者
生命周期
開發Bolt元件
Topology拓撲
結構
運作模式
示例
Bolt消息處理者
Bolt在Storm中是一個被動的角色,它把元組作為輸入,然後産生新的元組作為輸出。Bolt可以執行過濾、函數操作、合并、寫資料庫等操作(還可以簡單地傳遞消息流,複雜的消息流往往需要很多步驟,是以需要很多Bolt來處理)。
生命周期
首先,用戶端建立Bolt,然後将其序列化為拓撲,并送出給叢集中的主機。
之後,叢集啟動Worker程序,反序列化Bolt,調用prepare方法開始處理元組。
接下來,Bolt處理Tuple,Bolt處理一個輸入Tuple,發射0個或者多個Tuple,然後調用ack通知Storm自己已經處理過這個Tuple了(Storm提供了一個IBasicBolt自動調用ack)。Bolt類接收由Spout或者其他上遊Bolt類發來的Tuple,對其進行處理。
在建立Bolt對象時,通過構造方法初始化成員變量,當Bolt被送出到叢集時,這些成員變量也會被序列化,是以通過反序列化,可以擷取到這些成員變量。
開發Bolt元件
開發Bolt元件的一個簡單的例子:
package storm;
import java.util.Map;
import java.util.stream.Collector;
import org.apache.storm.netty.util.internal.SystemPropertyUtil;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
/*
* 此消息處理者用于接收Spout發來的Tuple,并将Tuple的值做列印輸出
*/
public class NumberBolt extends BaseRichBolt{
private OutputCollector collector;
/*
* Bolt的主要方法是execute,它以一個Tuple作為輸入
* Bolt使用OutputCollector來發射Tuple
* Bolt必須為它處理的每一個Tuple調用OutputCollector的ack方法
* 以通知Storm該Tuple被處理完成了,進而通知該Tuple的發射者Spout
*/
@Override
public void execute(Tuple input) {
int randomNum = input.getIntegerByField("number");
System.out.println(randomNum);
}
/*
* prepare方法和Spout中的open方法類似,為Bolt提供了OutputCollector,用來從Bolt中發送Tuple。
* OutputCollector是線程安全的,并且随時都可以調用它
* 在Bolt中,Tuple的發送可以再Prepare、execute、cleanup等方法中進行
* 但是一般都是在execute中進行
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/*
* 用于聲名目前Bolt發送的Tuple中包含的字段,和Spout中的類似
* Bolt可以發射多條消息流,使用OutputFieldsDeclarer.declareStream方法來定義流
* 之後使用OutputCollector.emit來選擇要發射的流
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("moreThan", new Fields("It"));
declarer.declareStream("lessThan", new Fields("It"));
}
}
Topology拓撲
Storm的Topology是指類似于網絡拓撲圖的一種虛拟結構。Storm拓撲類似于MapReduce任務,一個關鍵的差別是MapReduce任務運作一段時間後最終會完成,而Storm拓撲一直運作(知道殺死它)。
結構
一個拓撲是由Spout和Bolt組成的圖,Spout和Bolt之間通過劉分組連接配接起來,見下圖。
Topology是由Spout、Bolt、資料載體Tuple等構成的一定規則的網絡拓撲圖。Storm提供了TopologyBuilder類來建立Topology。
運作模式
Topology運作模式有兩種:
本地模式,分布式模式
這兩種模式的接口差別很大,使用場景不同
示例
package storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
public class NumberTopology {
public static void main(String[] args){
//用于設定Topology相關的環境參數
Config config = new Config();
//Storm的運作有兩種模式:本地模式和叢集模式
//本地模式:LocalCluster
//叢集模式:StormSubmitter submitter = new StormSubmitter();
LocalCluster cluster = new LocalCluster();
//執行個體化Spout、bolt和Topologybuilder
TopologyBuilder builder = new TopologyBuilder();
NumberSpout numberSpout = new NumberSpout();
NumberBolt numberBolt = new NumberBolt();
builder.setSpout("number_spout", numberSpout);
builder.setBolt("number_bolt", numberBolt).shuffleGrouping("number_spout");
StormTopology topology = builder.createTopology();
cluster.submitTopology("topology", config, topology);
}
}
結果輸出如下圖: