了解下死信,就是永遠都不能被消費到的資訊。死信隊列其實就是為了防止消息未消費成功丢失的場景。通過死信隊列存儲這類消息,然後再對死信隊列的消息做處理操作。場景比如訂單支付挂起。
maven配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>SpringbootTest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>SpringbootTest</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
定義兩個消費者,一個01消費正常隊列,一個02消費死信隊列。
public class DeadConsumer01 {
private static final String DEAD_EXCHANGE = "dead-1exchange";
private static final String NORMAL_EXCHANGE = "normal-1exchange";
private static final String DEAD_QUEUE = "dead-1queue";
private static final String NORMAL_QUEUE = "normal-1queue";
@SneakyThrows
public static void main(String[] args) {
Channel channel = MqChanneUtils.getChannel();
//聲明交換機
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
//聲明死信隊列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//聲明正常隊列-需綁定死信交換機
Map<String,Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange",DEAD_EXCHANGE);//設定與之綁定的死信交換機
paramMap.put("x-dead-letter-routing-key","dead");//設定死信隊列的綁定key
paramMap.put("x-max-length",4);//設定隊列消息最大數
paramMap.put("x-message-ttl",10000);//設定隊列中消息過期時間
channel.queueDeclare(NORMAL_QUEUE,false,false,false,paramMap);
//綁定
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
DeliverCallback deliverCallback = (comsumeTag, delivery)->{
String message = new String(delivery.getBody(), "UTF-8");
if (message.equals("消息3")){
channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);
}else {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("consumer01已經收到消息" + message);
}
};
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,comsumeTag->{
});
}
public class DeadConsumer02 {
private static final String DEAD_QUEUE = "dead-1queue";
@SneakyThrows
public static void main(String[] args) {
Channel channel = MqChanneUtils.getChannel();
DeliverCallback deliverCallback = (comsumeTag, delivery)->{
String message = new String(delivery.getBody(),"UTF-8");
System.out.println("consumer02 死信隊列已經收到消息"+message);
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,comsumeTag->{
});
}
}
定義一個生産者
public class Producter01 {
private static final String NORMAL_EXCHANGE = "normal-1exchange";
@SneakyThrows
public static void main(String[] args) {
Channel channel = MqChanneUtils.getChannel();
/* AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().expiration("10000").build();*/
for (int i =0; i < 5;i++) {
String message = "消息";
channel.basicPublish(NORMAL_EXCHANGE, "normal", false, null,(message+i).getBytes(StandardCharsets.UTF_8));
}
}
}
原理就是根據消費者01的定義的一些參數來控制是否進入死信隊列。如它的消息在10s内沒被消費,這就可以用作訂單支付逾時的場景。如它的消息長度超過最大長度限制,這可以控制高并發下的搶單場景。如消費者這邊對于某些特殊消息要做拒絕處理,但又需要留痕到資料庫。都可以用死信隊列在做邏輯。 其實死信隊列跟普通隊列一緻,就是你定義一個普通隊列來解決死信的問題也是OK的,隻不過MQ裡頭把它作為一個參數。定義這個更加使用靈活。