Rabbitmq 高级特性 - 死信队列
- 说明:
- 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
-
主要是其他mq没有交换机这个概念,它们就一个死信队列,但是我们的rabbitmq有交换机的概念,所以我们一般在rabbitmq说的死信队列说的都是死信交换机.为什么叫做死信队列,翻译过来又是死信交换机?
- 消息成为死信的三种情况(
):面试可能会问
- 队列消息长度到达限制;
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
- 队列绑定死信交换机:
- 给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
- x-dead-letter-exchange: 给队列设置一个死信交换机的名称
- x-dead-letter-routing-key:给队列设置一个信息交换机的
Routing key
- 给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
- 死信队列步骤(基于RabbitMQ 高级特性 - 消息的可靠性投递):
- 声明正常的队列(
)和交换机(test_queue_dlx
)test_exchange_dlx
<!-- 1. 声明正常的队列(`test_queue_dlx`)和交换机(`test_exchange_dlx`) --> <rabbit:queue id="test_queue_dlx" name="test_queue_dlx"/> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="text.dlx.#" queue="test_queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
- 声明死信队列(
)和死信交换机(queue_dlx
)exchange_dlx
<!-- 2. 声明死信队列(`queue_dlx`)和死信交换机(`exchange_dlx`) --> <rabbit:queue id="queue_dlx" name="queue_dlx"/> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
- 正常队列绑定死信交换机(
)在1的queue里面绑定
- 设置两个参数:
- x-dead-letter-exchange 死信交换机名称
- x-dead-letter-routing-key 发给死信交换机的Routing key
<rabbit:queue id="test_queue_dlx" name="test_queue_dlx"> <!-- 3. 正常队列绑定死信交换机 --> <rabbit:queue-arguments> <!-- 3.1 x-dead-letter-exchange 死信交换机名称 --> <entry key="x-dead-letter-exchange" value="exchange_dlx"/> <!-- 3,2 x-dead-letter-routing-key 发给死信交换机的Routing key --> <entry key="x-dead-letter-routing-key" value="dlx.xixi"/> </rabbit:queue-arguments> </rabbit:queue> ```
- 设置两个参数:
- 声明正常的队列(
- 消息成为死信的情况(
)在1的queue里面绑定
- 队列的过期时间
<!-- 4.1 设置队列的过期时间 ttl--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
- 队列的长度限制
<!-- 4.2 设置队列的长度限制 max-length --> <entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
- 消息拒收(和消费者有关)
- 队列的过期时间
- 测试
- 队列过期时间结果
@Test public void testDlx() { //1. 测试过期时间,死信消息 rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","测试消息 我会死吗? "); }
- 执行之后
- 10秒之后
- 队列长度限制
@Test public void testDlx() { //2. 测试长度限制后,消息死信 for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","测试消息 我会死吗? "); } }
- 推测由于长度的关系,
会保留10条test_queue_dlx
- 另外的10条会到queue_dlx里面(
)加上刚刚1条就变成11条
- 然后由于10秒的过期时间
会变成0,test_queue_dlx
会变成21queue_dlx
- 推测由于长度的关系,
- 消息过期(基于RabbitMQ 高级特性 Consumer Ack)
- 编写监听类
@Component public class DlxListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //1. 接受消息 System.out.println(new String(message.getBody())); //2. 处理业务逻辑 打印一句话代表 System.out.println("处理业务逻辑"); int i = 3/0; //出现错误 //3. 手动签收 channel.basicAck(deliveryTag,true); } catch (Exception e) { System.out.println("出现异常, 拒绝接收"); //4. 拒绝签收,不重回队列 requeue = false channel.basicNack(deliveryTag,true,false); } } }
- 配置监听(
)spring-rabbitmq-consumer.xml
<!-- 定义监听器--> <rabbitmq:listener-container connection-factory="connectionFactory" acknowledge="manual"> <!-- 监听正常的队列--> <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/> </rabbitmq:listener-container>
- 运行
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest { @Test public void test() { while(true) { } } }
- 发送消息
@Test public void testDlx() { //3. 测试消息拒收 rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","测试消息 我会死吗? "); }
- 监听结果(
)乱码需要监听的哪里修改编码格式
- 死信队列(
)21+1
- 队列过期时间结果
- 修复乱码
- 死信队列小结
- 死信交换机和死信队列和普通的没有区别
- 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
- 消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,并且不重回队列;
- 原队列存在消息过期设置,消息到达超时时间未被消费;