public class TopoMain {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new MessageSpout());
builder.setBolt("bolt-1", new SpliterBolt()).shuffleGrouping("spout");
builder.setBolt("bolt-2", new FileWriterBolt()).shuffleGrouping("bolt-1");
Config conf = new Config();
conf.setDebug(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("reliability", conf, builder.createTopology());
}
}
MessageSpout.java
public class MessageSpout implements IRichSpout {
private static final long serialVersionUID = -4664068313075450186L;
private int index = 0;
private String[] lines;
private SpoutOutputCollector collector;
public MessageSpout(){
lines = new String[]{
"0,zero",
"1,one",
"2,two",
"3,three",
"4,four",
"5,five",
"6,six",
"7,seven",
"8,eight",
"9,nine"
};
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
if(index < lines.length){
String l = lines[index];
collector.emit(new Values(l), index);
index++;
}
}
@Override
public void ack(Object msgId) {
System.out.println("message sends successfully (msgId = " + msgId +")");
}
@Override
public void fail(Object msgId) {
System.out.println("error : message sends unsuccessfully (msgId = " + msgId +")");
System.out.println("resending...");
collector.emit(new Values(lines[(Integer) msgId]), msgId);
System.out.println("resend successfully");
}
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
SpliterBolt.java
public class SpliterBolt implements IRichBolt {
private static final long serialVersionUID = 6266473268990329206L;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public void execute(Tuple input) {
String line = input.getString(0);
String[] words = line.split(",");
for (String word : words) {
collector.emit(input, new Values(word));
}
collector.ack(input);
}
@Override
public void cleanup() {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
FileWriterBolt.java
public class FileWriterBolt implements IRichBolt {
private static final long serialVersionUID = -8619029556495402143L;
private FileWriter writer;
private OutputCollector collector;
private int count = 0;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
try {
writer = new FileWriter("e://reliability.txt");
} catch (IOException e) {
}
}
@Override
public void execute(Tuple input) {
String word = input.getString(0);
if (count == 5) {
collector.fail(input);
} else {
try {
writer.write(word);
writer.write("\r\n");
writer.flush();
} catch (IOException e) {
e.printStackTrace();
}
collector.emit(input, new Values(word));
collector.ack(input);
}
count++;
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}