
How to run cleanup when killing a Storm topology in cluster mode


1. Background

In a bolt, we need to perform some action before the topology is shut down. However, according to the official documentation:

The cleanup method is called when a Bolt is being shutdown and should cleanup any resources that were opened. There's no guarantee that this method will be called on the cluster: for example, if the machine the task is running on blows up, there's no way to invoke the method. The cleanup method is intended for when you run topologies in local mode (where a Storm cluster is simulated in process), and you want to be able to run and kill many topologies without suffering any resource leaks.

In other words, the cleanup method is not reliable: it is only guaranteed to run in local mode.

2. Solution

Before killing a topology, first deactivate it and let the in-flight messages drain. When Spout.deactivate() is invoked, have the spout emit a special tuple to the bolts; each bolt checks for this special tuple and, upon receiving it, performs the required shutdown work.

The special tuple can be distinguished from ordinary tuples by the stream it is emitted on.
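The dispatch idea can be sketched in plain Java, with no Storm classes involved (the `execute` stand-in and the "stop" stream id here only loosely mirror the real bolt code in section 3):

```java
// Plain-Java sketch (no Storm dependency) of the sentinel-stream idea:
// every tuple carries the id of the stream it was emitted on, and the
// bolt checks that id before doing its normal work.
public class StopStreamDemo {
    static boolean flushed = false;

    // stand-in for ParseBolt.execute(Tuple)
    static void execute(String sourceStreamId, String value) {
        if (sourceStreamId.equals("stop")) {
            flush();   // run the pre-shutdown work
            return;    // do not treat the sentinel as data
        }
        // ... normal message handling would go here ...
    }

    static void flush() {
        flushed = true;
    }

    public static void main(String[] args) {
        execute("default", "a normal kafka message");
        System.out.println(flushed);   // false: no sentinel seen yet
        execute("stop", "shutDown");   // sentinel emitted from deactivate()
        System.out.println(flushed);   // true: flush ran
    }
}
```

The real distinction in Storm is made with Tuple.getSourceStreamId(), as shown in section 3.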

3. Code & Result

In the code below, all normal messages come from a KafkaSpout; shutdown notifications are handled by a separate spout called MySpout.

Topology part: add the dedicated spout.

public StormTopology buildTopology(Map map) {
    String zkServer = map.get("zookeeper").toString();
    System.out.println("zkServer: " + zkServer);
    final BrokerHosts zkHosts = new ZkHosts(zkServer);
    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, Constants.KAFKA_TOPIC, "/test", "shutdown-test");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafkaSpout", new KafkaSpout(kafkaConfig), SPOUT_PARALLELISM_HINT);
    builder.setSpout("shutDownSpout", new MySpout());
    builder.setBolt("parseBolt", new ParseBolt(), PARSE_BOLT_PARALLELISM_HINT).
            shuffleGrouping("kafkaSpout").allGrouping("shutDownSpout", "stop");
    builder.setBolt("ToDruidBolt", new BeamBolt(new ToDruidBolt()), STAT_BOLT_PARALLELISM_HINT).shuffleGrouping("parseBolt");
    return builder.createTopology();
}

MySpout part: the key methods are deactivate and declareOutputFields. In the latter, message.declareStream registers the stream on which the pre-shutdown tuple is emitted, which is how bolts distinguish it from ordinary tuples.

public class MySpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(MySpout.class);
    private SpoutOutputCollector _collector;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        logger.info("shutdown spout open function called");
        _collector = collector;
    }

    public void activate() {
        logger.info("shutdown spout activate function called");
    }

    public void deactivate() {
        logger.info("shutdown spout deactivate called, notifying bolts");
        try {
            String mes = "shutDown";
            long id = 11111111111111111L;
            _collector.emit("stop", new Values(mes), id);
            //Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void nextTuple() {
        try {
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer message) {
        message.declareStream("stop", new Fields("stop"));
    }

    public void ack(Object msgId) {
        logger.info("shutDown spout ack, msId " + msgId);
    }

    public void fail(Object msgId) {
        logger.error("shutDown spout fail, msId " + msgId);
    }
}

ParseBolt part: in execute, check each tuple's source stream; the sentinel arrives on the "stop" stream.

public void execute(Tuple tuple) {
    // the shutdown sentinel arrives on the dedicated "stop" stream
    if (tuple.getSourceStreamId().equals("stop")) {
        System.out.println("==========clear=============");
        flush();
        return;
    }
    // ... normal message handling ...
}

public void flush() {
    //some things to do before topology will be killed
}

4. Caveats

(1) When killing the topology, give it a generous wait time (the -w option of storm kill). If the wait is too short, the sentinel message may not be delivered in time and this approach fails.

(2) There is usually more than one bolt task downstream of the spout, so use allGrouping to ensure every task receives the shutdown message.
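Why allGrouping matters here can be shown with a toy model (plain Java, hypothetical task counts; not Storm's actual routing code): shuffleGrouping delivers each tuple to exactly one of the bolt's parallel tasks, while allGrouping replicates it to every task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of the two groupings for a bolt running 3 parallel tasks.
// shuffleGrouping routes a tuple to one task; allGrouping replicates it
// to all tasks - which is what the shutdown sentinel needs.
public class GroupingDemo {
    static final int TASKS = 3;

    static List<Integer> shuffleGrouping(int tupleSeq) {
        // one arbitrary task per tuple (round-robin here for determinism)
        return Collections.singletonList(tupleSeq % TASKS);
    }

    static List<Integer> allGrouping() {
        List<Integer> targets = new ArrayList<>();
        for (int t = 0; t < TASKS; t++) {
            targets.add(t);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(shuffleGrouping(7).size()); // 1 task sees the tuple
        System.out.println(allGrouping().size());      // all 3 tasks see it
    }
}
```

With shuffleGrouping, two of the three ParseBolt tasks would never see the "stop" tuple and would skip their flush.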

References:

http://storm.apache.org/releases/current/Tutorial.html
http://storm.apache.org/releases/current/Lifecycle-of-a-topology.html
http://stackoverflow.com/questions/32117845/how-to-call-a-particular-method-before-killing-a-storm-topology