Storm的ack原理
通過Ack機制,spout發送出去的每一條消息,都可以确定是被成功處理或失敗處理, 進而可以讓開發者采取動作。比如在Meta中,成功被處理,即可更新偏移量,當失敗時,重複發送資料。
是以,通過Ack機制,很容易做到保證所有資料均被處理,一條都不漏。
另外需要注意的,當spout觸發fail動作時,不會自動重發失敗的tuple,需要spout自己重新擷取資料,手動重新再發送一次
ack機制即, spout發送的每一條消息,
- 在規定的時間内,spout收到Acker的ack響應,即認為該tuple 被後續bolt成功處理
- 在規定的時間内,沒有收到Acker的ack響應tuple,就觸發fail動作,即認為該tuple處理失敗,
- 或者收到Acker發送的fail響應tuple,也認為失敗,觸發fail動作
另外Ack機制還常用于限流作用: 為了避免spout發送資料太快,而bolt處理太慢,常常設定pending數,當spout有等于或超過pending數的tuple沒有收到ack或fail響應時,跳過執行nextTuple,進而限制spout發送資料。
通過conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);設定spout pend數。
這個timeout時間可以通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來設定。Timeout的預設時長為30秒
實作過程
torm 系統中有一組叫做"acker"的特殊的任務,它們負責跟蹤DAG(有向無環圖)中的每個消息。
acker任務儲存了spout id到一對值的映射。第一個值就是spout的任務id,通過這個id,acker就知道消息處理完成時該通知哪個spout任務。第二個值是一個64bit的數字,我們稱之為"ack val", 它是樹中所有消息的随機id的異或計算結果。
<TaskId,<RootId,ackValue>>
Spoutid,<系統生成的id,ackValue>
Task-0,64bit,0
ack val表示了整棵樹的的狀态,無論這棵樹多大,隻需要這個固定大小的數字就可以跟蹤整棵樹。當消息被建立和被應答的時候都會有相同的消息id發送過來做異或。 每當acker發現一棵樹的ack val值為0的時候,它就知道這棵樹已經被完全處理了

開啟ack機制
spout 在發送資料的時候帶上msgid
設定acker數至少大于0;Config.setNumAckers(conf, ackerParal);
在bolt中完成處理tuple時,執行OutputCollector.ack(tuple), 當失敗處理時,執行OutputCollector.fail(tuple);
推薦使用IBasicBolt, 因為IBasicBolt 自動封裝了OutputCollector.ack(tuple), 處理失敗時,請抛出FailedException,則自動執行OutputCollector.fail(tuple)
關閉Ack機制
spout發送資料是不帶上msgid
或者設定acker數等于0
代碼操作
方式一: 手動實作
-
編寫spout類
實作ack機制需要spout類重寫ack()方法和fail()方法
還要在nextTuple()方法向外發送資料時綁定一個唯一id
當spout發送的一條資料被完整處理, storm會調用ack()方法
當spout發送的一條資料被标記為處理失敗, storm會調用fail()方法
public class MySpout extends BaseRichSpout { private SpoutOutputCollector spoutOutputCollector; //建立一個map用來儲存資料 private Map<String, String> msgBuffer = new HashMap<String, String>(); /** * 初始化方法 * @param map * @param topologyContext * @param spoutOutputCollector */ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; } /** * 資料發送 */ public void nextTuple() { //生成一個唯一編号 String msgId = UUID.randomUUID().toString(); //模拟一條消息 String msg = "this is test message"; //把消息存入map msgBuffer.put(msgId, msg); //向下遊bolt發送一條資料,并附帶唯一編号 spoutOutputCollector.emit(new Values(msg), msgId); } /** * 字段聲明 * @param outputFieldsDeclarer */ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("line")); } /** * 當spout發送的一條資料被完整處理, storm會調用這個方法 * @param msgId 消息的唯一編号 */ @Override public void ack(Object msgId) { System.out.println("消息處理成功了, msgId: "+msgId); super.ack(msgId); } /** * 當spout發送的一條資料被标記為處理失敗, storm會調用這個方法 * @param msgId */ @Override public void fail(Object msgId) { System.out.println("消息處理失敗了需要重發, msgId: "+msgId); //如果發送資料失敗後,從map中取出資料再次發送 String msg = msgBuffer.get(msgId); spoutOutputCollector.emit(new Values(msg), msgId); } }
-
編寫bolt類
在execute()方法中處理完所有業務邏輯後需要調用ack()方法
bolt如果産生了新的資料,需要錨定一點,讓新産生的tuple與原有tuple關聯
第二個bolt類public class MyBolt1 extends BaseRichBolt { private OutputCollector outputCollector; /** * 初始化方法 * @param map * @param topologyContext * @param outputCollector */ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } /** * 資料發送 * @param tuple */ public void execute(Tuple tuple) { //擷取資料 String line = tuple.getStringByField("line"); String[] words = line.split(" "); for (String word : words) { //将新産生的tuple與原有tuple關聯 outputCollector.emit(tuple, new Values(word)); } //bolt對資料完成處理後發出信号 outputCollector.ack(tuple); //測試消息處理失敗 //outputCollector.fail(tuple); } /** * 字段聲明 * @param outputFieldsDeclarer */ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); } }
public class MyBolt2 extends BaseRichBolt { private OutputCollector outputCollector; public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } public void execute(Tuple tuple) { //列印出資料 System.out.println(tuple.getStringByField("word")); //bolt對資料完成處理後發出信号 outputCollector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
- 編寫驅動類
public class MyTopology { public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { //通過TopologyBuilder 封裝任務資訊 TopologyBuilder topologyBuilder = new TopologyBuilder(); //設定spout擷取資料 //SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint):參數:id, spout對象, 線程數量 topologyBuilder.setSpout("MySpout", new MySpout(), 1); //設定splitbolt 對句子進行切割 topologyBuilder.setBolt("MyBolt1", new MyBolt1(), 1).shuffleGrouping("MySpout"); //設定wordcountbolt 對單詞進行統計 topologyBuilder.setBolt("MyBolt2", new MyBolt2(), 1).shuffleGrouping("MyBolt1"); //準備一個配置檔案 Config config = new Config(); //本地模式 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology()); } }
通過手動開啟ack機制方式總結:
1.對spout代碼進行修改
1)繼承BaseRichSpout,重寫ack()方法和fail()方法
2)在nextTuple()方法發送資料時綁定msgId(msgId要保證唯一)
2.對bolt代碼修改
1)在execute()方法中處理完所有業務邏輯後需要調用ack()方法outputCollector.ack(tuple);
2)bolt如果産生了新的資料,需要錨定一點,讓新産生的tuple與原有tuple關聯outputCollector.emit(tuple, new Values(word));
3.如果想測試失敗的情況就在execute()方法中調用fail()方法outputCollector.fail(tuple);
方式二: 通過繼承BaseBasicBolt類實作
繼承BaseBasicBolt後就不需要定義錨點和調用ack()方法了
修改方式一代碼中的兩個Bolt類(spout類和驅動類與方式一相同)
MyBolt1 :
public class MyBolt1 extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//擷取資料
String line = tuple.getStringByField("line");
String[] words = line.split(" ");
for (String word : words) {
//發送資料
basicOutputCollector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//聲明字段
outputFieldsDeclarer.declare(new Fields("word"));
}
}
MyBolt12:
public class MyBolt2 extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//列印出資料
System.out.println(tuple.getStringByField("word"));
//失敗測時
//throw new FailedException("失敗測試");
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
通過繼承BaseBasicBolt類實作ack機制總結:
bolt類繼承了BaseBasicBolt 就不需要手動添加錨點和調用方法發出成功處理聲明
如果想測試失敗的情況就抛出在bolt類中throw new FailedException(“失敗測試”);