天天看點

Storm的ack原理和使用

Storm的ack原理

通過Ack機制,spout發送出去的每一條消息,都可以确定是被成功處理或失敗處理, 進而可以讓開發者采取動作。比如在Meta中,成功被處理,即可更新偏移量,當失敗時,重複發送資料。

是以,通過Ack機制,很容易做到保證所有資料均被處理,一條都不漏。

另外需要注意的,當spout觸發fail動作時,不會自動重發失敗的tuple,需要spout自己重新擷取資料,手動重新再發送一次

ack機制即, spout發送的每一條消息,

  • 在規定的時間内,spout收到Acker的ack響應,即認為該tuple 被後續bolt成功處理
  • 在規定的時間内,沒有收到Acker的ack響應tuple,就觸發fail動作,即認為該tuple處理失敗,
  • 或者收到Acker發送的fail響應tuple,也認為失敗,觸發fail動作

另外Ack機制還常用于限流作用: 為了避免spout發送資料太快,而bolt處理太慢,常常設定pending數,當spout有等于或超過pending數的tuple沒有收到ack或fail響應時,跳過執行nextTuple,進而限制spout發送資料。

通過conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);設定spout pend數。

這個timeout時間可以通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來設定。Timeout的預設時長為30秒

實作過程

torm 系統中有一組叫做"acker"的特殊的任務,它們負責跟蹤DAG(有向無環圖)中的每個消息。

acker任務儲存了spout id到一對值的映射。第一個值就是spout的任務id,通過這個id,acker就知道消息處理完成時該通知哪個spout任務。第二個值是一個64bit的數字,我們稱之為"ack val", 它是樹中所有消息的随機id的異或計算結果。

<TaskId,<RootId,ackValue>>

Spoutid,<系統生成的id,ackValue>

Task-0,64bit,0

ack val表示了整棵樹的的狀态,無論這棵樹多大,隻需要這個固定大小的數字就可以跟蹤整棵樹。當消息被建立和被應答的時候都會有相同的消息id發送過來做異或。 每當acker發現一棵樹的ack val值為0的時候,它就知道這棵樹已經被完全處理了

Storm的ack原理和使用

開啟ack機制

spout 在發送資料的時候帶上msgid

設定acker數至少大于0;Config.setNumAckers(conf, ackerParal);

在bolt中完成處理tuple時,執行OutputCollector.ack(tuple), 當失敗處理時,執行OutputCollector.fail(tuple);

推薦使用IBasicBolt, 因為IBasicBolt 自動封裝了OutputCollector.ack(tuple), 處理失敗時,請抛出FailedException,則自動執行OutputCollector.fail(tuple)

關閉Ack機制

spout發送資料是不帶上msgid

或者設定acker數等于0

代碼操作

方式一: 手動實作

  1. 編寫spout類

    實作ack機制需要spout類重寫ack()方法和fail()方法

    還要在nextTuple()方法向外發送資料時綁定一個唯一id

    當spout發送的一條資料被完整處理, storm會調用ack()方法

    當spout發送的一條資料被标記為處理失敗, storm會調用fail()方法

    public class MySpout extends BaseRichSpout {
        private SpoutOutputCollector spoutOutputCollector;
        //建立一個map用來儲存資料
        private Map<String, String> msgBuffer = new HashMap<String, String>();
    
        /**
         * 初始化方法
         * @param map
         * @param topologyContext
         * @param spoutOutputCollector
         */
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }
    
        /**
         * 資料發送
         */
        public void nextTuple() {
            //生成一個唯一編号
            String msgId = UUID.randomUUID().toString();
            //模拟一條消息
            String msg = "this is test message";
            //把消息存入map
            msgBuffer.put(msgId, msg);
    
            //向下遊bolt發送一條資料,并附帶唯一編号
            spoutOutputCollector.emit(new Values(msg), msgId);
        }
    
        /**
         * 字段聲明
         * @param outputFieldsDeclarer
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("line"));
        }
    
        /**
         * 當spout發送的一條資料被完整處理, storm會調用這個方法
         * @param msgId 消息的唯一編号
         */
        @Override
        public void ack(Object msgId) {
            System.out.println("消息處理成功了, msgId: "+msgId);
            super.ack(msgId);
        }
    
        /**
         * 當spout發送的一條資料被标記為處理失敗, storm會調用這個方法
         * @param msgId
         */
        @Override
        public void fail(Object msgId) {
            System.out.println("消息處理失敗了需要重發, msgId: "+msgId);
            //如果發送資料失敗後,從map中取出資料再次發送
            String msg = msgBuffer.get(msgId);
            spoutOutputCollector.emit(new Values(msg), msgId);
        }
    }
               
  2. 編寫bolt類

    在execute()方法中處理完所有業務邏輯後需要調用ack()方法

    bolt如果産生了新的資料,需要錨定一點,讓新産生的tuple與原有tuple關聯

    public class MyBolt1 extends BaseRichBolt {
        private OutputCollector outputCollector;
        /**
         * 初始化方法
         * @param map
         * @param topologyContext
         * @param outputCollector
         */
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }
    
        /**
         * 資料發送
         * @param tuple
         */
        public void execute(Tuple tuple) {
            //擷取資料
            String line = tuple.getStringByField("line");
            String[] words = line.split(" ");
            for (String word : words) {
                //将新産生的tuple與原有tuple關聯
                outputCollector.emit(tuple, new Values(word));
            }
            //bolt對資料完成處理後發出信号
            outputCollector.ack(tuple);
    
            //測試消息處理失敗
            //outputCollector.fail(tuple);
        }
    
        /**
         * 字段聲明
         * @param outputFieldsDeclarer
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word"));
        }
    }
               
    第二個bolt類
    public class MyBolt2 extends BaseRichBolt {
        private OutputCollector outputCollector;
    
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }
    
        public void execute(Tuple tuple) {
            //列印出資料
            System.out.println(tuple.getStringByField("word"));
    		//bolt對資料完成處理後發出信号
            outputCollector.ack(tuple);
        }
    
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
        }
    }
               
  3. 編寫驅動類
    public class MyTopology {
        public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
            //通過TopologyBuilder 封裝任務資訊
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            //設定spout擷取資料
            //SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint):參數:id, spout對象, 線程數量
            topologyBuilder.setSpout("MySpout", new MySpout(), 1);
            //設定splitbolt 對句子進行切割
            topologyBuilder.setBolt("MyBolt1", new MyBolt1(), 1).shuffleGrouping("MySpout");
            //設定wordcountbolt 對單詞進行統計
            topologyBuilder.setBolt("MyBolt2", new MyBolt2(), 1).shuffleGrouping("MyBolt1");
    
            //準備一個配置檔案
            Config config = new Config();
    
            //本地模式
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
        }
    }
               

通過手動開啟ack機制方式總結:

1.對spout代碼進行修改

1)繼承BaseRichSpout,重寫ack()方法和fail()方法

2)在nextTuple()方法發送資料時綁定msgId(msgId要保證唯一)

2.對bolt代碼修改

1)在execute()方法中處理完所有業務邏輯後需要調用ack()方法outputCollector.ack(tuple);

2)bolt如果産生了新的資料,需要錨定一點,讓新産生的tuple與原有tuple關聯outputCollector.emit(tuple, new Values(word));

3.如果想測試失敗的情況就在execute()方法中調用fail()方法outputCollector.fail(tuple);

方式二: 通過繼承BaseBasicBolt類實作

繼承BaseBasicBolt後就不需要定義錨點和調用ack()方法了

修改方式一代碼中的兩個Bolt類(spout類和驅動類與方式一相同)

MyBolt1 :

public class MyBolt1 extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        //擷取資料
        String line = tuple.getStringByField("line");
        String[] words = line.split(" ");
        for (String word : words) {
            //發送資料
            basicOutputCollector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //聲明字段
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
           

MyBolt12:

public class MyBolt2 extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        //列印出資料
        System.out.println(tuple.getStringByField("word"));
        
        //失敗測時
        //throw new FailedException("失敗測試");
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}
           

通過繼承BaseBasicBolt類實作ack機制總結:

bolt類繼承了BaseBasicBolt 就不需要手動添加錨點和調用方法發出成功處理聲明

如果想測試失敗的情況就抛出在bolt類中throw new FailedException(“失敗測試”);

繼續閱讀