Storm的一個簡單例子
- Storm的一個簡單例子
-
- maven依賴
-
- 相關類
- LogAnalyserStorm
- FakeCallLogReaderSpout
- CallLogCreatorBolt
- CallLogCounterBolt
Storm的一個簡單例子
maven依賴
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.3</version>
</dependency>
相關類
- LogAnalyserStorm.java
- FakeCallLogReaderSpout.java
- CallLogCreatorBolt.java
- CallLogCounterBolt.java
LogAnalyserStorm
package com.storm.demo;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class LogAnalyserStorm {
public static void main(String... args) throws Exception{
Config config = new Config();
config.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder
.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder
.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
FakeCallLogReaderSpout
package com.storm.demo;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class FakeCallLogReaderSpout implements IRichSpout {
private SpoutOutputCollector collector;
private boolean completed = false;
private TopologyContext context;
private Random randomGenerator = new Random();
private Integer idx = 0;
/**
* 為Spout提供執行環境,執行器将運作此方法來初始化噴頭
* @param map 為此Spout提供storm配置
* @param topologyContext 提供有關拓撲中的Spout位置,其任務ID,輸入和輸出資訊的完整資訊
* @param spoutOutputCollector 使我們能夠發出将由bolts處理的元祖
*/
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.context = topologyContext;
this.collector = spoutOutputCollector;
}
/**
* 當Spout将要關閉時調用此方法
*/
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
/**
* 通過收集器發出生成的資料
*/
@Override
public void nextTuple() {
if (this.idx <= 1000) {
List<String> mobileNumbers = new ArrayList<>();
mobileNumbers.add("13805194441");
mobileNumbers.add("13805194442");
mobileNumbers.add("13805194443");
mobileNumbers.add("13805194444");
Integer localIndex = 0;
while (localIndex++ < 100 && this.idx++ < 1000) {
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
while (fromMobileNumber == toMobileNumber){
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}
Integer duration = randomGenerator.nextInt(60);
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
}
}
}
/**
* 确認處理了特定元祖
* @param o
*/
@Override
public void ack(Object o) {
}
/**
* 指定不處理和不重新處理特定元祖
* @param o
*/
@Override
public void fail(Object o) {
}
/**
* 聲明元祖的輸出模式
* @param outputFieldsDeclarer 用于聲明輸出流ID,輸出字段等
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("from", "to", "duration"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
CallLogCreatorBolt
package com.storm.demo;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
public class CallLogCreatorBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " - " + to, duration));
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("call", "duration"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
CallLogCounterBolt
package com.storm.demo;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
public class CallLogCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.counterMap = new HashMap<>();
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if (!counterMap.containsKey(call)) {
counterMap.put(call, 1);
} else {
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
counterMap.entrySet().forEach(f -> {
System.out.println(f.getKey() + " : " + f.getValue());
});
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("call"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}