天天看点

Storm的ack原理和使用

Storm的ack原理

通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作。比如在Meta中,成功被处理,即可更新偏移量,当失败时,重复发送数据。

因此,通过Ack机制,很容易做到保证所有数据均被处理,一条都不漏。

另外需要注意的,当spout触发fail动作时,不会自动重发失败的tuple,需要spout自己重新获取数据,手动重新再发送一次

ack机制即, spout发送的每一条消息,

  • 在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理
  • 在规定的时间内,没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,
  • 或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作

另外Ack机制还常用于限流作用: 为了避免spout发送数据太快,而bolt处理太慢,常常设置pending数,当spout有等于或超过pending数的tuple没有收到ack或fail响应时,跳过执行nextTuple,从而限制spout发送数据。

通过conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);设置spout pend数。

这个timeout时间可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来设定。Timeout的默认时长为30秒

实现过程

torm 系统中有一组叫做"acker"的特殊的任务,它们负责跟踪DAG(有向无环图)中的每个消息。

acker任务保存了spout id到一对值的映射。第一个值就是spout的任务id,通过这个id,acker就知道消息处理完成时该通知哪个spout任务。第二个值是一个64bit的数字,我们称之为"ack val", 它是树中所有消息的随机id的异或计算结果。

<TaskId,<RootId,ackValue>>

Spoutid,<系统生成的id,ackValue>

Task-0,64bit,0

ack val表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的消息id发送过来做异或。 每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被完全处理了

Storm的ack原理和使用

开启ack机制

spout 在发送数据的时候带上msgid

设置acker数至少大于0;Config.setNumAckers(conf, ackerParal);

在bolt中完成处理tuple时,执行OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);

推荐使用IBasicBolt, 因为IBasicBolt 自动封装了OutputCollector.ack(tuple), 处理失败时,请抛出FailedException,则自动执行OutputCollector.fail(tuple)

关闭Ack机制

spout发送数据是不带上msgid

或者设置acker数等于0

代码操作

方式一: 手动实现

  1. 编写spout类

    实现ack机制需要spout类重写ack()方法和fail()方法

    还要在nextTuple()方法向外发送数据时绑定一个唯一id

    当spout发送的一条数据被完整处理, storm会调用ack()方法

    当spout发送的一条数据被标记为处理失败, storm会调用fail()方法

    public class MySpout extends BaseRichSpout {
        private SpoutOutputCollector spoutOutputCollector;
        //创建一个map用来保存数据
        private Map<String, String> msgBuffer = new HashMap<String, String>();
    
        /**
         * 初始化方法
         * @param map
         * @param topologyContext
         * @param spoutOutputCollector
         */
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }
    
        /**
         * 数据发送
         */
        public void nextTuple() {
            //生成一个唯一编号
            String msgId = UUID.randomUUID().toString();
            //模拟一条消息
            String msg = "this is test message";
            //把消息存入map
            msgBuffer.put(msgId, msg);
    
            //向下游bolt发送一条数据,并附带唯一编号
            spoutOutputCollector.emit(new Values(msg), msgId);
        }
    
        /**
         * 字段声明
         * @param outputFieldsDeclarer
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("line"));
        }
    
        /**
         * 当spout发送的一条数据被完整处理, storm会调用这个方法
         * @param msgId 消息的唯一编号
         */
        @Override
        public void ack(Object msgId) {
            System.out.println("消息处理成功了, msgId: "+msgId);
            super.ack(msgId);
        }
    
        /**
         * 当spout发送的一条数据被标记为处理失败, storm会调用这个方法
         * @param msgId
         */
        @Override
        public void fail(Object msgId) {
            System.out.println("消息处理失败了需要重发, msgId: "+msgId);
            //如果发送数据失败后,从map中取出数据再次发送
            String msg = msgBuffer.get(msgId);
            spoutOutputCollector.emit(new Values(msg), msgId);
        }
    }
               
  2. 编写bolt类

    在execute()方法中处理完所有业务逻辑后需要调用ack()方法

    bolt如果产生了新的数据,需要锚定一点,让新产生的tuple与原有tuple关联

    public class MyBolt1 extends BaseRichBolt {
        private OutputCollector outputCollector;
        /**
         * 初始化方法
         * @param map
         * @param topologyContext
         * @param outputCollector
         */
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }
    
        /**
         * 数据发送
         * @param tuple
         */
        public void execute(Tuple tuple) {
            //获取数据
            String line = tuple.getStringByField("line");
            String[] words = line.split(" ");
            for (String word : words) {
                //将新产生的tuple与原有tuple关联
                outputCollector.emit(tuple, new Values(word));
            }
            //bolt对数据完成处理后发出信号
            outputCollector.ack(tuple);
    
            //测试消息处理失败
            //outputCollector.fail(tuple);
        }
    
        /**
         * 字段声明
         * @param outputFieldsDeclarer
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word"));
        }
    }
               
    第二个bolt类
    public class MyBolt2 extends BaseRichBolt {
        private OutputCollector outputCollector;
    
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }
    
        public void execute(Tuple tuple) {
            //打印出数据
            System.out.println(tuple.getStringByField("word"));
    		//bolt对数据完成处理后发出信号
            outputCollector.ack(tuple);
        }
    
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
        }
    }
               
  3. 编写驱动类
    public class MyTopology {
        public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
            //通过TopologyBuilder 封装任务信息
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            //设置spout获取数据
            //SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint):参数:id, spout对象, 线程数量
            topologyBuilder.setSpout("MySpout", new MySpout(), 1);
            //设置splitbolt 对句子进行切割
            topologyBuilder.setBolt("MyBolt1", new MyBolt1(), 1).shuffleGrouping("MySpout");
            //设置wordcountbolt 对单词进行统计
            topologyBuilder.setBolt("MyBolt2", new MyBolt2(), 1).shuffleGrouping("MyBolt1");
    
            //准备一个配置文件
            Config config = new Config();
    
            //本地模式
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
        }
    }
               

通过手动开启ack机制方式总结:

1.对spout代码进行修改

1)继承BaseRichSpout,重写ack()方法和fail()方法

2)在nextTuple()方法发送数据时绑定msgId(msgId要保证唯一)

2.对bolt代码修改

1)在execute()方法中处理完所有业务逻辑后需要调用ack()方法outputCollector.ack(tuple);

2)bolt如果产生了新的数据,需要锚定一点,让新产生的tuple与原有tuple关联outputCollector.emit(tuple, new Values(word));

3.如果想测试失败的情况就在execute()方法中调用fail()方法outputCollector.fail(tuple);

方式二: 通过继承BaseBasicBolt类实现

继承BaseBasicBolt后就不需要定义锚点和调用ack()方法了

修改方式一代码中的两个Bolt类(spout类和驱动类与方式一相同)

MyBolt1 :

public class MyBolt1 extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        //获取数据
        String line = tuple.getStringByField("line");
        String[] words = line.split(" ");
        for (String word : words) {
            //发送数据
            basicOutputCollector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //声明字段
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
           

MyBolt12:

public class MyBolt2 extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        //打印出数据
        System.out.println(tuple.getStringByField("word"));
        
        //失败测时
        //throw new FailedException("失败测试");
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}
           

通过继承BaseBasicBolt类实现ack机制总结:

bolt类继承了BaseBasicBolt 就不需要手动添加锚点和调用方法发出成功处理声明

如果想测试失败的情况就抛出在bolt类中throw new FailedException(“失败测试”);

继续阅读