最新項目有一個場景,送出訂單後,有一個15分鐘的支付逾時時間,有同僚提出 直接一個定時任務輕松搞定, 但是不優雅.那麼怎麼優雅的解決這個問題呢? 團隊的大佬,我源哥建議使用 死信隊列,但是項目還沒有實踐,我這個 小菜雞就不能抄作業了,那就隻能放大招啦(百度),成功搞定.
先來完整說一下項目的流程,有一個15分鐘的支付逾時,還有一個24小時的訂單服務逾時,需要寫一個監聽 處理逾時的消息,如果消費失敗,需要将消息 加入異常隊列,人工處理 下面就直接上代碼
package com.ikang.common;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableRabbit
public class RabbitmqConfig {
/**
* 設定 消息手動ack
* @param connectionFactory
* @return
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
/****************** 死信隊列配置 begin ************************/
@Value("${mqConfig.ddx.queueNameProcess}")
String queue_process_name; // 最終消費隊列的 name
@Value("${mqConfig.ddx.queueName24h}")
String queue_ttl_name_24h; // 24小時隊列 name
@Value("${mqConfig.ddx.queueName15m}")
String queue_ttl_name_15m; // 15分鐘隊列 name
@Value("${mqConfig.ddx.delayExchangeName}")
String delay_exchange_name; // 轉發交換機
@Value("${mqConfig.ddx.queueRoutKey15m}")
String queue_routing_key_15m; // key.15m
@Value("${mqConfig.ddx.queueRoutKey24h}")
String queue_routing_key_24h; // key.24h
@Value("${mqConfig.ddx.queueRoutKey}")
String delay_routing_key; // key.process 最終消費隊列的key
@Value("${mqConfig.ddx.ttlExchangeName}")
String ttl_exchange_name; // 入口交換機
/**
* 死信交換機
* @return
*/
@Bean
public DirectExchange delayExchange(){
DirectExchange directExchange = new DirectExchange(delay_exchange_name,true,false);
return directExchange;
}
/**
* 延遲交換機
* @return
*/
@Bean
public DirectExchange TTLExchange(){
DirectExchange directExchange = new DirectExchange(ttl_exchange_name,true,false);
return directExchange;
}
/**
* 實際消費隊列
* @return
*/
@Bean
public Queue delayProcessQueue() {
return new Queue(queue_process_name,true,false,false);
}
/**
* 15分鐘 延遲隊列
* @return
*/
@Bean
public Queue delayTTLQueue15m() {
Map<String,Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange",delay_exchange_name);
paramMap.put("x-dead-letter-routing-key",delay_routing_key);
paramMap.put("x-message-ttl",900000);//15分鐘
return new Queue(queue_ttl_name_15m,true,false,false,paramMap);
}
/**
* 24小時 延遲隊列
* @return
*/
@Bean
public Queue delayTTLQueue24h() {
Map<String,Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange",delay_exchange_name);
paramMap.put("x-dead-letter-routing-key",delay_routing_key);
paramMap.put("x-message-ttl",1800000);//24小時
return new Queue(queue_ttl_name_24h,true,false,false,paramMap);
}
/**
* 轉發交換機 綁定 最終消費的隊列
* @return
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(delay_routing_key);
}
/**
* 綁定延遲隊列 15m
* @return
*/
@Bean
public Binding queueTTLBinding() {
return BindingBuilder.bind(delayTTLQueue15m()).to(TTLExchange()).with(queue_routing_key_15m);
}
/**
* 綁定延遲隊列 24h
* @return
*/
@Bean
public Binding queueTTLBinding2() {
return BindingBuilder.bind(delayTTLQueue24h()).to(TTLExchange()).with(queue_routing_key_24h);
}
/****************** 死信隊列配置 end ************************/
/****************** 異常消息投放隊列 begin ************************/
@Value("${mqConfig.exchangeNameException}")
String exchange_exception;
@Value("${mqConfig.queueNameException}")
String queue_exception;
@Value("${mqConfig.routeKeyException}")
String routekey_exception;
@Bean
public Queue queueException() {
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
return new Queue(queue_exception, durable, exclusive, autoDelete);
}
@Bean
public DirectExchange exchangeException() {
boolean durable = true;
boolean autoDelete = false;
return new DirectExchange(exchange_exception, durable, autoDelete);
}
@Bean
public Binding bindingException() {
return BindingBuilder.bind(queueException())
.to(exchangeException())
.with(routekey_exception);
}
/****************** 異常消息投放隊列 end ************************/
}
上面的是死性隊列的配置相關資訊,基本就是三步, 1.聲明隊列,配置相關屬性 2.聲明交換機,配置相關屬性 3. 配對,把他倆綁定在一起. 就是這麼簡單
然後 生産消息,消費消息
@Slf4j
@Service
public class RabbitmqService {
@Autowired
RabbitTemplate rabbitTemplate;
@Value("${mqConfig.ddx.ttlExchangeName}")
String ttl_exchange_name;
@Value("${mqConfig.exchangeNameException}")
String exchange_name_Exception;
@Value("${mqConfig.routeKeyException}")
String routkey_Exception;
@Value("${mqConfig.ddx.queueRoutKey24h}")
String queue_routing_key_24h; // key.24h
@Value("${mqConfig.ddx.queueRoutKey15m}")
String queue_routing_key_15m; // key.15m
/**
*
* @param transfer
*/
public void produceMsg(RabbitTransfer transfer) {
String transStr = JSON.toJSONString(transfer);
rabbitTemplate.convertAndSend(ttl_exchange_name,transfer.getRoutingKey(),transStr,new CorrelationData(UUID.randomUUID().toString()));
}
/**
* 消息處理失敗,加入異常隊列,人工處理
* @param transfer
*/
public void produceExceptionMsg(RabbitTransfer transfer){
String transStr = JSON.toJSONString(transfer);
rabbitTemplate.convertAndSend(exchange_name_Exception,routkey_Exception,transStr,new CorrelationData(UUID.randomUUID().toString()));
}
/**
* 處理步驟:
* 1. 問診訂單完成,im推送訂單id
* 2. 根據orderId 整理資料,給監管平台發送訂單資訊
* @param orderId
* @param channel
* @param tag
*/
@RabbitListener(queues = "${mqConfig.inqOrderQueue}")
public void receiveIm2Hosp(String orderId, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
// 增加try catch,防止消費失敗後,一直循環消費
try{
inqOrderMsgService.receive(orderId);
}catch (Exception ex){
ex.printStackTrace();
log.info("發送 消息異常 郵件成功!");*/
}
channel.basicAck(tag, false);
}
/**
* 處理步驟:
* @param msg
* @param channel
* @param tag
*/
@RabbitListener(queues = "${mqConfig.ddx.queueNameProcess}")
@SneakyThrows
public void process(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
//15分鐘 24小時邏輯
channel.basicAck(tag, false);
}
}
消費失敗,加入異常隊列,其實還可以更優雅一些, 可以在消費逾時消息的隊列上,再做一個綁定,綁定一個異常隊列,這樣 不要消費失敗的時候,手動将消費發送到異常隊列了. 這裡就是實作了
這樣一套 死信隊列就ok了,是不是 很簡單, 這個隻是使用,rabbitMQ還是很強大的,推薦一個資料,rabbitMQ實戰指南,很實用