天天看点

RabbitMQ 发布订阅-实现延时重试队列

RabbitMQ消息处理失败,我们会让失败消息进入重试队列等待执行,因为在重试队列距离真正执行还需要定义的时间间隔,因此,我们可以将重试队列设置成延时处理。今天参考网上其他人的实现,简单梳理下消息延时重试执行的思路。

消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。在这里我们一步一步实现一个带有失败重试功能的发布订阅组件,使用该组件后可以非常简单的实现消息的发布订阅。

业务背景

结合RabbitMQ的Topic模式和Work Queue模式实现生产方产生消息,消费方按需订阅,消息投递到消费方的队列之后,多个worker同时对消息进行消费

结合RabbitMQ的 Message TTL 和 Dead Letter Exchange 实现消息的延时重试功能

消息达到最大重试次数之后,将其投递到失败队列,等待人工介入处理bug后,重新将其加入队列消费

执行流程图

RabbitMQ 发布订阅-实现延时重试队列

生产者发布消息到主Exchange

主Exchange根据Routing Key将消息分发到对应的消息队列

多个消费者的worker进程同时对队列中的消息进行消费,因此它们之间采用“竞争”的方式来争取消息的消费

消息消费后,不管成功失败,都要返回ACK消费确认消息给队列,避免消息消费确认机制导致重复投递,同时,如果消息处理成功,则结束流程,否则进入重试阶段

如果重试次数小于设定的最大重试次数(默认为3次),则将消息重新投递到Retry Exchange的重试队列

重试队列不需要消费者直接订阅,它会等待消息的有效时间过期之后,重新将消息投递给Dead Letter Exchange,我们在这里将其设置为主Exchange,实现延时后重新投递消息,这样消费者就可以重新消费消息

如果三次以上都是消费失败,则认为消息无法被处理,直接将消息投递给Failed Exchange的Failed Queue,这时候应用可以触发报警机制,以通知相关责任人处理

等待人工介入处理(解决bug)之后,重新将消息投递到主Exchange,这样就可以重新消费了

技术实现:

为了实现消息的延时重试和失败存储,我们需要创建三个Exchange来处理消息。

master 主Exchange,发布消息时发布到该Exchange

master.retry 重试Exchange,消息处理失败时(3次以内),将消息重新投递给该Exchange

master.failed 失败Exchange,超过三次重试失败后,消息投递到该Exchange

所有的Exchange声明(declare)必须使用以下参数

参数

说明

exchange

-

Exchange名称

type

topic

Exchange 类型

passive

false

如果Exchange已经存在,则返回成功,不存在则创建

durable

true

持久化存储Exchange,这里仅仅是Exchange本身持久化,消息和队列需要单独指定其持久化

no-wait

该方法需要应答确认

在RabbitMQ的管理界面中,我们可以看到创建的三个Exchange

RabbitMQ 发布订阅-实现延时重试队列

消息发布时,使用<code>basic_publish</code>方法,参数如下

message

发布的消息对象

master

消息发布到的Exchange

routing-key

路由KEY,用于标识消息类型

mandatory

是否强制路由,指定了该选项后,如果没有订阅该消息,则会返回路由不可达错误

immediate

指定了当消息无法直接路由给消费者时如何处理

发布消息时,对于<code>message</code>对象,其内容使用json编码后的字符串,同时消息进行持久化

消息订阅的实现相对复杂一些,需要完成队列的声明以及队列和Exchange的绑定

对于每一个订阅消息的服务,都必须创建一个该服务对应的队列,将该队列绑定到关注的路由规则,这样之后,消息生产者将消息投递给Exchange之后,就会按照路由规则将消息分发到对应的队列供消费者消费了。

消费服务需要declare三个队列

<code>[queue_name]</code> 队列名称,格式符合 <code>[服务名称]@订阅服务标识</code>

<code>[queue_name]@retry</code> 重试队列

<code>[queue_name]@failed</code> 失败队列

Declare队列时,参数规定规则如下

queue

队列名称

队列不存在则创建,存在则直接成功

队列持久化

exclusive

排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除

auto-delete

当不再使用时,是否自动删除

对于<code>@retry</code>重试队列,需要指定额外参数

查看队列的详细信息,我们可以看到 queueName@retry 队列与其它两个队列的不同

RabbitMQ 发布订阅-实现延时重试队列

Queue

Exchange

[queue_name]

[queue_name]@retry

master.retry

[queue_name]@failed

master.failed

绑定时,需要提供订阅的路由KEY,该路由KEY与消息发布时的路由KEY对应,区别是这里可以使用通配符同时订阅多种类型的消息。

绑定的队列

绑定的Exchange

订阅的消息路由规则

RabbitMQ 发布订阅-实现延时重试队列

使用 <code>basic_consume</code> 对消息进行消费的时候,需要注意下面参数

消费的队列名称

consumer-tag

消费者标识,留空即可

no_local

如果设置了该字段,服务器将不会发布消息到 发布它的客户端

no_ack

需要消费确认应答

排他访问,设置后只允许当前消费者访问该队列

nowait

消费端在消费消息时,需要从消息中获取消息被消费的次数,以此判断该消息处理失败时重试还是发送到失败队列。

在消息发送到重试队列和失败队列时,我们在消息的headers中添加了一个名为<code>x-orig-routing-key</code>的字段,该字段是实现消息重试的关键字段,由于我们的消息需要在不同的Exchange,Queue之间流转,为了避免消息在重新投递到主Exchange时,被所有的消费者队列重新消费,在重试过程中,我们将消息的routing-key修改为队列名称,直接投递给原始消费消息的队列。<code>x-orig-routing-key</code>用于在之后能够重新获取到最开始的routing-key。

这里的重复消费是指 某个消息被两个消费方A和B消费了,其中A消费失败,B成功,这时候,消息由A消费者重新投递到主Exchange后,B消费队列也会获取到该消息,因此就会导致B消费者重复消费已经消费国的消息

本文实现延时重试,使用了三个重试Exchange,Exchange如果订阅特别多的话,Exchange的压力会非常大,因此在非常极端的情况下,消息大批量失败,且消息收发非常快,那么Exchange的性能可能会有问题。

本文是使用发布订阅实现延时重试的消息执行,也会有其他思路。