死信隊列
消息成為死信的三種情況:
- 隊列消息數量到達限制(x-max-length)後,再有新消息進入時,預設會把隊列頭部最早的消息擠出,被擠出的消息成為死信;
- 消費者拒絕消費消息(basicNack 或 basicReject),並且不把消息重新放入原目標隊列(requeue=false);
- 原隊列存在消息過期設定,消息到達逾時時間未被消費;
Demo
生産者
1.maven依賴
<!-- Spring Boot starter for AMQP messaging (RabbitMQ client support). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Lombok, declared optional; NOTE(review): none of the snippets shown here appear to use it. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped Spring Boot testing starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Test-scoped Spring Rabbit testing support. -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2.yml配置檔案
# HTTP port of the producer application.
server:
  port: 8003
# RabbitMQ broker connection settings; these keys must be nested under
# spring.rabbitmq for Spring Boot to pick them up.
spring:
  rabbitmq:
    host: 192.168.56.101
    port: 5672
    virtual-host: /ems
    # Demo credentials only; do not hard-code real credentials in this file.
    username: admin
    password: admin
3.隊列配置類
/**
 * Declares the exchanges, queues and bindings used by the dead-letter demo:
 * a normal topic exchange/queue pair whose rejected, expired or overflowing
 * messages are re-published to a dead-letter exchange/queue pair.
 *
 * @author xiaozikang
 * @date 2020/10/13 21:10
 * @Email:[email protected]
 */
@Configuration
public class RabbitmqConfig {

    public static final String DXL_EXCHANGE = "dxlExchange";
    public static final String DXL_QUEUE = "dxlQueue";
    public static final String TOPIC_EXCHANG = "topicExchange";
    public static final String TOPIC_QUEUE = "topicQueue";

    /** Routing key stamped on messages when they are dead-lettered. */
    private static final String DEAD_LETTER_ROUTING_KEY = "dlx.xiao";
    /** Binding pattern of the dead-letter queue; matches the key above. */
    private static final String DEAD_LETTER_BINDING_PATTERN = "dlx.#";
    /** Binding pattern of the normal queue. */
    private static final String TOPIC_BINDING_PATTERN = "topic.#";
    /** Value of x-message-ttl (milliseconds per RabbitMQ's TTL documentation). */
    private static final int MESSAGE_TTL = 50000;
    /** Value of x-max-length: maximum number of messages the normal queue holds. */
    private static final int MAX_QUEUE_LENGTH = 10;

    /** Durable topic exchange that receives dead-lettered messages. */
    @Bean("dxlExchange")
    public Exchange dxlExchange() {
        return ExchangeBuilder
                .topicExchange(DXL_EXCHANGE)
                .durable(true)
                .build();
    }

    /** Durable topic exchange that producers publish normal messages to. */
    @Bean("topicExchange")
    public Exchange topicExchange() {
        return ExchangeBuilder
                .topicExchange(TOPIC_EXCHANG)
                .durable(true)
                .build();
    }

    /** Durable queue that stores dead-lettered messages. */
    @Bean("dxlQueue")
    public Queue dxlQueue() {
        return QueueBuilder
                .durable(DXL_QUEUE)
                .build();
    }

    /**
     * Durable normal queue, configured so that messages which expire, overflow
     * the length limit or are rejected without requeue go to the dead-letter
     * exchange.
     */
    @Bean("topicQueue")
    public Queue topicQueue() {
        return QueueBuilder
                .durable(TOPIC_QUEUE)
                .withArguments(deadLetterArguments())
                .build();
    }

    /** Binds the dead-letter queue to the dead-letter exchange. */
    @Bean("dxlBinding")
    public Binding dxlBinding(
            @Qualifier("dxlQueue") Queue dxlQueue,
            @Qualifier("dxlExchange") Exchange dxlExchange
    ) {
        return BindingBuilder
                .bind(dxlQueue)
                .to(dxlExchange)
                .with(DEAD_LETTER_BINDING_PATTERN)
                .noargs();
    }

    /** Binds the normal queue to the normal topic exchange. */
    @Bean("topicBinding")
    public Binding topicBinding(
            @Qualifier("topicQueue") Queue topicQueue,
            @Qualifier("topicExchange") Exchange topicExchange
    ) {
        return BindingBuilder
                .bind(topicQueue)
                .to(topicExchange)
                .with(TOPIC_BINDING_PATTERN)
                .noargs();
    }

    /** Builds the x-* arguments that attach dead-lettering, TTL and a length cap to the normal queue. */
    private static HashMap<String, Object> deadLetterArguments() {
        HashMap<String, Object> arguments = new HashMap<>();
        // Cap on the number of messages held by the queue.
        arguments.put("x-max-length", MAX_QUEUE_LENGTH);
        // Per-message time-to-live for messages in this queue (not the queue's own expiry).
        arguments.put("x-message-ttl", MESSAGE_TTL);
        // Exchange that dead-lettered messages are re-published to.
        arguments.put("x-dead-letter-exchange", DXL_EXCHANGE);
        // Routing key used when re-publishing to the dead-letter exchange.
        arguments.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        return arguments;
    }
}
4.發送消息
/**
 * Producer-side tests that publish messages to the normal topic exchange in
 * order to exercise the three dead-letter scenarios of the demo.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqProducerDlxApplicationTests {

    /** Routing key used for every message sent by these tests. */
    private static final String ROUTING_KEY = "topic.xiao";
    /** Payload used for every message sent by these tests. */
    private static final String PAYLOAD = "topic隊列";
    /** Number of messages sent by the length-limit scenario. */
    private static final int OVERFLOW_MESSAGE_COUNT = 11;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Scenario: a single message that is meant to expire and become a dead letter. */
    @Test
    public void contextLoads() {
        publish();
    }

    /** Scenario: send enough messages to exceed the queue's length limit. */
    @Test
    public void test02() {
        int remaining = OVERFLOW_MESSAGE_COUNT;
        while (remaining > 0) {
            publish();
            remaining--;
        }
    }

    /** Scenario: a single message that the consumer is meant to reject without requeue. */
    @Test
    public void test03() {
        publish();
    }

    /** Sends one demo message to the normal topic exchange. */
    private void publish() {
        rabbitTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANG, ROUTING_KEY, PAYLOAD);
    }
}
消費者
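搭建工程同生産者一樣(略)。另外,因為監聽器內手動調用 basicAck/basicNack,消費者的 yml 需要把確認模式設為手動:spring.rabbitmq.listener.simple.acknowledge-mode: manual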
/**
 * Consumer for the dead-letter demo: listens on the normal queue and rejects
 * each message without requeue so that the message is dead-lettered.
 *
 * @author xiaozikang
 * @date 2020/10/14 17:21
 * @Email:[email protected]
 */
@Component
public class ConsumerDlx {

    /**
     * Handles one delivery from {@code topicQueue} and settles it explicitly.
     *
     * <p>NOTE(review): calling basicAck/basicNack here presumably relies on the
     * listener container running in manual acknowledge mode; that setting is not
     * visible in this file, so confirm it in the consumer's configuration.
     *
     * @param msg     the message payload converted to a String
     * @param channel the channel the delivery arrived on, used to ack/nack
     * @param message the raw message, used to read the delivery tag
     * @throws IOException if the ack or nack cannot be sent to the broker
     */
    @RabbitListener(queues = "topicQueue")
    public void process(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("處理業務");
            // Deliberate ArithmeticException: simulates a business failure so the
            // demo always takes the reject path below.
            int a = 1 / 0;
        } catch (Exception e) {
            // Report the failure instead of swallowing it silently.
            System.err.println("Business processing failed, rejecting delivery " + deliveryTag + ": " + e);
            // multiple=false: reject only this delivery; requeue=false: do not put it
            // back on the original queue, so the broker dead-letters it.
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        // Ack outside the try block so that a failed ack is not answered with a
        // nack for the same delivery; multiple=false acks only this delivery.
        channel.basicAck(deliveryTag, false);
    }
}