文章目錄
- RabbitMQ 學習(七)----釋出确認
- (1)開啟釋出确認的方法
- (2)單個釋出确認
- (3)批量确認釋出
- (4)異步釋出确認
RabbitMQ 學習(七)----釋出确認
釋出确認是一個保證RabbitMQ 可靠性的一個機制
保證生産者将資訊成功的發送到 RabbitMQ的 server端了,那麼broker就會回一個确認,如果沒有收到或者收到拒絕資訊,那麼說明可能網絡不好沒有發送成功,server端當機了,broker拒絕接收等情況,如果不進行後續處理,那麼資訊就會丢失,生産者收到失敗的消息使用回調函數在進行處理。
生産者将信道設定成 confirm 模式,所有在該信道上釋出的消息都會指定一個唯一的ID,一旦消息投遞到隊列中,就是發送成功了,broker會立刻發送一個确認ack 給生産者,這個時候,生産者就知道消息已經發送成功了。
如果隊列和資訊是持久化的,那麼确認消息會在将消息寫入磁盤之後再發出,broker傳回的确認包含 确認消息的序列号,還可以設定 multiple,表示此序号前的所有消息都得到了處理。
一旦釋出消息,生産者等待确認的同時繼續發送下一條消息,如果rabbitMq自身内部錯誤導緻消息為發送成功,生産者就可以再回調方法中繼續處理。
為了保證RabbitMQ的可靠性,生産者怎麼做?
1、設定要求隊列持久化
2、設定隊列中的消息持久化
3、釋出确認,保證寫入磁盤,broker成功收到
(1)開啟釋出确認的方法
channel 的cofirm模式預設是沒有開啟的,如果需要開啟需要調用 confirmSelect(),當我們使用釋出确認的時候。需要使用channel調用該方法。
Channel channel =connection.createChannel();
channel.confirmSelect();
(2)單個釋出确認
這是一種同步确認釋出的方式,就是釋出一個消息之後等待确認後,後續的消息才能繼續釋出。
waitForConfiemOrDie(long)
這個方法隻有當消息被确認才會傳回,如果在指定的時間内未傳回就會抛出異常。
這種确認方式最大的缺點:速度特别慢。如果消息沒有确認,就會阻塞後續消息的發送,造成發送消息的速度很慢。
public class SingleConfirm {
/**
* 釋出确認模式
* 1、單個确認
* @param args
*/
public static void main(String[] args) {
Connection connection = RabbitMQUtils.getConnect();
Channel channel = null;
try {
channel = connection.createChannel();
// 開啟确認模式
channel.confirmSelect();
// 聲明隊列
channel.queueDeclare("confirm", true, false, false, null);
long begin = System.currentTimeMillis();
// 批量發送消息,每次發送進行确認
for (int i = 0; i <1000 ; i++) {
String message = i+"";
// 釋出單條消息
channel.basicPublish("", "confirm", null, message.getBytes());
// 單個消息發送之後,馬上釋出确認,使用 waitForConfirms
if(channel.waitForConfirms()){
//System.out.println("消息發送成功:"+i);
}
}
long end = System.currentTimeMillis();
System.out.println("發送1000條資料,使用單個釋出确認的時間為:"+(end-begin));
} catch (Exception e){
e.printStackTrace();
}finally {
RabbitMQUtils.close(channel, connection);
}
}
}
非常浪費時間
(3)批量确認釋出
每發送一部分消息,批量同步确認一次,若有消息無法發出,該模式無法确認是哪個消息無法發送;
釋出1000條消息,每發送100條确認一次
public class MultipleConfirm {
public static void main(String[] args) {
Connection connection = RabbitMQUtils.getConnect();
Channel channel = null;
try {
channel = connection.createChannel();
// 開啟确認模式
channel.confirmSelect();
// 聲明隊列
channel.queueDeclare("confirm", true, false, false, null);
long begin = System.currentTimeMillis();
// 批量确認消息的數量,沒發送100個傳回一個确認ack
int notAck = 100;
// 批量發送消息,每次發送進行确認
for (int i = 0; i <1000 ; i++) {
String message = i+"";
// 釋出單條消息
channel.basicPublish("", "confirm", null, message.getBytes());
if((i+1)%notAck==0){
// 每發送100條确認一次,檢視是否這一批是否有發送失敗的情況
channel.waitForConfirmsOrDie();
}
}
long end = System.currentTimeMillis();
System.out.println("發送1000條資料,使用批量釋出确認的時間為:"+(end-begin));
} catch (Exception e){
e.printStackTrace();
}finally {
RabbitMQUtils.close(channel, connection);
}
}
}
時間為 396毫秒
(4)異步釋出确認
生産者發送消息與 接收确認這兩個步驟不是同步的,是異步的,生産者隻管發送,同時使用監聽(addConfirmListener)傳回的确認,對成功确認、失敗确認兩種情況分别進行處理。非常高效且安全
- 開啟确認模式
- 聲明确認成功的callback
- 聲明确認失敗的callback
- 開啟确認監聽 addConfirmListener() ,設定callback
- 信道發送消息,不需要額外設定接收waitForConfirm什麼的
public static void main(String[] args) {
Connection connection = RabbitMQUtils.getConnect();
Channel channel = null;
try {
channel = connection.createChannel();
// 開啟确認模式
channel.confirmSelect();
// 聲明隊列
channel.queueDeclare("confirm", true, false, false, null);
// 作為接收成功的函數式接口 參數
ConfirmCallback ackCallback =(deliveryTag, multiple)-> System.out.println("确認的消息: "+deliveryTag);
// 表示接收成功的回調函數
// 作為接收失敗的函數式接口 參數
ConfirmCallback nackCallback = (deliveryTag,multiple)-> System.out.println("接收失敗!");
// 表示接收失敗的回調函數
// 這是一個異步的監聽 消息傳回确認資訊的 反應
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
// 批量發送消息,每次發送進行确認
for (int i = 0; i <1000 ; i++) {
String message = i+"";
// 釋出單條消息
channel.basicPublish("", "confirm", null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("發送1000條資料,使用異步釋出确認的時間為:"+(end-begin)+"ms");
} catch (Exception e){
e.printStackTrace();
}
// 執行完不能關閉連接配接,還要繼續監聽确認的資訊
/**
finally {
RabbitMQUtils.close(channel, connection);
}
*/
}