Feature overview
Randomly pick a sentence from the sample data (ChooseSpout)
Split each sentence into words (SplitSentence)
Count the occurrences of each word (WordCount)
-----------------------------------------
The project is built with Maven.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>storm.book</groupId>
    <artifactId>Getting-Started</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <!-- Repository where we can find the Storm dependencies -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
    <dependencies>
        <!-- Storm dependency -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.7.1</version>
        </dependency>
    </dependencies>
</project>
Data source spout
package storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * Data source spout: randomly picks one of the predefined sentences and emits it.
 */
public class ChooseSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random random;
    private static String[] sentences = new String[]{"My name is alei", "I am a java engineer"};

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        // Throttle emission slightly, then emit one randomly chosen sentence.
        Utils.sleep(10);
        String toSay = sentences[random.nextInt(sentences.length)];
        this.collector.emit(new Values(toSay));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
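ChooseSpout emits unanchored tuples, so Storm does not track whether each sentence is fully processed. As a sketch of how reliability could be added (this class is a hypothetical addition, not part of the original example), a spout can emit each sentence with a message ID and override ack/fail; the IBasicBolt-based bolts below anchor and ack automatically, so they would not need changes.

package storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical reliable variant of ChooseSpout: each sentence is emitted with a
 * message ID so Storm can report back whether its tuple tree completed.
 */
public class ReliableChooseSpout extends BaseRichSpout {
    private static String[] sentences = new String[]{"My name is alei", "I am a java engineer"};
    private SpoutOutputCollector collector;
    private Random random;
    private Map<Long, String> pending;   // sentences awaiting ack, keyed by message ID
    private long nextId;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
        this.pending = new HashMap<Long, String>();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(10);
        String toSay = sentences[random.nextInt(sentences.length)];
        long id = nextId++;
        pending.put(id, toSay);
        // Emitting with a message ID anchors the tuple for Storm's ack framework.
        collector.emit(new Values(toSay), id);
    }

    @Override
    public void ack(Object id) {
        pending.remove(id);   // the tuple tree completed successfully
    }

    @Override
    public void fail(Object id) {
        String toSay = pending.get(id);
        if (toSay != null) {
            collector.emit(new Values(toSay), id);   // replay the failed sentence
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}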
Word-splitting bolt
package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * Splits each incoming sentence into individual words.
 */
public class SplitSentence implements IBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(SplitSentence.class);

    @Override
    public void prepare(Map map, TopologyContext topologyContext) {
        LOG.info("prepare called");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Emit one tuple per word of the incoming sentence.
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void cleanup() {
        LOG.info("cleanup called");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
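SplitSentence implements IBasicBolt directly and therefore has to supply prepare, cleanup, and getComponentConfiguration even though it does nothing interesting in them. A minimal sketch of the same splitter, assuming the convenience base class backtype.storm.topology.base.BaseBasicBolt is available in this Storm version (it lives in the same package as the BaseRichSpout used above), could look like this; it is not part of the original example and is intended to behave identically.

package storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Sketch: the same splitter written against BaseBasicBolt, which provides
 * empty default implementations of prepare/cleanup/getComponentConfiguration.
 */
public class SplitSentenceBasic extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        for (String word : tuple.getString(0).split(" ")) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}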
Word-count bolt
package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Counts the occurrences of each word.
 */
public class WordCount implements IBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
    private Map<String, Integer> _counts = new HashMap<String, Integer>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext) {
        LOG.info("prepare");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // Increment the running count for this word and emit the updated total.
        String word = tuple.getString(0);
        int count;
        if (_counts.containsKey(word)) {
            count = _counts.get(word);
        } else {
            count = 0;
        }
        count++;
        _counts.put(word, count);
        basicOutputCollector.emit(new Values(word, count));
    }

    @Override
    public void cleanup() {
        LOG.info("--------------------------- printing the aggregated counts ---------------------------");
        for (Map.Entry<String, Integer> entry : _counts.entrySet()) {
            LOG.info("key={} \t value={}", entry.getKey(), entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
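WordCount only logs its totals from cleanup(), which is reliably invoked only when the topology is shut down in local mode, and each of its parallel tasks holds just the words routed to it by the fields grouping configured in the topology below. To observe results while the topology is running, a small reporting bolt could be attached downstream of word-counter. PrintCount below is a hypothetical addition, not part of the original example; it reuses the BaseBasicBolt base class assumed in the earlier sketch.

package storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hypothetical reporting bolt: logs every updated (word, count) pair it receives.
 */
public class PrintCount extends BaseBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(PrintCount.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        LOG.info("word={} count={}", tuple.getString(0), tuple.getInteger(1));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: emits nothing.
    }
}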
Topology test
package storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class CountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Data source: randomly picks a sentence.
        builder.setSpout("word-choose", new ChooseSpout());
        // Split sentences into words, parallelism hint 10, tuples shuffled across tasks.
        builder.setBolt("word-split", new SplitSentence(), 10)
                .shuffleGrouping("word-choose");
        // Count words, parallelism hint 20; the fields grouping sends every
        // occurrence of a given word to the same task.
        builder.setBolt("word-counter", new WordCount(), 20)
                .fieldsGrouping("word-split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            // Cluster mode: submit under the name given on the command line.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Local-mode test: run for ten seconds, then shut down.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
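If the hypothetical PrintCount bolt sketched earlier were used, it could be wired in with one more builder call before the Config is created, along these lines:

// Hypothetical: attach the PrintCount reporter downstream of word-counter.
builder.setBolt("word-printer", new PrintCount(), 1)
        .shuffleGrouping("word-counter");

For the cluster branch, the project is typically packaged with mvn package and submitted with the storm client, for example something like storm jar target/Getting-Started-0.0.1-SNAPSHOT.jar storm.CountTopology word-count, where the trailing argument becomes args[0], i.e. the topology name.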