天天看点

RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

 源码dmeo在文章末尾获取👇🏻

1. SpringAMQP则允许配置三种确认模式 

RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

1. manual:手动ack,需要在业务代码结束后,调用api发送ack。

2. auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

3.  none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

首先声明队列交换机 

@Configuration
public class CommonConfig {

    @Bean
    public DirectExchange simpleDirect() {
        return new DirectExchange("simple.direct", true, false);
    }

    @Bean
    public Queue simpleQueue() {
        return QueueBuilder.durable("simple.queue").build();
    }
}
           

创建一个会报错的消息接收方 ,模拟消费者报错

@RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        log.info("消费者接收到simple.queue的消息:【" + msg + "】");
        //该代码会报错异常, 数值溢出
        System.out.println(1 / 0);
        System.out.println("消费者处理消息成功!");
    }
           

 配置好上述这些代码, 我们就可以启动消费者服务器了, 然后去mq客户端查看创建的队列和交换机

RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

1.2 none

在我们配置成none的时候, 很明显此消息投递后立即被删除

RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

 1.3 aoto(推荐使用)

当换成aoto模式的时候, 代码出现异常消息, mq会进行重新投递, 但是重新投递会一直无限重试

RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 
RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

为了解决这种无限重试的问题, spring提供了retry(重试)机制, 使用这个机制我们就可以在消费者报错抛异常的时候, 利用本地的重试来解决这个问题, 如下所示. 

RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

 现在我们配置了失败等待的时间为1秒, 等待时间的倍数为2 , 重试次数最大为3 , 这样一来我们就会收到三次重试的信息

第一次和第二次的时间相差一秒(失败时间(1秒) x 失败倍数1 = 1秒), 

第二次和第三次的时间相差两秒(失败时间(1秒) x 失败倍数2 = 2秒), 

最后在到达最大重试次数就会停止重试

RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

但是这种方法的弊端就是当重试次数达到最大耗尽后, 消息回直接被丢弃

 这里我们采用一种解决方案就是RepublishMessageRecoverer失败之后将消息投递到一个指定的队列当中

1. 创建RepublishMessageRecoverer 交换机和队列

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("e");
    }

    //失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "e");
    }
}
           

 创建成功后启动服务器, 在rabbitmq客户端查看创建的队列和交换机

RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

 并向simple.queue队列发送消息

RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

 这时重试的次数已经达到最大, MQ就转发到失败的队列当中去了

Republishing failed message to exchange 'error.direct' with routing key e
RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

 我们再查看失败的消息error.queue的信息, 这里直接展示了我们控制台中栈错误信息, 其中错误原因就是 代码1/0的问题

RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 
RabbitMQ消费者确认消息入门演示 源码dmeo在文章末尾获取👇🏻1. SpringAMQP则允许配置三种确认模式 

 链接:https://pan.baidu.com/s/1il41ywFnYM4_q3MU9GN_MQ 

提取码:heng