1,spout編寫,讀取檔案内容:
package com.storm.test;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader extends BaseRichSpout{
private SpoutOutputCollector collector;
private FileReader fileReader;
private String filePath;
private boolean completed = false;
@Override
public void ack(Object msgId) {
System.out.println("msgId === "+msgId);
}
@Override
public void close() {
}
@Override
public void fail(Object msgId) {
System.out.println("fail === "+msgId);
}
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.fileReader = new FileReader(conf.get("wordFile").toString());
} catch (Exception e) {
e.printStackTrace();
}
this.filePath = conf.get("wordFile").toString();
this.collector = collector;
}
@Override
public void nextTuple() {
if(completed){
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
return ;
}
String str;
BufferedReader reader = new BufferedReader(fileReader);
try {
while ((str = reader.readLine()) != null){
System.out.println("read line = "+str);
this.collector.emit(new Values(str),str);
System.out.println("WordReader spout = "+str);
}
} catch (Exception e) {
e.printStackTrace();
}finally{
completed = true;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
2,Bolt編寫實作句子分割:
package com.storm.test;
import java.util.Map;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer extends BaseBasicBolt{
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String sentence = input.getString(0);
String[] words = sentence.split(",");
System.out.println("reader line = "+sentence);
for(String word : words){
if(!word.trim().isEmpty()){
collector.emit(new Values(word.trim().toUpperCase()));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
3,bolt編寫是單詞統計:
package com.storm.test;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
public class WordCount extends BaseBasicBolt{
Integer id;
String name;
Map counters ;
@Override
public void cleanup() {
System.out.println("word counter :["+name+"-"+id);
for(Map.Entry entry : counters.entrySet()){
System.out.println(entry.getKey()+":"+entry.getValue());
}
System.out.println("counter finish!");
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
this.counters = new HashMap();
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String str = input.getString(0);
System.out.println("wordCounter recever "+str);
if(!counters.containsKey(str)){
counters.put(str,1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str,c);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
4,main方法編寫:
package com.storm.test;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class ToplogyTest {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-count",new WordCount(),1).fieldsGrouping("word-normalizer",new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
//word.txt:hello,world,hello,storm,hello,spark,hadoop,hadoop
conf.put("wordFile", "/home/lixun/word.txt");
//conf.setDebug(true);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounterTopology",conf,builder.createTopology());
Thread.sleep(4000);
cluster.killTopology("wordCounterTopology");
cluster.shutdown();
}
}