天天看點

rabbitMQ死信隊列實戰

了解下死信,就是永遠都不能被消費到的資訊。死信隊列其實就是為了防止消息未消費成功丢失的場景。通過死信隊列存儲這類消息,然後再對死信隊列的消息做處理操作。場景比如訂單支付挂起。

maven配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>SpringbootTest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringbootTest</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
           

定義兩個消費者,一個01消費正常隊列,一個02消費死信隊列。

public class DeadConsumer01 {
    private static final String DEAD_EXCHANGE = "dead-1exchange";
    private static final String NORMAL_EXCHANGE = "normal-1exchange";
    private static final String DEAD_QUEUE = "dead-1queue";
    private static final String NORMAL_QUEUE = "normal-1queue";
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = MqChanneUtils.getChannel();
        //聲明交換機
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        //聲明死信隊列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //聲明正常隊列-需綁定死信交換機
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",DEAD_EXCHANGE);//設定與之綁定的死信交換機
        paramMap.put("x-dead-letter-routing-key","dead");//設定死信隊列的綁定key
        paramMap.put("x-max-length",4);//設定隊列消息最大數
        paramMap.put("x-message-ttl",10000);//設定隊列中消息過期時間
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,paramMap);
        //綁定
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
        DeliverCallback deliverCallback = (comsumeTag, delivery)->{
            String message = new String(delivery.getBody(), "UTF-8");
            if (message.equals("消息3")){
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);
            }else {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                System.out.println("consumer01已經收到消息" + message);
            }
        };
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,comsumeTag->{
        });
    }
           
public class DeadConsumer02 {
    private static final String DEAD_QUEUE = "dead-1queue";
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = MqChanneUtils.getChannel();

        DeliverCallback deliverCallback = (comsumeTag, delivery)->{
            String message = new String(delivery.getBody(),"UTF-8");
            System.out.println("consumer02 死信隊列已經收到消息"+message);
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,comsumeTag->{
        });
    }
}
           

定義一個生産者

public class Producter01 {
    private static final String NORMAL_EXCHANGE = "normal-1exchange";

    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = MqChanneUtils.getChannel();
     /*   AMQP.BasicProperties properties = new
                AMQP.BasicProperties().builder().expiration("10000").build();*/
        for (int i =0; i < 5;i++) {
            String message = "消息";
            channel.basicPublish(NORMAL_EXCHANGE, "normal", false, null,(message+i).getBytes(StandardCharsets.UTF_8));
        }
    }
}
           

 原理就是根據消費者01的定義的一些參數來控制是否進入死信隊列。如它的消息在10s内沒被消費,這就可以用作訂單支付逾時的場景。如它的消息長度超過最大長度限制,這可以控制高并發下的搶單場景。如消費者這邊對于某些特殊消息要做拒絕處理,但又需要留痕到資料庫。都可以用死信隊列在做邏輯。  其實死信隊列跟普通隊列一緻,就是你定義一個普通隊列來解決死信的問題也是OK的,隻不過MQ裡頭把它作為一個參數。定義這個更加使用靈活。