1.業務背景
如果有有錯誤消息,如果手動nack同時将消息放回到隊列中,那麼這條消息會反複消費,留在隊列中 。
如果nack後将消息丢棄,那麼如果碰到網絡抖動,消息也會丢失 。是以 通過建立死信隊列避免消息丢失。
2.實作
檔案目錄如下:
1.原理
我們額外建立一條隊列。當消息進入進入業務隊列後,如果收到nack那麼就将這條消息放入這條條隊列中 。
2.修改pom檔案
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.修改配置檔案
server:
port: 8088
spring:
rabbitmq:
host: 192.168.*.*
port: 5672
username: root
password: root
virtual-host: /
listener:
simple:
acknowledge-mode: manual #手動應答
prefetch: 1 # 每次隻處理一個資訊
publisher-confirms: true #開啟消息确認機制
publisher-returns: true #支援消息發送失敗傳回隊列
4.rabbitmq的配置
@Configuration
public class RabbitMqConfig {
/**
* 連接配接工廠
*/
@Autowired
private ConnectionFactory connectionFactory;
/**
* 定制化amqp模版
*
* ConfirmCallback接口用于實作消息發送到RabbitMQ交換器後接收ack回調 即消息發送到exchange ack
* ReturnCallback接口用于實作消息發送到RabbitMQ 交換器,但無相應隊列與交換器綁定時的回調 即消息發送不到任何一個隊列中 ack
*/
@Bean
public RabbitTemplate rabbitTemplate() {
Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 消息發送失敗傳回到隊列中, yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
// 發送消息确認, yml需要配置 publisher-confirms: true
rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
// 消息傳回, yml需要配置 publisher-returns: true
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String correlationId = message.getMessageProperties().getCorrelationId().toString();
logger.debug("消息:{} 發送失敗, 應答碼:{} 原因:{} 交換機: {} 路由鍵: {}", correlationId, replyCode, replyText, exchange,
routingKey);
});
return rabbitTemplate;
}
/**
* 确認發送消息是否成功(調用util方法)
*
* @return
*/
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack() {
return new MsgSendConfirmCallBack();
}
}
5.util類
發送是否成功的回調方法。
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 回調方法
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("MsgSendConfirmCallBack , 回調id:" + correlationData);
if (ack) {
System.out.println("消息發送成功");
} else {
//可以将消息寫入本地,使用定時任務重新發送
System.out.println("消息發送失敗:" + cause + "\n重新發送");
}
}
}
這裡有一個點,如果想做實作消息失敗重新發送,在注釋處可以實作。需要将消息寫入本地,如果失敗從本地讀取,然後發送,如果成功删除本地資訊。
6.業務隊列(如:訂單業務)
這裡聲明了一個業務隊列 ,關鍵點在于x-dead-letter-exchange,x-dead-letter-routing-key 兩個參數。
@Configuration
public class BusinessConfig {
/**
* 業務1子產品direct交換機的名字
*/
public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
/**
* 業務1 demo業務的隊列名稱
*/
public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
/**
* 業務1 demo業務的routekey
*/
public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
@Bean
public Queue yewu1DemoDeadQueue() {
// 将普通隊列綁定到死信隊列交換機上
Map<String, Object> args = new HashMap<>(2);
args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);
args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);
return new Queue("yewu1_demo_dead_queue", true, false, false, args);
}
/**
* 将消息隊列和交換機進行綁定
*/
@Bean
public Binding binding_one() {
return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())
.with("yewu1_demo_dead_key");
}
}
這裡有一個點如果想持久化消息到磁盤,需要建立隊列時,new Queue将第二個參數輸入為true,但是面對大并發時效率會變低 。
7.死信隊列
這裡聲明死信隊列與綁定關系。
@Configuration
public class DeadConfig {
/**
* 死信隊列
*/
public final static String FAIL_QUEUE_NAME = "fail_queue";
/**
* 死信交換機
*/
public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
/**
* 死信routing
*/
public final static String FAIL_ROUTING_KEY = "fail_routing";
/**
* 建立配置死信隊列
*
*/
@Bean
public Queue deadQueue() {
return new Queue(FAIL_QUEUE_NAME, true, false, false);
}
/**
* 死信交換機
*
* @return
*/
@Bean
public DirectExchange deadExchange() {
DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
return directExchange;
}
/**
* 綁定關系
*
* @return
*/
@Bean
public Binding failBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
}
}
8.生産者消費者
生産者與消費者的代碼實作。
public enum RabbitEnum {
/**
* 處理成功
*/
ACCEPT,
/**
* 可以重試的錯誤
*/
RETRY,
/**
* 無需重試的錯誤
*/
REJECT
@RequestMapping("/sendDirectDead")
String sendDirectDead(@RequestBody String message) throws Exception {
System.out.println("開始生産");
CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",
message, data);
System.out.println("結束生産");
System.out.println("發送id:" + data);
return "OK,sendDirect:" + message;
}
@RabbitListener(queues = "yewu1_demo_dead_queue")
protected void consumerDead(Message message, Channel channel) throws Exception {
RabbitEnum ackSign = RabbitEnum.RETRY;
try {
int i = 10 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
ackSign = RabbitEnum.RETRY;
throw e;
} finally {
// 通過finally塊來保證Ack/Nack會且隻會執行一次
if (ackSign == RabbitEnum.ACCEPT) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if (ackSign == RabbitEnum.RETRY) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
9.實驗
當發送yewu1_demo_dead_queue隊列時,如果抛出異常,會放入死信隊列中。