天天看點

rabbitmq進階特性(死信隊列)

死信隊列

消息成為死信的三種情況:

  1. 隊列消息數量達到長度限制(x-max-length),被擠出隊列的消息成為死信(預設是隊列頭部最舊的消息被擠出);
  2. 消費者拒絕消費消息(basicNack 或 basicReject),並且設定 requeue=false,不把消息重新放入原目標隊列;
  3. 原隊列存在消息過期設定,消息到達逾時時間未被消費;
    rabbitmq進階特性(死信隊列)

Demo

生産者

1.maven依賴

<!-- Spring Boot starter for AMQP messaging; the RabbitMQ code in this article builds on it -->
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Lombok; marked optional so it is not passed on to dependent projects.
             NOTE(review): no Lombok annotation appears in the code shown here - confirm it is needed. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Test-scope only: Spring Boot test support used by the @SpringBootTest class -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Test-scope only: Spring AMQP test utilities -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
           

2.yml配置檔案

# Port the embedded web server of this application listens on.
server:
  port: 8003

# Connection settings for the RabbitMQ broker used by the demo.
# NOTE(review): host and admin/admin credentials are hard-coded for a local demo
# environment - presumably these should be externalised for any shared setup.
spring:
  rabbitmq:
    host: 192.168.56.101
    port: 5672
    # Virtual host on the broker that holds the demo's exchanges and queues.
    virtual-host: /ems
    username: admin
    password: admin

           

3.隊列配置類

/**
 * Declares the exchanges, queues and bindings used by the dead-letter demo.
 *
 * <p>Two topic exchanges and two durable queues are declared: a "normal" pair
 * ({@code topicExchange} / {@code topicQueue}) and a dead-letter pair
 * ({@code dxlExchange} / {@code dxlQueue}). The normal queue is configured so
 * that messages which expire or overflow the length limit are re-published to
 * the dead-letter exchange with routing key {@code dlx.xiao}.
 *
 * <p>Note that the bean and constant names are spelled "dxl" while the routing
 * keys use "dlx"; both spellings are kept as they are.
 *
 * @author xiaozikang
 * @date 2020/10/13 21:10
 * @Email:[email protected]
 */
@Configuration
public class RabbitmqConfig {

    /** Name of the dead-letter exchange. */
    public static final String DXL_EXCHANGE = "dxlExchange";
    /** Name of the dead-letter queue. */
    public static final String DXL_QUEUE = "dxlQueue";
    /** Name of the normal topic exchange (constant name kept as-is, it is referenced elsewhere). */
    public static final String TOPIC_EXCHANG = "topicExchange";
    /** Name of the normal queue whose rejected/expired/overflowing messages are dead-lettered. */
    public static final String TOPIC_QUEUE = "topicQueue";

    /**
     * Declares the dead-letter exchange as a durable topic exchange.
     *
     * @return the dead-letter exchange
     */
    @Bean("dxlExchange")
    public Exchange dxlExchange() {
        Exchange deadLetterExchange = ExchangeBuilder
                .topicExchange(DXL_EXCHANGE)
                .durable(true)
                .build();
        return deadLetterExchange;
    }

    /**
     * Declares the normal exchange as a durable topic exchange.
     *
     * @return the normal topic exchange
     */
    @Bean("topicExchange")
    public Exchange topicExchange() {
        Exchange normalExchange = ExchangeBuilder
                .topicExchange(TOPIC_EXCHANG)
                .durable(true)
                .build();
        return normalExchange;
    }

    /**
     * Declares the durable dead-letter queue (no extra arguments).
     *
     * @return the dead-letter queue
     */
    @Bean("dxlQueue")
    public Queue dxlQueue() {
        QueueBuilder builder = QueueBuilder.durable(DXL_QUEUE);
        return builder.build();
    }

    /**
     * Declares the durable normal queue together with its dead-letter settings.
     *
     * @return the normal queue
     */
    @Bean("topicQueue")
    public Queue topicQueue() {
        return QueueBuilder.durable(TOPIC_QUEUE)
                // 1. Exchange that dead messages of this queue are re-published to.
                .withArgument("x-dead-letter-exchange", DXL_EXCHANGE)
                // 2. Routing key used when re-publishing to the dead-letter exchange.
                .withArgument("x-dead-letter-routing-key", "dlx.xiao")
                // 3. Per-queue message TTL (RabbitMQ interprets x-message-ttl in milliseconds).
                .withArgument("x-message-ttl", 50000)
                // 4. Maximum number of messages the queue may hold.
                .withArgument("x-max-length", 10)
                .build();
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange for every key matching {@code dlx.#}.
     *
     * @param dxlQueue    the dead-letter queue bean
     * @param dxlExchange the dead-letter exchange bean
     * @return the binding
     */
    @Bean("dxlBinding")
    public Binding dxlBinding(
            @Qualifier("dxlQueue") Queue dxlQueue,
            @Qualifier("dxlExchange") Exchange dxlExchange
    ) {
        return BindingBuilder
                .bind(dxlQueue)
                .to(dxlExchange)
                .with("dlx.#")
                .noargs();
    }

    /**
     * Binds the normal queue to the normal exchange for every key matching {@code topic.#}.
     *
     * @param topicQueue    the normal queue bean
     * @param topicExchange the normal exchange bean
     * @return the binding
     */
    @Bean("topicBinding")
    public Binding topicBinding(
            @Qualifier("topicQueue") Queue topicQueue,
            @Qualifier("topicExchange") Exchange topicExchange
    ) {
        return BindingBuilder
                .bind(topicQueue)
                .to(topicExchange)
                .with("topic.#")
                .noargs();
    }

}

           

4.發送消息

/**
 * Producer-side tests that publish to the normal topic exchange in order to
 * exercise the three ways a message can become a dead letter.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqProducerDlxApplicationTests {

    /** Routing key used for every message published by these tests. */
    private static final String ROUTING_KEY = "topic.xiao";
    /** Payload used for every message published by these tests. */
    private static final String PAYLOAD = "topic隊列";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Publishes one message; meant to show a message becoming a dead letter after it expires. */
    @Test
    public void contextLoads() {
        publishOne();
    }

    /** Publishes 11 messages; meant to show dead-lettering once the queue length limit is exceeded. */
    @Test
    public void test02() {
        int remaining = 11;
        while (remaining > 0) {
            publishOne();
            remaining--;
        }
    }

    /** Publishes one message; meant to show dead-lettering after the consumer rejects it without requeue. */
    @Test
    public void test03() {
        publishOne();
    }

    /** Sends a single demo message to the normal topic exchange. */
    private void publishOne() {
        rabbitTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANG, ROUTING_KEY, PAYLOAD);
    }

}

           

消費者

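工程搭建方式與生產者相同(略)。注意:下面的消費者自行呼叫 basicAck/basicNack,因此監聽容器必須設定為手動確認模式(例如 spring.rabbitmq.listener.simple.acknowledge-mode: manual),否則會與容器的自動確認衝突。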

/**
 * Demo consumer for {@code topicQueue} that always fails and rejects the
 * message without requeueing it.
 *
 * <p>The listener settles deliveries itself via {@code basicAck}/{@code basicNack}.
 * NOTE(review): this presumably requires the listener container to run in manual
 * acknowledge mode; that configuration is not visible here - confirm.
 *
 * @author xiaozikang
 * @date 2020/10/14 17:21
 * @Email:[email protected]
 */
@Component
public class ConsumerDlx {

    /**
     * Handles one delivery from {@code topicQueue}.
     *
     * <p>On success the delivery is acknowledged; on any exception it is
     * negatively acknowledged with {@code requeue=false}. Both calls use
     * {@code multiple=false} so that only this delivery is settled and other
     * outstanding deliveries on the channel are left untouched.
     *
     * @param msg     the message payload converted to a string
     * @param channel the channel the message was delivered on
     * @param message the raw message, used to read the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = "topicQueue")
    public void process(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("處理業務");
            // Deliberate failure (ArithmeticException) so the demo always takes the reject path.
            int simulatedFailure = 1 / 0;
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // Report the failure instead of swallowing it silently.
            System.err.println("Message processing failed, rejecting without requeue: " + e);
            // requeue=false: the message is not put back on the original queue.
            channel.basicNack(deliveryTag, false, false);
        }
    }

}

           

結果

rabbitmq進階特性(死信隊列)