storm 實戰及執行個體講解(三)
——comaple.zhang
——2012-09-13
本講将接着上一講,把一個完成的topology完成。上一節主要介紹了一個基本的topology的構造過程,以及每一步所對應的storm叢集中配置設定的資源情況。要想開發storm應用必須對上一講我提到的那些概念有完全的了解,否則開發出來的應用很有可能有這樣那樣的問題而無法工作。那麼接下來我們來一起定義一個spot節點和bolt節點。
spot節點:在實際的開發中這個節點可以起到和外界溝通的作用,他可以從一個資料庫中按照某種規則取資料,也可以從分布式隊列中取任務(該方式我會在後續章節中談到)。這裡我們将開發一個簡單的模拟資料噴發的節點。具體方式見代碼:
package com.jd.comaple.storm.test.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
public class SimpleSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private static String[] info = new String[]{
"comaple\t,12424,44w46,654,12424,44w46,654,",
"lisi\t,435435,6537,12424,44w46,654,",
"lipeng\t,45735,6757,12424,44w46,654,",
"hujintao\t,45735,6757,12424,44w46,654,",
"jiangmin\t,23545,6457,2455,7576,qr44453",
"beijing\t,435435,6537,12424,44w46,654,",
"xiaoming\t,46654,8579,w3675,85877,077998,",
"xiaozhang\t,9789,788,97978,656,345235,09889,",
"ceo\t,46654,8579,w3675,85877,077998,",
"cto\t,46654,8579,w3675,85877,077998,",
"zhansan\t,46654,8579,w3675,85877,077998,"};
Random rd = new Random();
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
try {
String msg = info[rd.nextInt(10)];
//調用發射方法
collector.emit(new Values(msg));
//模拟等待100ms
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source"));
}
}
bolt節點: 處理節點,該節點接收噴發節點發送的資料進行簡單的處理後,發射出去。
package com.jd.comaple.storm.test.bolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class SimpleBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields( "info"));
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
try {
String mesg = input.getString(0);
if (mesg != null)
collector.emit(new Values( mesg+"mesg is processed!"));
} catch (Exception e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}