文章目錄
1. 什麼是幂等性?1.1 消息隊列的幂等性1.2 模拟重試機制1.2.1 生産者代碼1.2.2 消費者代碼1.2.3 消費者 application.yml 配置2. 如何保證消息幂等性,不被重複消費?解決方法
1. 什麼是幂等性?
在程式設計中一個幂等操作的特點是其任意多次執行所産生的影響均與一次執行的影響相同。
HTTP方法的幂等性是指一次和多次請求某一個資源應該具有同樣的副作用。幂等性屬于語義範疇,正如編譯器隻能幫助檢查文法錯誤一樣,HTTP規範也沒有辦法通過消息格式等文法手段來定義它。
簡之:一個請求,不管重複來多少次,結果是不會改變的。
1.1 消息隊列的幂等性
如同HTTP方法的幂等性,消息隊列同樣會出現幂等性問題。
消費者在消費 MQ 中的消息時,MQ 已把消息發送給消費者,消費者在給 MQ 傳回 ack 時網絡中斷,故 MQ 未收到确認資訊,該條消息會重新發給其他的消費者,或者在網絡重連後再次發送給該消費者,但實際上該消費者已成功消費了該條消息,造成消費者消費了重複的消息;注意,RabbitMQ 這種消息重試(補償)機制是預設的。
是以,MQ 消費者的幂等性問題,主要在于 MQ 的重試機制,因為網絡原因或用戶端延遲消費導緻重複消費。
那麼,如何合适選擇重試機制?我們來看兩種情況。
情況1: 消費者擷取到消息後,調用第三方接口,但接口暫時無法通路,是否需要重試?
需要重試
情況2: 消費者擷取到消息後,抛出資料轉換異常,是否需要重試?
不需要重試
總結:對于情況2,如果消費者代碼抛出異常是需要釋出新版本才能解決的問題,那麼不需要重試,重試也無濟于事。應該采用日志記錄+定時任務 job 健康檢查+人工進行補償
1.2 模拟重試機制
我們采用一種短信消費者用戶端異常的情況來模拟 RabbitMQ 的重試機制。
@RabbitListener(queues = "fanout_sms_queue")
public void process(String msg) {
System.out.println("短信消費者擷取生産者消息msg:" + msg);
int i = 1/0;
}
如上代碼,很顯然會報錯,一擔報錯生産者的消息時不會被消費的?
@RabbitListener 底層使用 AOP 進行異常通知攔截,如果程式沒有抛出異常資訊,那麼就會自動送出事務;如果 AOP 異常通知攔截有捕獲到異常資訊的話,就會自動實作重試(補償)機制,同時,這個補償機制的消息會緩存到 RabbitMQ 伺服器端進行存放,一直重試到不抛出異常為止。
1.2.1 生産者代碼
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 發送消息
*
* @param queueName 隊列名稱
*/
public void send(String queueName) {
String msg = "my_fanout_msg:" + System.currentTimeMillis();
Message message = MessageBuilder
.withBody(msg.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID() + "")
.build();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, message);
}
}
1.2.2 消費者代碼
@Component
public class FanoutEamilConsumer {
@RabbitListener(queues = "fanout_eamil_queue")
public void process(Message message) throws Exception {
String revMessage = Thread.currentThread().getName()
+ ",郵件消費者擷取生産者消息msg:"
+ new String(message.getBody(), "UTF-8")
+ ",messageId:" + message.getMessageProperties().getMessageId();
System.out.println(revMessage);
}
}
1.2.3 消費者 application.yml 配置
spring:
rabbitmq:
####連接配接位址
host: 127.0.0.1
####端口号
port: 5672
####賬号
username: guest
####密碼
password: guest
### 位址
virtual-host: /admin_host
listener:
simple:
retry:
####開啟消費者重試
enabled: true
####最大重試次數
max-attempts: 5
####重試間隔次數
initial-interval: 3000
server:
port: 8081
我們通過 RabbitMQ 配置,增加了 RabbitMQ 重試時間以及重試次數限制,在一定程度上解決了重複消費的問題,接下來看一道常問的面試題。
2. 如何保證消息幂等性,不被重複消費?
其實,這個問題也算是 MQ 面試當中經常考察的一點,因為無論是什麼 MQ 都會有這個問題。
首先通過上邊我們了解了什麼是“幂等性”,以及 MQ 幂等性問題的産生,是以我們要清楚為什麼會出現重複性消費?在什麼場景會出現重複消費?
解決方法
使用全局 MessageID 判斷消費方使用同一個,解決幂等性問題。
或者使用業務邏輯保證唯一(比如訂單号碼)
生産者關鍵代碼:
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 發送消息
*
* @param queueName 隊列名稱
*/
public void send(String queueName) {
String msg = "my_fanout_msg:" + System.currentTimeMillis();
Message message = MessageBuilder
.withBody(msg.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID() + "")
.build();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, message);
}
如上,生産者在發送消息時(convertAndSend),給消息對象設定了唯一的 MessageID,隻有該 MessageID 沒有被消費者标記方能在重試機制中再次被消費。
消費者關鍵代碼:
@RabbitListener(queues = "fanout_eamil_queue")
public void process(Message message) throws Exception {
String revMessage = Thread.currentThread().getName()
+ ",郵件消費者擷取生産者消息msg:"
+ new String(message.getBody(), "UTF-8")
+ ",messageId:" + message.getMessageProperties().getMessageId();
System.out.println(revMessage);
發送郵件的邏輯XXX
}
如上,通過 message.getMessageProperties().getMessageId() 擷取 MessageID,擷取的 MessageID 可以用來判斷是否已經被消費者消費過了,如果已經消費則取消再次消費。
通常怎麼判斷呢?
比如上方是一個郵件發送的消費者,在做補償時,假如上一步郵件發送成功了,我們會把該 ID 存至 redis中,下次再調用時,先去 redis 判斷是否存在該 ID 了,如果存在表明已經消費過了則直接傳回,不再消費,否則消費,然後将記錄存至 redis。
我建立了一個java相關的公衆号,用來記錄自己的學習之路,感興趣的小夥伴可以關注一下微信公衆号哈:niceyoo
