轉自:http://www.cnblogs.com/linjiqin/archive/2013/05/28/3104016.html
本節探討一下Storm具體怎麼使用,說明怎麼在Windows下開發Storm程式。
功能描述:實時隨機輸出一個字串。
在開發前記得導入storm需要的jar包。
1、SimpleSpout類繼承BaseRichSpout類,用來產生資料並且向topology裡面發出消息(tuple)。
package com.ljq.helloword;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
 * Demo spout that acts as the topology's data source: on every call to
 * {@link #nextTuple()} it emits one randomly chosen sample record.
 *
 * <p>In a real topology a spout is the bridge to the outside world; it could,
 * for example, read rows from a database according to some rule or take tasks
 * from a distributed queue.
 *
 * @author Administrator
 */
@SuppressWarnings("serial")
public class SimpleSpout extends BaseRichSpout {

    /** Pause after each emitted tuple, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 100L;

    /** Sample records; one of them is picked at random for every emitted tuple. */
    private static final String[] info = new String[]{
        "comaple\t,12424,44w46,654,12424,44w46,654,",
        "lisi\t,435435,6537,12424,44w46,654,",
        "lipeng\t,45735,6757,12424,44w46,654,",
        "hujintao\t,45735,6757,12424,44w46,654,",
        "jiangmin\t,23545,6457,2455,7576,qr44453",
        "beijing\t,435435,6537,12424,44w46,654,",
        "xiaoming\t,46654,8579,w3675,85877,077998,",
        "xiaozhang\t,9789,788,97978,656,345235,09889,",
        "ceo\t,46654,8579,w3675,85877,077998,",
        "cto\t,46654,8579,w3675,85877,077998,",
        "zhansan\t,46654,8579,w3675,85877,077998,"};

    /** Source of randomness used to pick the record to emit. */
    private final Random random = new Random();

    /** Collector used to emit tuples; assigned in {@link #open}. */
    private SpoutOutputCollector collector;

    /**
     * Stores the collector so that {@link #nextTuple()} can emit through it.
     *
     * @param conf      topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used to emit tuples from this spout
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Emits one randomly selected sample record as a single-field tuple and
     * then pauses briefly. Storm invokes this method over and over; each
     * invocation sends one tuple into the topology.
     */
    @Override
    public void nextTuple() {
        // Bound the index by the array length so the sample list can grow or
        // shrink without skewing the selection or running off the end.
        String msg = info[random.nextInt(info.length)];
        collector.emit(new Values(msg));
        try {
            // Simulate waiting between records.
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still
            // observe that it was asked to stop.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Declares the single output field {@code "source"}. The field name is
     * not needed for simple (shuffle) wiring but matters when a downstream
     * component groups by field. {@code declarer.declareStream()} can
     * additionally be used to define stream ids for more complex topologies.
     *
     * @param declarer receives the output field declaration
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Must line up with the Values emitted in nextTuple(): one field per value.
        declarer.declare(new Fields("source"));
    }
}
2、SimpleBolt類繼承BaseBasicBolt類,處理一個輸入tuple。
package com.ljq.helloword;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that takes the data sent by the spout, applies a trivial
 * transformation (appending a marker text) and emits the result downstream.
 *
 * @author Administrator
 */
@SuppressWarnings("serial")
public class SimpleBolt extends BaseBasicBolt {

    /**
     * Reads the string in position 0 of the incoming tuple and, when it is
     * non-null, emits it with a "processed" marker appended. Any exception
     * raised while doing so is printed and otherwise ignored.
     *
     * @param input     tuple received from the upstream component
     * @param collector collector used to emit the processed value
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            final String received = input.getString(0);
            if (received == null) {
                // Nothing to process for this tuple.
                return;
            }
            final String processed = received + "msg is processed!";
            collector.emit(new Values(processed));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Declares the single output field {@code "info"} carried by every tuple
     * this bolt emits.
     *
     * @param declarer receives the output field declaration
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        final Fields outputSchema = new Fields("info");
        declarer.declare(outputSchema);
    }
}
3、SimpleTopology類包含一個main函數,是Storm程式執行的入口點,包括一個資料噴發節點spout和一個資料處理節點bolt。
package com.ljq.helloword;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
/**
 * Entry point of the demo: wires one spout ({@code SimpleSpout}) to one bolt
 * ({@code SimpleBolt}) and starts the resulting topology.
 *
 * @author Administrator
 */
public class SimpleTopology {

    /**
     * Builds the topology and launches it. When at least one argument is
     * given, the topology is submitted through {@code StormSubmitter} under
     * the name {@code args[0]}; otherwise it runs in a {@code LocalCluster}
     * under the name {@code "simple"}. Any exception is printed and swallowed.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     */
    public static void main(String[] args) {
        // A topology name on the command line selects cluster submission.
        final boolean submitToCluster = args != null && args.length > 0;
        try {
            TopologyBuilder builder = new TopologyBuilder();
            // Spout with a parallelism hint of 1.
            builder.setSpout("SimpleSpout", new SimpleSpout(), 1);
            // Bolt with a parallelism hint of 3, fed from the spout via shuffle grouping.
            builder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout");

            Config conf = new Config();
            conf.setDebug(true);

            if (!submitToCluster) {
                // Local mode: run inside an in-process cluster.
                conf.setMaxTaskParallelism(1);
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("simple", conf, builder.createTopology());
                return;
            }

            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}