天天看點

Storm Bolt之定時機制Tick應用

       Storm中有一種内置的定時機制Storm Bolt之Tick,可以在任何bolt的task每個一定時間(支援通過使用者自定義配置)收到來自System Id的tick tuple。Bolt在收到這樣的tuple後,根據業務需求完成相應的處理流程。

     在建構topology時,調用addConfiguration()方法,完成對tick時間間隔的設定,如下:

public static void main(String[] args) throws Exception {
        //開始組合bolt和spout的元件
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Test_Spout", new TestSpout(TopoLoad.getKafkaSpoutConfig()), conf.getInt(SPOUT_PARALLELISN_HINT_INT));
        builder.setBolt("Test_Bolt", new TestBolt(), 4).localOrShuffleGrouping("Test_Spout")
        .addConfiguration(TOPOLOGY_TICK_TUPLE_FREQ_SECS,10);
}
           

 在具體的Bolt中,使用者的Bolt需要繼承BaseRichBolt/BaseBasicBolt,在Bolt中重寫getComponentConfiguration()方法,設定tick時間間隔。并在bolt的execute(Tuple tuple)方法中調用TupleUtils.isTick(input)判斷tuple是否為tick tuple,相應進行業務處理。

public class TestBolt extends BaseRichBolt {
    int flushIntervalSecs = 1;
    int batchSize = 1000;
    private String streamFieldName;
    OutputCollector collector;

    public TestBolt(String streamFieldName) {
        this.streamFieldName = streamFieldName;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        if (!TupleUtils.isTick(input)) {//如果收到的tuple不是tick的tuple,不處理
             //業務處理
        }else{
             //業務處理
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
    }

    public MySqlBolt withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    public MySqlBolt withFlushIntervalSecs(int flushIntervalSecs) {
        this.flushIntervalSecs = flushIntervalSecs;
        return this;
    }
}
           

繼續閱讀