
2021-11-21 21:28:19
37
分類專欄:
RabbitMQ企業級入門文章标簽:
後端 rabbitmq版權
專欄收錄該内容
6 篇文章 1 訂閱
訂閱專欄
1.背景
通過上文學習知道了死信隊列,如果隻是網絡抖動,出現異常那麼直接進入死信隊列,那麼是不合理的。這就可以使用延時重試隊列,本文将介紹如何實作延時重試隊列。
2.原理
圖是俺在網上找的,請原作者諒解。
- 發送到業務隊裡 如果正常收到 正常運作
- 如果處理失敗 重試 并投入延時隊列 如果超過延時時間 重新投入業務隊列
- 如果重試次數大于3 那麼進入死信隊列
3.代碼實作
1.業務隊列
這裡聲明業務隊列與綁定關系。
@Configuration
public class BusinessConfig {
/**
* yewu1子產品direct交換機的名字
*/
public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
/**
* demo業務的隊列名稱
*/
public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
/**
* demo業務的routekey
*/
public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
/**
* 業務交換機交換機(一個項目一個業務交換機即可)
* 1.定義direct exchange,綁定queueTest
* 2.durable="true" rabbitmq重新開機的時候不需要建立新的交換機
* 3.direct交換器相對來說比較簡單,比對規則為:如果路由鍵比對,消息就被投送到相關的隊列
* fanout交換器中沒有路由鍵的概念,他會把消息發送到所有綁定在此交換器上面的隊列中。
* topic交換器你采用模糊比對路由鍵的原則進行轉發消息到隊列中
*/
@Bean
public DirectExchange yewu1Exchange() {
DirectExchange directExchange = new DirectExchange(YEWU1_EXCHANGE, true, false);
return directExchange;
}
/**
* 建立隊列(一個業務需要一個隊列一個routekey 命名格式 項目名-業務名)
* 1.隊列名稱
* 2.durable="true" 持久化 rabbitmq重新開機的時候不需要建立新的隊列
* 3.exclusive 表示該消息隊列是否隻在目前connection生效,預設是false
* 4.auto-delete 表示消息隊列沒有在使用時将被自動删除 預設是false
* 5.對nack或者發送逾時的 發送給死信隊列 args是綁定死信隊列
*
*/
@Bean
public Queue yewu1DemoQueue() {
return new Queue(YEWU1_DEMO_QUEUE, true, false, false);
}
/**
* 交換機與routekey綁定
*
* @return
*/
@Bean
public Binding yewu1DemoBinding() {
return BindingBuilder.bind(yewu1DemoQueue()).to(yewu1Exchange())
.with(YEWU1_DEMO_ROUTINGKEY);
}
}
2.延時隊列
聲明延時隊列與綁定關系。
3.死信隊列
聲明私信隊列與綁定關系。
@Configuration
public class DeadConfig {
/**
* 死信隊列
*/
public final static String FAIL_QUEUE_NAME = "fail_queue";
/**
* 死信交換機
*/
public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
/**
* 死信routing
*/
public final static String FAIL_ROUTING_KEY = "fail_routing";
/**
* 建立配置死信隊列
*
*/
@Bean
public Queue deadQueue() {
return new Queue(FAIL_QUEUE_NAME, true, false, false);
}
/**
* 死信交換機
*
* @return
*/
@Bean
public DirectExchange deadExchange() {
DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
return directExchange;
}
/**
* 綁定關系
*
* @return
*/
@Bean
public Binding failBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
}
}
4.生産者
生産者如上文,通用代碼。
@RestController
@RequestMapping("/TestRabbit")
public class ProducerDemo {
@Resource
private RabbitTemplate rabbitTemplate;
//@RequestMapping("/sendDirect")
String sendDirect(@RequestBody String message) throws Exception {
System.out.println("開始生産");
CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, BusinessConfig.YEWU1_DEMO_ROUTINGKEY,
message, data);
System.out.println("結束生産");
System.out.println("發送id:" + data);
return "OK,sendDirect:" + message;
}
}
5.消費者
大量的邏輯,請參考注釋。
public enum RabbitEnum {
/**
* 處理成功
*/
ACCEPT,
/**
* 可以重試的錯誤
*/
RETRY,
/**
* 無需重試的錯誤
*/
REJECT
@Component
public class ConsumerDemo {
private final static Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
@Resource
private RabbitTemplate rabbitTemplate;
// @RabbitListener(queues = "yewu1_demo_queue")
protected void consumer(Message message, Channel channel) throws Exception {
RabbitEnum ackSign = RabbitEnum.RETRY;
System.out.println(message.getMessageProperties().getCorrelationId());
try {
// 可以加入重複消費判斷
int i = 1 / 0;
} catch (Exception e) {
ackSign = RabbitEnum.RETRY;
throw e;
} finally {
// 通過finally塊來保證Ack/Nack會且隻會執行一次
if (ackSign == RabbitEnum.ACCEPT) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if (ackSign == RabbitEnum.RETRY) {
String correlationData =
(String)message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
System.out.println(message.getMessageProperties().getCorrelationId());
long retryCount = getRetryCount(message.getMessageProperties());
if (retryCount >= 3) {
// 重試次數超過3次,則将消息發送到失敗隊列等待特定消費者處理或者人工處理
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
rabbitTemplate.convertAndSend(DeadConfig.FAIL_EXCHANGE_NAME, DeadConfig.FAIL_ROUTING_KEY,
message, new CorrelationData(correlationData));
logger.info("連續失敗三次,将消息發送到死信隊列,發送消息:" + new String(message.getBody()));
} catch (Exception e1) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
logger.error("發送死信隊列報錯:" + e1.getMessage() + ",原始消息:" + new String(message.getBody()));
}
} else {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重試次數不超過3次,則将消息發送到重試隊列等待重新被消費
rabbitTemplate.convertAndSend(RetryConfig.YEWU1_RETRY_EXCHANGE_NAME,
RetryConfig.YEWU1_DEMO_RETRY_ROUTING_KEY, message,
new CorrelationData(correlationData));
logger.info("消費失敗,消息發送到重試隊列;" + "原始消息:" + new String(message.getBody()) + ";第"
+ (retryCount + 1) + "次重試");
} catch (Exception e1) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
logger.error("消息發送到重試隊列的時候,異常了:" + e1.getMessage() + ",重新發送消息");
}
}
}
}
}
/**
* 擷取消息被重試的次數
*/
public long getRetryCount(MessageProperties messageProperties) {
Long retryCount = 0L;
if (null != messageProperties) {
List<Map<String, ?>> deaths = messageProperties.getXDeathHeader();
if (deaths != null && deaths.size() > 0) {
Map<String, Object> death = (Map<String, Object>)deaths.get(0);
retryCount = (Long)death.get("count");
}
}
return retryCount;
}
}
參考: https://www.cnblogs.com/mfrank/p/11260355.html