天天看点

RocketMq重试及消息不丢失机制

1、消息重试机制

由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。

RocketMQ为使用者封装了消息重试的处理流程,无需开发人员手动处理。RocketMQ支持了生产端和消费端两类重试机制。

1.1 生产端重试

生产端配置的有发送失败重试次数,默认为2。使用了set方法对外进行暴露,producer客户端可以改写这个默认值。

public DefaultMQProducer(String producerGroup, RPCHook rpcHook) {
		this.createTopicKey = "TBW102";

		this.defaultTopicQueueNums = 4;

		this.sendMsgTimeout = 3000;

		this.compressMsgBodyOverHowmuch = 4096;
		//发送失败,重试次数
		this.retryTimesWhenSendFailed = 2;

		this.retryAnotherBrokerWhenNotStoreOK = false;

		this.maxMessageSize = 131072;

		this.unitMode = false;

		this.producerGroup = producerGroup;
		this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
	}
           

1.2 消费端重试

消费者消费消息后,需要给Broker返回消费状态。以MessageListenerConcurrently监听器为例,Consumer消费完成后需要返回ConsumeConcurrentlyStatus并发消费状态。查看源码,ConsumeConcurrentlyStatus是一个枚举,共有两种状态:

public enum ConsumeConcurrentlyStatus {
   //消费成功
   ConsumeConcurrentlyStatus,

   //消费失败,一段时间后重试
   RECONSUME_LATER;
}
           

RECONSUME_LATER代表因为某种原因,消费失败,稍后再试。后续会再次消费

官方文档介绍如下:

RocketMq重试及消息不丢失机制

RocketMQ中的消息无法无限次重新消费,当然了,手动修改重试次数是可以的,不介入的话不行。当重试次数超过所有延迟级别之后。消息会进入死信,死信Topic的命名为:%DLQ% + Consumer组名。

进入死信之后的消息肯定不会再投递了,不过可以通过接口去查询当前RocketMQ中死信队列的消息。如果在上层实现自有命令,那么可以将消息从死信中移出并重新投递。

死信消息具有以下特性:

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

2、保证消息不丢失

分别从Producer发送机制、Broker的持久化机制,以及消费者的offSet机制来最大程度保证消息不易丢失

  • 从Producer的视角来看:如果消息未能正确的存储在MQ中,或者消费者未能正确的消费到这条消息,都是消息丢失。
  • 从Broker的视角来看:如果消息已经存在Broker里面了,如何保证不会丢失呢(宕机、磁盘崩溃)
  • 从Consumer的视角来看:如果消息已经完成持久化了,但是Consumer取了,但是未消费成功且没有反馈,就是消息丢失

从Producer分析:如何确保消息正确的发送到了Broker?

  • 默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功
  • 采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失
  • RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功

从Broker分析:如果确保接收到的消息不会丢失?

  • 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的
  • Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中
  • Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失

从Cunmser分析:如何确保拉取到的消息被成功消费?

  • 消费者可以根据自身的策略批量Pull消息
  • Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标
  • 如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset
  • 如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作
  • 如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的消息到本地

继续阅读