Storm的ack原理
通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作。比如在Meta中,成功被处理,即可更新偏移量,当失败时,重复发送数据。
因此,通过Ack机制,很容易做到保证所有数据均被处理,一条都不漏。
另外需要注意的,当spout触发fail动作时,不会自动重发失败的tuple,需要spout自己重新获取数据,手动重新再发送一次
ack机制即, spout发送的每一条消息,
- 在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理
- 在规定的时间内,没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,
- 或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作
另外Ack机制还常用于限流作用: 为了避免spout发送数据太快,而bolt处理太慢,常常设置pending数,当spout有等于或超过pending数的tuple没有收到ack或fail响应时,跳过执行nextTuple,从而限制spout发送数据。
通过conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);设置spout pend数。
这个timeout时间可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来设定。Timeout的默认时长为30秒
实现过程
torm 系统中有一组叫做"acker"的特殊的任务,它们负责跟踪DAG(有向无环图)中的每个消息。
acker任务保存了spout id到一对值的映射。第一个值就是spout的任务id,通过这个id,acker就知道消息处理完成时该通知哪个spout任务。第二个值是一个64bit的数字,我们称之为"ack val", 它是树中所有消息的随机id的异或计算结果。
<TaskId,<RootId,ackValue>>
Spoutid,<系统生成的id,ackValue>
Task-0,64bit,0
ack val表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的消息id发送过来做异或。 每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被完全处理了

开启ack机制
spout 在发送数据的时候带上msgid
设置acker数至少大于0;Config.setNumAckers(conf, ackerParal);
在bolt中完成处理tuple时,执行OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);
推荐使用IBasicBolt, 因为IBasicBolt 自动封装了OutputCollector.ack(tuple), 处理失败时,请抛出FailedException,则自动执行OutputCollector.fail(tuple)
关闭Ack机制
spout发送数据是不带上msgid
或者设置acker数等于0
代码操作
方式一: 手动实现
-
编写spout类
实现ack机制需要spout类重写ack()方法和fail()方法
还要在nextTuple()方法向外发送数据时绑定一个唯一id
当spout发送的一条数据被完整处理, storm会调用ack()方法
当spout发送的一条数据被标记为处理失败, storm会调用fail()方法
public class MySpout extends BaseRichSpout { private SpoutOutputCollector spoutOutputCollector; //创建一个map用来保存数据 private Map<String, String> msgBuffer = new HashMap<String, String>(); /** * 初始化方法 * @param map * @param topologyContext * @param spoutOutputCollector */ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; } /** * 数据发送 */ public void nextTuple() { //生成一个唯一编号 String msgId = UUID.randomUUID().toString(); //模拟一条消息 String msg = "this is test message"; //把消息存入map msgBuffer.put(msgId, msg); //向下游bolt发送一条数据,并附带唯一编号 spoutOutputCollector.emit(new Values(msg), msgId); } /** * 字段声明 * @param outputFieldsDeclarer */ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("line")); } /** * 当spout发送的一条数据被完整处理, storm会调用这个方法 * @param msgId 消息的唯一编号 */ @Override public void ack(Object msgId) { System.out.println("消息处理成功了, msgId: "+msgId); super.ack(msgId); } /** * 当spout发送的一条数据被标记为处理失败, storm会调用这个方法 * @param msgId */ @Override public void fail(Object msgId) { System.out.println("消息处理失败了需要重发, msgId: "+msgId); //如果发送数据失败后,从map中取出数据再次发送 String msg = msgBuffer.get(msgId); spoutOutputCollector.emit(new Values(msg), msgId); } }
-
编写bolt类
在execute()方法中处理完所有业务逻辑后需要调用ack()方法
bolt如果产生了新的数据,需要锚定一点,让新产生的tuple与原有tuple关联
第二个bolt类public class MyBolt1 extends BaseRichBolt { private OutputCollector outputCollector; /** * 初始化方法 * @param map * @param topologyContext * @param outputCollector */ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } /** * 数据发送 * @param tuple */ public void execute(Tuple tuple) { //获取数据 String line = tuple.getStringByField("line"); String[] words = line.split(" "); for (String word : words) { //将新产生的tuple与原有tuple关联 outputCollector.emit(tuple, new Values(word)); } //bolt对数据完成处理后发出信号 outputCollector.ack(tuple); //测试消息处理失败 //outputCollector.fail(tuple); } /** * 字段声明 * @param outputFieldsDeclarer */ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); } }
public class MyBolt2 extends BaseRichBolt { private OutputCollector outputCollector; public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } public void execute(Tuple tuple) { //打印出数据 System.out.println(tuple.getStringByField("word")); //bolt对数据完成处理后发出信号 outputCollector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
- 编写驱动类
public class MyTopology { public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { //通过TopologyBuilder 封装任务信息 TopologyBuilder topologyBuilder = new TopologyBuilder(); //设置spout获取数据 //SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint):参数:id, spout对象, 线程数量 topologyBuilder.setSpout("MySpout", new MySpout(), 1); //设置splitbolt 对句子进行切割 topologyBuilder.setBolt("MyBolt1", new MyBolt1(), 1).shuffleGrouping("MySpout"); //设置wordcountbolt 对单词进行统计 topologyBuilder.setBolt("MyBolt2", new MyBolt2(), 1).shuffleGrouping("MyBolt1"); //准备一个配置文件 Config config = new Config(); //本地模式 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology()); } }
通过手动开启ack机制方式总结:
1.对spout代码进行修改
1)继承BaseRichSpout,重写ack()方法和fail()方法
2)在nextTuple()方法发送数据时绑定msgId(msgId要保证唯一)
2.对bolt代码修改
1)在execute()方法中处理完所有业务逻辑后需要调用ack()方法outputCollector.ack(tuple);
2)bolt如果产生了新的数据,需要锚定一点,让新产生的tuple与原有tuple关联outputCollector.emit(tuple, new Values(word));
3.如果想测试失败的情况就在execute()方法中调用fail()方法outputCollector.fail(tuple);
方式二: 通过继承BaseBasicBolt类实现
继承BaseBasicBolt后就不需要定义锚点和调用ack()方法了
修改方式一代码中的两个Bolt类(spout类和驱动类与方式一相同)
MyBolt1 :
public class MyBolt1 extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//获取数据
String line = tuple.getStringByField("line");
String[] words = line.split(" ");
for (String word : words) {
//发送数据
basicOutputCollector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//声明字段
outputFieldsDeclarer.declare(new Fields("word"));
}
}
MyBolt12:
public class MyBolt2 extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//打印出数据
System.out.println(tuple.getStringByField("word"));
//失败测时
//throw new FailedException("失败测试");
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
通过继承BaseBasicBolt类实现ack机制总结:
bolt类继承了BaseBasicBolt 就不需要手动添加锚点和调用方法发出成功处理声明
如果想测试失败的情况就抛出在bolt类中throw new FailedException(“失败测试”);