4.1 釋出确認原理
生産者将信道設定成 confirm 模式,一旦信道進入 confirm 模式,所有在該信道上面釋出的消息都将會被指派一個唯一的 ID(從 1 開始),一旦消息被投遞到所有比對的隊列之後,broker 就會發送一個确認給生産者(包含消息的唯一 ID),這就使得生産者知道消息已經正确到達目的隊列了,如果消息和隊列是可持久化的,那麼确認消息會在将消息寫入磁盤之後發出,broker 回傳給生産者的确認消息中 delivery-tag 域包含了确認消息的序列号,此外 broker 也可以設定 basic.ack 的 multiple 域,表示到這個序列号之前的所有消息都已經得到了處理。
confirm 模式最大的好處在于他是異步的,一旦釋出一條消息,生産者應用程式就可以在等信道傳回确認的同時繼續發送下一條消息,當消息最終得到确認之後,生産者應用便可以通過回調方法來處理該确認消息,如果 RabbitMQ 因為自身内部錯誤導緻消息丢失,就會發送一條 nack 消息,生産者應用程式同樣可以在回調方法中處理該 nack 消息。
4.2 釋出确認的政策
4.2.1 開啟釋出确認的方法
釋出确認預設是沒有開啟的,如果要開啟需要調用方法 confirmSelect,每當你要想使用釋出确認,都需要在 channel 上調用該方法。
Channel channel = connection.createChannel();
channel.confirmSelect();
4.2.2 單個确認釋出
這是一種簡單的确認方式,它是一種同步确認釋出的方式,也就是釋出一個消息之後隻有它被确認釋出,後續的消息才能繼續釋出,waitForConfirmsOrDie(long)這個方法隻有在消息被确認的時候才傳回,如果在指定時間範圍内這個消息沒有被确認那麼它将抛出異常。
這種确認方式有一個最大的缺點就是:釋出速度特别的慢,因為如果沒有确認釋出的消息就會阻塞所有後續消息的釋出,這種方式最多提供每秒不超過數百條釋出消息的吞吐量。當然對于某些應用程式來說這可能已經足夠了。
public static void publishMessageIndividually() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開啟釋出确認
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服務端傳回 false 或逾時時間内未傳回,生産者可以消息重發
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息發送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("釋出" + MESSAGE_COUNT + "個單獨确認消息,耗時" + (end - begin) +
"ms");
}
}
4.2.3 批量确認模式
上面那種方式非常慢,與單個等待确認消息相比,先釋出一批消息然後一起确認可以極大地提高吞吐量,當然這種方式的缺點就是:當發生故障導緻釋出出現問題時,不知道是哪個消息出現問題了,我們必須将整個批處理儲存在記憶體中,以記錄重要的資訊而後重新釋出消息。當然這種方案仍然是同步的,也一樣阻塞消息的釋出。
public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開啟釋出确認
channel.confirmSelect();
//批量确認消息大小
int batchSize = 100;
//未确認消息個數
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//為了確定還有剩餘沒有确認消息 再次确認
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("釋出" + MESSAGE_COUNT + "個批量确認消息,耗時" + (end - begin) +
"ms");
}
}
4.2.4 異步确認釋出
異步确認雖然程式設計邏輯比上兩個要複雜,但是成本效益最高,無論是可靠性還是效率都沒得說, 他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功, 下面就讓我們來詳細講解異步确認是怎麼實作的。
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開啟釋出确認
channel.confirmSelect();
/**
* 線程安全有序的一個哈希表,适用于高并發的情況
* 1.輕松的将序号與消息進行關聯
* 2.輕松批量删除條目 隻要給到序列号
* 3.支援并發通路
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/**
* 确認收到消息的一個回調
* 1.消息序列号
* 2.true 可以确認小于等于目前序列号的消息
* false 确認目前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//傳回的是小于等于目前序列号的未确認消息 是一個 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除該部分未确認消息
confirmed.clear();
} else {
//隻清除目前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("釋出的消息" + message + "未被确認,序列号" + sequenceNumber);
};
/**
* 添加一個異步确認的監聽器
* 1.确認收到消息的回調
* 2.未收到消息的回調
*/
channel.addConfirmListener(ackCallback, nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()擷取下一個消息的序列号
* 通過序列号與消息體進行一個關聯
* 全部都是未确認的消息體
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("釋出" + MESSAGE_COUNT + "個異步确認消息,耗時" + (end - begin) +
"ms");
}
}
4.2.5 如何處理異步未确認消息
最好的解決的解決方案就是把未确認的消息放到一個基于記憶體的能被釋出線程通路的隊列, 比如說用 ConcurrentLinkedQueue 這個隊列在 confirm callbacks 與釋出線程之間進行消息的傳遞。
4.2.6 以上三種釋出确認速度對比
單獨釋出消息:同步等待确認,簡單,但吞吐量非常有限。
批量釋出消息:批量同步等待确認,簡單,合理的吞吐量,一旦出現問題但很難推斷出是哪條消息出現了問題。
異步處理: 最佳性能和資源使用,在出現錯誤的情況下可以很好地控制,但是實作起來稍微難些。
public static void main(String[] args) throws Exception {
//1.單個确認
publishMessageIndividually();
//2.批量确認
publishMessageBatch();
//3.異步确認釋出
publishMessageAsync();
}
釋出1000個單獨确認消息,耗時12033ms
釋出1000個批量确認消息,耗時180ms
釋出1000個異步确認消息,耗時43ms