天天看点

【RocketMQ源码分析】消息幂等

幂等问题

我们知道RocketMQ消息处理成功的标志是消费者消费一条消息后向Broker端发送ACK消息(ack,message back)并且被Broker处理。这个过程中是涉及到网络传输,有网络的地方就存在不确定性,如果由于网络等原因导致ACK丢失,则RocketMQ会触发消息消费重试机制,重新消费该条消息。这里就涉及到了消息幂等的概念。

幂等场景

RocketMQ基于发布、订阅模型,客户端生产者、消费者在与broker网络通信过程中都存在网络的不确定,都有能会导致消息的重复。

  1. 消息投递重复

    生产者发送消息时,消息成功投递到broker,但此时发生网络闪断或者生产者down掉,导致broker发送ACK失败。此时生产者由于未能收到消息发送响应,认为发送失败,因此尝试重新发送消息到broker。当消息发送成功后,在broker中就会存在两条相同内容的消息,最终消费者会拉取到两条内容一样并且Message ID也相同的消息。因此造成了消息的重复。

  2. 消费时重复

    消费消息时同样会出现重复消费的情况。当消费者在处理业务完成返回消费状态给broker时,由于网络闪断等异常情况导致未能将消费完成的CONSUME_SUCCESS状态返回给broker。broker为了保证消息被至少消费一次的语义,会在网络环境恢复之后再次投递该条被处理的消息,最终造成消费者多次收到内容一样并且Message ID也相同的消息,造成了消息的重复。

无论是发送时重复还是消费时重复,最终的效果均为消费者消费时收到了重复的消息,那么我们就知道:只需要在消费者端统一进行幂等处理就能够实现消息幂等。

解决方案

作为一款高性能的消息中间件,RocketMQ能够保证消息不丢失但不保证消息不重复。如果在RocketMQ中实现消息去重实际也是可以的,但是考虑到高可用以及高性能的需求,如果做了服务端的消息去重,RocketMQ就需要对消息做额外的rehash、排序等操作,这会花费较大的时间和空间等资源代价,收益并不明显。RocketMQ考虑到正常情况下出现重复消息的概率其实是很小的,因此RocketMQ将消息幂等操作交给了业务方处理。

常用的幂等解决方案有:

  1. token令牌

    生成全局唯一token令牌,缓存在redis中,利用Redis缓存去重,保证消息只消费一次。

  2. 唯一索引

    在业务中通常是具备唯一业务标识的字符串,如:下单场景使用订单号、支付场景使用支付流水号等。保证数据库只会生成一条记录。比如:下单之后,插入一条老客记录。

  3. 数据库乐观锁

    利用数据库版本号version作为乐观锁,保证更新操作的幂等性。比如:下单送积分,给用户账户加积分

  4. 分布式锁

    如果是涉及到分布式系统,构建全局唯一索引比较困难,例如唯一性的字段没法确定,这时候可以引入分布式锁,保证只有一个能成功。

  5. 状态机

    状态机是天然支持幂等的,在下单支付过程中,涉及到订单多个状态的流转,状态机可以有效保证订单状态变更的幂等性,常用的StateMachine等

幂等的唯一依据应当由消息生产者生成,在发送消息时候,我们能够通过消息的key设置该id,对应的API为 org.apache.rocketmq.common.message.setKeys(String keys) 代码如下:

Message sendMessage = new Message(
"test-topic",
message.getBytes());
sendMessage.setKeys("10045780803");
           

消费者收到消息之后根据该key结合唯一所以索引或者乐观锁去做业务幂等性。

// 默认msgs只有一条消息
for (MessageExt msg : msgs) {
    //userId作为key  
    String key = msg.getKeys();
    //保存新客,userId作为唯一索引
    //insert into user values(key)
}
           

RocketMQ的消息MessageID是不能作为去重依据的,RocketMQ的消息投递过程中有可能会出现重复的MessageID,因此不建议通过MessageID作为处理依据,而应当使用业务唯一标识如:订单号、流水号等作为幂等处理的关键依据。一般实际生产中建议1、2结合3、4一起使用来保证消息消费的幂等性。

继续阅读