天天看點

Storm學習筆記(三)——Storm元件詳解之Bolt、TopologyBolt消息處理者Topology拓撲

目錄

Bolt消息處理者

生命周期

開發Bolt元件

Topology拓撲

結構

運作模式

示例

Bolt消息處理者

Bolt在Storm中是一個被動的角色,它把元組作為輸入,然後産生新的元組作為輸出。Bolt可以執行過濾、函數操作、合并、寫資料庫等操作(還可以簡單地傳遞消息流,複雜的消息流往往需要很多步驟,是以需要很多Bolt來處理)。

生命周期

首先,用戶端建立Bolt,然後将其序列化為拓撲,并送出給叢集中的主機。

之後,叢集啟動Worker程序,反序列化Bolt,調用prepare方法開始處理元組。

接下來,Bolt處理Tuple,Bolt處理一個輸入Tuple,發射0個或者多個Tuple,然後調用ack通知Storm自己已經處理過這個Tuple了(Storm提供了一個IBasicBolt自動調用ack)。Bolt類接收由Spout或者其他上遊Bolt類發來的Tuple,對其進行處理。

Storm學習筆記(三)——Storm元件詳解之Bolt、TopologyBolt消息處理者Topology拓撲

在建立Bolt對象時,通過構造方法初始化成員變量,當Bolt被送出到叢集時,這些成員變量也會被序列化,是以通過反序列化,可以擷取到這些成員變量。

開發Bolt元件

開發Bolt元件的一個簡單的例子:

package storm;

import java.util.Map;
import java.util.stream.Collector;

import org.apache.storm.netty.util.internal.SystemPropertyUtil;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/*
 * 此消息處理者用于接收Spout發來的Tuple,并将Tuple的值做列印輸出
 */
public class NumberBolt extends BaseRichBolt{

	private OutputCollector collector;
	
	/*
	 * Bolt的主要方法是execute,它以一個Tuple作為輸入
	 * Bolt使用OutputCollector來發射Tuple
	 * Bolt必須為它處理的每一個Tuple調用OutputCollector的ack方法
	 * 以通知Storm該Tuple被處理完成了,進而通知該Tuple的發射者Spout
	 */
	@Override
	public void execute(Tuple input) {
		int randomNum = input.getIntegerByField("number");
		System.out.println(randomNum);
	}

	/*
	 * prepare方法和Spout中的open方法類似,為Bolt提供了OutputCollector,用來從Bolt中發送Tuple。
	 * OutputCollector是線程安全的,并且随時都可以調用它
	 * 在Bolt中,Tuple的發送可以再Prepare、execute、cleanup等方法中進行
	 * 但是一般都是在execute中進行
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		
	}

	/*
	 * 用于聲名目前Bolt發送的Tuple中包含的字段,和Spout中的類似
	 * Bolt可以發射多條消息流,使用OutputFieldsDeclarer.declareStream方法來定義流
	 * 之後使用OutputCollector.emit來選擇要發射的流
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream("moreThan", new Fields("It"));
		declarer.declareStream("lessThan", new Fields("It"));
	}

}
           

Topology拓撲

Storm的Topology是指類似于網絡拓撲圖的一種虛拟結構。Storm拓撲類似于MapReduce任務,一個關鍵的差別是MapReduce任務運作一段時間後最終會完成,而Storm拓撲一直運作(知道殺死它)。

結構

一個拓撲是由Spout和Bolt組成的圖,Spout和Bolt之間通過劉分組連接配接起來,見下圖。

Storm學習筆記(三)——Storm元件詳解之Bolt、TopologyBolt消息處理者Topology拓撲

Topology是由Spout、Bolt、資料載體Tuple等構成的一定規則的網絡拓撲圖。Storm提供了TopologyBuilder類來建立Topology。

運作模式

Topology運作模式有兩種:

本地模式,分布式模式

這兩種模式的接口差別很大,使用場景不同

示例

package storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

public class NumberTopology {
	public static void main(String[] args){
		//用于設定Topology相關的環境參數
		Config config = new Config();
		
		//Storm的運作有兩種模式:本地模式和叢集模式
		//本地模式:LocalCluster
		//叢集模式:StormSubmitter submitter = new StormSubmitter();
		LocalCluster cluster = new LocalCluster();
		
		//執行個體化Spout、bolt和Topologybuilder
		TopologyBuilder builder = new TopologyBuilder();
		NumberSpout numberSpout = new NumberSpout();
		NumberBolt numberBolt = new NumberBolt();
		
		builder.setSpout("number_spout", numberSpout);
		builder.setBolt("number_bolt", numberBolt).shuffleGrouping("number_spout");
		
		StormTopology topology = builder.createTopology();
		cluster.submitTopology("topology", config, topology);
	}
}
           

結果輸出如下圖:

Storm學習筆記(三)——Storm元件詳解之Bolt、TopologyBolt消息處理者Topology拓撲