天天看點

RabbitMQ實作死信隊列

當一個隊列的消息發送出現異常的時候,需要設定一個死信隊列,将失敗的消息放置死信隊列,進行人工幹預

rabbitMQ的配置檔案:

spring:
  rabbitmq:
    virtual-host: /
    addresses: localhost
    username: guest
    password: guest
    port: 5672
  application:
    name: consumer-01
           

rabbitMQ的隊列設定:

package com.example.consumer01.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author liyunxing
 * @package com.example.consumer01.config
 * @code
 * @description:
 * @since 2019/4/25 9:42
 */
@Configuration
public class RabbitConfig {

    //死信交換機
    public static final String X_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
    //死信路由
    public static final String X_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
    //死信隊列
    public static final String DEAD_QUEUE = "dead-queue";

    //普通交換機
    public static final String NORMAL_EXCHANGE = "normal-exchange";
    //普通路由
    public static final String NORMAL_ROUTING_KEY = "normal-routing-key";
    //普通隊列
    public static final String NORMAL_QUEUE = "normal-queue";

    /**
     * 建立死信隊列
     */
    @Bean
    public Queue getDeadQueue(){
        return new Queue(DEAD_QUEUE);
    }
    //建立死信交換機
    @Bean
    public Exchange getDeadExchange(){
        return ExchangeBuilder.directExchange(X_DEAD_LETTER_EXCHANGE).durable(true).build();
    }
    //隊列與延時交換機進行綁定
    @Bean
    public Binding bindDead(){
        return BindingBuilder.bind(getDeadQueue()).to(getDeadExchange()).with(X_DEAD_LETTER_ROUTING_KEY).noargs();
    }

    //建立普通隊列
    @Bean
    public Queue getNormalQueue(){
        Map args = new HashMap();
        //當消息發送異常的時候,消息需要路由到的交換機和routing-key,這裡配的直接是發送至死信隊列
        args.put("x-dead-letter-exchange",X_DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",X_DEAD_LETTER_ROUTING_KEY);
        //建立隊列的時候,将死信綁定到隊列中
        return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();
    }
    //建立普通交換機
    @Bean
    public Exchange getNormalExchange(){
        return ExchangeBuilder.directExchange(NORMAL_EXCHANGE).durable(true).build();
    }
    //普通隊列與普通交換機進行綁定
    @Bean
    public Binding bindNormal(){
        return BindingBuilder.bind(getNormalQueue()).to(getNormalExchange()).with(NORMAL_ROUTING_KEY).noargs();
    }
}
           

RabbitMQ的監聽:

package com.example.consumer01.rabbit;

import com.example.consumer01.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author liyunxing
 * @package com.example.consumer01.rabbit
 * @code
 * @description:
 * @since 2019/4/25 9:43
 */
@Component
public class Consumer01 {

    //監聽死信隊列
    @RabbitListener(queues = {RabbitConfig.DEAD_QUEUE})
    public void receiver(String msg){
        System.out.println("dead queue 收到消息>>>>>>>>>"+msg);
    }
}

           

RabbitMQ的生産者:

package com.example.consumer01;

import com.example.consumer01.config.RabbitConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Consumer01ApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void contextLoads() {
        String msg = "hello world";
        /**
         * 發送給普通對隊列,設定五秒之後過期
         * 但是并沒有實作消費的監聽,是以該消息将在五秒之後過期
         */
        rabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE,RabbitConfig.NORMAL_ROUTING_KEY,msg,message -> {
            message.getMessageProperties().setExpiration("5000");
            return message;
        });
    }
}
           

繼續閱讀