前幾篇文章主要介紹了Producer的mandatory參數,備份隊列和TTL的内容,這篇文章繼續介紹Producer端的開發,主要包括釋出方确認和事務機制。
釋出方确認
消息持久化機制可以保證應伺服器出現異常導緻消息丢失的問題,但是Producer将消息發送出去,并不知道消息是否正确到達服務端并持久化。如果未到達服務端,或者到達服務端未持久化到磁盤,消息就丢失,那麼問題仍然存在。如下圖,第一步或者第二步出現問題,消息依然會丢失。
RabbitMQ為解決這個問題,提供了釋出方确認(Publisher Confirm)機制。
生産者将Channel設定成confirm模式,所有在該channel上釋出的消息都會被指派一個唯一的ID(序号從1開始),消息被路由到隊列之後,RabbitMQ就會發送一個确認(ack,包含此消息的ID)給生産者,這樣生産者就知道消息已經正确發送到RabbitMQ了,如果消息是持久化的,RabbitMQ會等到消息落盤再回複。RabbitMQ回複的消息包含了deliveryTag,表示确認消息的序号,還包含multiple,表示到這個消息序号之前所有的消息都已得到處理了。
下面通過代碼示範釋出方确認的使用。
channel.confirmSelect();
String message = "test ack";
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 如果沒有調用comfirmSelect方法開啟,直接調用waitFormConfirms
// 會報java.lang.IllegalStateException
channel.waitForConfirms();
此方式也是串行同步等待的方式,生産者發送消息之後,會被阻塞,直到RabbitMQ接收到消息傳回(對于持久化消息,等待消息落盤後才傳回),生産者接收到ack才會下一條消息的處理,這顯然會有性能問題。
解決方案有兩種:
- 批量發送确認
- 異步确認
批量發送
批量發送方法,用戶端程式需要定量或者定時調用waitFormConfirms方法等待RabbitMQ的确認傳回,相對于上面的方式,性能有極大的提升。
下面是批量發送的代碼實作。
channel.confirmSelect();
int MsgCount = 0;
while (true) {
channel.basicPublish("exchange", "routingKey", null, "batch confirm test".getBytes());
//将發送出去的消息存入緩存中,緩存可以是一個ArrayList 或者BlockingQueue 之類的
if (++MsgCount >= BATCH_COUNT) {
MsgCount = 0;
try {
if (channel.waitForConfirms()) {
//将緩存中的消息清空
}
//将緩存中的消息重新發送
} catch (InterruptedException e) {
e.printStackTrace();
//将緩存中的消息重新發送
}
}
}
但也有問題,當同一批次中出現消息被nack或者逾時,需要用戶端程式處理并重試,這有可能導緻消息重複。
異步發送
異步方案是推薦使用的方式,它的優點初了性能優良之外,還有隻需要在回調方法nack中處理沒有被RabbitMQ成功處理的消息,SpringAMQP中也是這種方案。
實作的時候,隻需要添加ConfirmListener接口的實作,它主要有兩個方法:handleAck和handleNack。下面是代碼實作,是SpringAMQP的精簡版(關于SpringAMQP的細節,小夥伴們可以關注RabbitMQ系列文章後續的更新)。
// 維護消息序号和消息,在回調函數中做相應處理
// SpringAMQP也是這種方案,隻不過這裡簡化了
ConcurrentSkipListMap<Long, String> unconfirmMap = new ConcurrentSkipListMap<>();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag , boolean multiple) throws IOException {
System.out.println("Nack, SeqNo : " + deliveryTag + ", multiple : " + multiple);
if (multiple) {
confirmMap.headMap(deliveryTag - 1).clear();
} else {
confirmMap.remove(deliveryTag);
}
}
// 處理發送失敗的場景,嘗試重發
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("handleNack : " + deliveryTag + " " + multiple);
// 注意:為防止消息一直失敗,導緻死循環,可以在消息上加屬性x-retries,每次重發前,先判斷已經發送的次數,達到門檻值,不再發送
if(multiple){
ConcurrentNavigableMap<Long, String> headMap = unconfirmMap.headMap(deliveryTag + 1);
Set<Map.Entry<Long, String>> entrySet = headMap.entrySet();
Iterator<Map.Entry<Long, String>> iterator = entrySet.iterator();
while(iterator.hasNext()){
String removed = iterator.next().getValue();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, removed.getBytes());
}
} else {
String removed = unconfirmMap.remove(deliveryTag);
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, removed.getBytes());
}
}
});
//模拟一直發送消息
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes ());
confirmSet.add(nextSeqNo);
}
事務機制
事務機制是解決發送端無法感覺消息是否正确達到服務端的另外一種方案。事務的使用非常簡單,先直接上代碼感受下。
String message = "tx message";
try{
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.txCommit();
} catch(Exception e){
e.printStackTrace();
channel.txRollback();
}
使用txSelect方法開啟事務,隻有消息成功被Rabbit接收,事務才會送出,如果發生任何異常,消息都會被復原。
使用事務的缺點就是性能問題,因為發送一條消息之後,會阻塞發送端,直到Rabbit把消息持久化到磁盤,才會傳回響應給發送端,之後發送端才能繼續發送下一條。是以推薦使用Publisher Confirm方案。
好了,以上Producer使用Publisher Confirm和事務發送消息使用了。
RabbitMQ系列文章會陸續更新,歡迎各位小夥伴關注後面的技術分享。