天天看點

storm保證消息可靠性

public class TopoMain {

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new MessageSpout());

        builder.setBolt("bolt-1", new SpliterBolt()).shuffleGrouping("spout");

        builder.setBolt("bolt-2", new FileWriterBolt()).shuffleGrouping("bolt-1");

        Config conf = new Config();

        conf.setDebug(false);

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("reliability", conf, builder.createTopology());

    }

}

MessageSpout.java

public class MessageSpout implements IRichSpout {

    private static final long serialVersionUID = -4664068313075450186L;

    private int index = 0;

    private String[] lines;

    private SpoutOutputCollector collector;

    public MessageSpout(){

        lines = new String[]{

                "0,zero",

                "1,one",

                "2,two",

                "3,three",

                "4,four",

                "5,five",

                "6,six",

                "7,seven",

                "8,eight",

                "9,nine"

        };

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("line"));

    }

    @Override

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

        this.collector = collector;

    }

    @Override

    public void nextTuple() {

        if(index < lines.length){

            String l = lines[index];

            collector.emit(new Values(l), index);

            index++;

        }

    }

    @Override

    public void ack(Object msgId) {

        System.out.println("message sends successfully (msgId = " + msgId +")");

    }

    @Override

    public void fail(Object msgId) {

        System.out.println("error : message sends unsuccessfully (msgId = " + msgId +")");

        System.out.println("resending...");

        collector.emit(new Values(lines[(Integer) msgId]), msgId);

        System.out.println("resend successfully");

    }

    @Override

    public void close() {

    }

    @Override

    public void activate() {

    }

    @Override

    public void deactivate() {

    }

    @Override

    public Map<String, Object> getComponentConfiguration() {

        return null;

    }

}

SpliterBolt.java

public class SpliterBolt implements IRichBolt {

    private static final long serialVersionUID = 6266473268990329206L;

    private OutputCollector collector;

    @Override

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }

    @Override

    public void execute(Tuple input) {

        String line = input.getString(0);

        String[] words = line.split(",");

        for (String word : words) {

            collector.emit(input, new Values(word));

        }

        collector.ack(input);

    }

    @Override

    public void cleanup() {

    }

    @Override

    public Map<String, Object> getComponentConfiguration() {

        return null;

    }

}

FileWriterBolt.java

public class FileWriterBolt implements IRichBolt {

    private static final long serialVersionUID = -8619029556495402143L;

    private FileWriter writer;

    private OutputCollector collector;

    private int count = 0;

    @Override

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;

        try {

            writer = new FileWriter("e://reliability.txt");

        } catch (IOException e) {

        }

    }

    @Override

    public void execute(Tuple input) {

        String word = input.getString(0);

        if (count == 5) {

            collector.fail(input);

        } else {

            try {

                writer.write(word);

                writer.write("\r\n");

                writer.flush();

            } catch (IOException e) {

                e.printStackTrace();

            }

            collector.emit(input, new Values(word));

            collector.ack(input);

        }

        count++;

    }

    @Override

    public void cleanup() {

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override

    public Map<String, Object> getComponentConfiguration() {

        return null;

    }

}

繼續閱讀