天天看点

rabbitmq高级特性(死信队列)

死信队列

消息成为死信的三种情况:

  1. 队列消息数量达到限制(x-max-length),默认溢出策略下队首最早的消息会被挤出并成为死信;
  2. 消费者拒绝消费消息(basicNack 或 basicReject),并且 requeue=false,即不把消息重新放回原队列;
  3. 原队列设置了消息过期时间(TTL),消息到达超时时间仍未被消费;
    rabbitmq高级特性(死信队列)

Demo

生产者

1.maven依赖

<!-- Spring Boot starter that brings in Spring AMQP (RabbitTemplate, @RabbitListener, ...) -->
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Lombok; marked optional so it is not passed on to dependent modules -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Test-scoped: Spring Boot test support used by the producer test class -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Test-scoped: Spring AMQP test utilities -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
           

2.yml配置文件

# HTTP port of the producer application
server:
  port: 8003

# Connection settings for the RabbitMQ broker
spring:
  rabbitmq:
    host: 192.168.56.101
    port: 5672
    # Virtual host in which the demo exchanges and queues are declared
    virtual-host: /ems
    # NOTE(review): demo credentials are hard-coded; externalize them outside of a demo
    username: admin
    password: admin

           

3.队列配置类

/**
 * Declares the exchanges, queues and bindings used by the dead-letter demo.
 *
 * <p>Two topic exchanges are declared: a regular one and a dead-letter one.
 * The regular queue is configured with a dead-letter exchange, a dead-letter
 * routing key, a per-queue message TTL and a maximum length; the dead-letter
 * queue is bound to the dead-letter exchange with the pattern {@code dlx.#}.
 *
 * @author xiaozikang
 * @date 2020/10/13 21:10
 * @Email:[email protected]
 */
@Configuration
public class RabbitmqConfig {

    /** Name of the dead-letter exchange. */
    public static final String DXL_EXCHANGE = "dxlExchange";
    /** Name of the dead-letter queue. */
    public static final String DXL_QUEUE = "dxlQueue";
    /** Name of the regular topic exchange. */
    public static final String TOPIC_EXCHANG = "topicExchange";
    /** Name of the regular queue. */
    public static final String TOPIC_QUEUE = "topicQueue";

    /**
     * Durable topic exchange that receives dead-lettered messages.
     *
     * @return the dead-letter exchange
     */
    @Bean("dxlExchange")
    public Exchange dxlExchange() {
        return ExchangeBuilder
                .topicExchange(DXL_EXCHANGE)
                .durable(true)
                .build();
    }

    /**
     * Durable topic exchange the producer publishes to.
     *
     * @return the regular exchange
     */
    @Bean("topicExchange")
    public Exchange topicExchange() {
        return ExchangeBuilder
                .topicExchange(TOPIC_EXCHANG)
                .durable(true)
                .build();
    }

    /**
     * Durable queue that stores dead-lettered messages.
     *
     * @return the dead-letter queue
     */
    @Bean("dxlQueue")
    public Queue dxlQueue() {
        return QueueBuilder.durable(DXL_QUEUE).build();
    }

    /**
     * Durable regular queue wired to the dead-letter exchange.
     *
     * @return the regular queue with its dead-letter, TTL and length arguments
     */
    @Bean("topicQueue")
    public Queue topicQueue() {
        return QueueBuilder.durable(TOPIC_QUEUE)
                // Exchange that dead-lettered messages are republished to.
                .withArgument("x-dead-letter-exchange", DXL_EXCHANGE)
                // Routing key used when republishing to the dead-letter exchange.
                .withArgument("x-dead-letter-routing-key", "dlx.xiao")
                // Per-queue message TTL.
                .withArgument("x-message-ttl", 50000)
                // Maximum number of messages the queue may hold.
                .withArgument("x-max-length", 10)
                .build();
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange with {@code dlx.#}.
     *
     * @param dxlQueue    the dead-letter queue bean
     * @param dxlExchange the dead-letter exchange bean
     * @return the binding
     */
    @Bean("dxlBinding")
    public Binding dxlBinding(
            @Qualifier("dxlQueue") Queue dxlQueue,
            @Qualifier("dxlExchange") Exchange dxlExchange
    ) {
        return BindingBuilder
                .bind(dxlQueue)
                .to(dxlExchange)
                .with("dlx.#")
                .noargs();
    }

    /**
     * Binds the regular queue to the regular exchange with {@code topic.#}.
     *
     * @param topicQueue    the regular queue bean
     * @param topicExchange the regular exchange bean
     * @return the binding
     */
    @Bean("topicBinding")
    public Binding topicBinding(
            @Qualifier("topicQueue") Queue topicQueue,
            @Qualifier("topicExchange") Exchange topicExchange
    ) {
        return BindingBuilder
                .bind(topicQueue)
                .to(topicExchange)
                .with("topic.#")
                .noargs();
    }

}

           

4.发送消息

/**
 * Producer-side tests that publish messages to the regular topic exchange,
 * one scenario per way a message can become a dead letter.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqProducerDlxApplicationTests {

    /** Routing key used for every published message. */
    private static final String ROUTING_KEY = "topic.xiao";

    /** Payload used for every published message. */
    private static final String PAYLOAD = "topic队列";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Publishes one message; scenario: the message expires and becomes a dead letter. */
    @Test
    public void contextLoads() {
        publishOne();
    }

    /** Publishes eleven messages; scenario: the queue length limit is exceeded. */
    @Test
    public void test02() {
        int remaining = 11;
        while (remaining > 0) {
            publishOne();
            remaining--;
        }
    }

    /** Publishes one message; scenario: the consumer rejects it without requeueing. */
    @Test
    public void test03() {
        publishOne();
    }

    /** Sends a single payload to the regular topic exchange. */
    private void publishOne() {
        rabbitTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANG, ROUTING_KEY, PAYLOAD);
    }

}

           

消费者

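搭建工程同生产者一样(略)。注意:消费者需要开启手动确认模式(例如在 yml 中配置 spring.rabbitmq.listener.simple.acknowledge-mode: manual),否则下面手动调用的 basicAck/basicNack 会与监听容器的自动确认冲突。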

/**
 * Consumer for the dead-letter demo: listens on {@code topicQueue}, simulates a
 * processing failure and rejects the message without requeueing it.
 *
 * <p>NOTE(review): the manual {@code basicAck}/{@code basicNack} calls presumably
 * require the listener container to run in manual acknowledge mode; confirm the
 * consumer configuration.
 *
 * @author xiaozikang
 * @date 2020/10/14 17:21
 * @Email:[email protected]
 */

@Component
public class ConsumerDlx {

    /**
     * Handles one delivery from {@code topicQueue}.
     *
     * @param msg     the converted message payload
     * @param channel the channel the message was delivered on, used to ack/nack
     * @param message the raw message, used to read the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = "topicQueue")
    public void process(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理业务");
            // Deliberate failure (ArithmeticException) so the demo always takes the reject path.
            int a = 1/0;
            // multiple=false: acknowledge only this delivery, not every outstanding one up to this tag.
            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            // Report the failure instead of silently swallowing it.
            System.err.println("Rejecting delivery " + deliveryTag + " without requeue: " + e);
            // multiple=false: reject only this delivery; requeue=false so it is not put back on the queue.
            channel.basicNack(deliveryTag,false,false);
        }
    }

}

           

结果

rabbitmq高级特性(死信队列)