天天看點

RabbitMQ 學習(七)----釋出确認

文章目錄

  • ​​RabbitMQ 學習(七)----釋出确認​​
  • ​​(1)開啟釋出确認的方法​​
  • ​​(2)單個釋出确認​​
  • ​​(3)批量确認釋出​​
  • ​​(4)異步釋出确認​​

RabbitMQ 學習(七)----釋出确認

釋出确認是一個保證RabbitMQ 可靠性的一個機制

  保證生産者将資訊成功的發送到 RabbitMQ的 server端了,那麼broker就會回一個确認,如果沒有收到或者收到拒絕資訊,那麼說明可能網絡不好沒有發送成功,server端當機了,broker拒絕接收等情況,如果不進行後續處理,那麼資訊就會丢失,生産者收到失敗的消息使用回調函數在進行處理。

  生産者将信道設定成 confirm 模式,所有在該信道上釋出的消息都會指定一個唯一的ID,一旦消息投遞到隊列中,就是發送成功了,broker會立刻發送一個确認ack 給生産者,這個時候,生産者就知道消息已經發送成功了。

  如果隊列和資訊是持久化的,那麼确認消息會在将消息寫入磁盤之後再發出,broker傳回的确認包含 确認消息的序列号,還可以設定 multiple,表示此序号前的所有消息都得到了處理。

  一旦釋出消息,生産者等待确認的同時繼續發送下一條消息,如果rabbitMq自身内部錯誤導緻消息為發送成功,生産者就可以再回調方法中繼續處理。

為了保證RabbitMQ的可靠性,生産者怎麼做?

1、設定要求隊列持久化

2、設定隊列中的消息持久化

3、釋出确認,保證寫入磁盤,broker成功收到

(1)開啟釋出确認的方法

  channel 的cofirm模式預設是沒有開啟的,如果需要開啟需要調用 confirmSelect(),當我們使用釋出确認的時候。需要使用channel調用該方法。

Channel channel =connection.createChannel();
channel.confirmSelect();      

(2)單個釋出确認

  這是一種同步确認釋出的方式,就是釋出一個消息之後等待确認後,後續的消息才能繼續釋出。​

​waitForConfiemOrDie(long)​

​ 這個方法隻有當消息被确認才會傳回,如果在指定的時間内未傳回就會抛出異常。

  這種确認方式最大的缺點:速度特别慢。如果消息沒有确認,就會阻塞後續消息的發送,造成發送消息的速度很慢。

public class SingleConfirm {
    /**
     * 釋出确認模式
     * 1、單個确認
     * @param args
     */
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {

            channel = connection.createChannel();

            // 開啟确認模式
            channel.confirmSelect();

            // 聲明隊列
            channel.queueDeclare("confirm", true, false, false, null);

            long begin = System.currentTimeMillis();

            // 批量發送消息,每次發送進行确認
            for (int i = 0; i <1000 ; i++) {

                String message = i+"";

                // 釋出單條消息
                channel.basicPublish("", "confirm", null, message.getBytes());

                // 單個消息發送之後,馬上釋出确認,使用 waitForConfirms
                if(channel.waitForConfirms()){
                    //System.out.println("消息發送成功:"+i);
                }

            }

            long end = System.currentTimeMillis();

            System.out.println("發送1000條資料,使用單個釋出确認的時間為:"+(end-begin));

        } catch (Exception e){
            e.printStackTrace();
        }finally {
            RabbitMQUtils.close(channel, connection);
        }

    }
}      

非常浪費時間

RabbitMQ 學習(七)----釋出确認

(3)批量确認釋出

  每發送一部分消息,批量同步确認一次,若有消息無法發出,該模式無法确認是哪個消息無法發送;

釋出1000條消息,每發送100條确認一次

public class MultipleConfirm {
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {

            channel = connection.createChannel();

            // 開啟确認模式
            channel.confirmSelect();

            // 聲明隊列
            channel.queueDeclare("confirm", true, false, false, null);

            long begin = System.currentTimeMillis();

            // 批量确認消息的數量,沒發送100個傳回一個确認ack
            int notAck = 100;

            // 批量發送消息,每次發送進行确認
            for (int i = 0; i <1000 ; i++) {

                String message = i+"";

                // 釋出單條消息
                channel.basicPublish("", "confirm", null, message.getBytes());

                if((i+1)%notAck==0){
                    // 每發送100條确認一次,檢視是否這一批是否有發送失敗的情況
                    channel.waitForConfirmsOrDie();
                }


            }

            long end = System.currentTimeMillis();

            System.out.println("發送1000條資料,使用批量釋出确認的時間為:"+(end-begin));

        } catch (Exception e){
            e.printStackTrace();
        }finally {
            RabbitMQUtils.close(channel, connection);
        }

    }
}      

時間為 396毫秒

RabbitMQ 學習(七)----釋出确認

(4)異步釋出确認

  生産者發送消息與 接收确認這兩個步驟不是同步的,是異步的,生産者隻管發送,同時使用監聽(addConfirmListener)傳回的确認,對成功确認、失敗确認兩種情況分别進行處理。非常高效且安全

  • 開啟确認模式
  • 聲明确認成功的callback
  • 聲明确認失敗的callback
  • 開啟确認監聽 addConfirmListener() ,設定callback
  • 信道發送消息,不需要額外設定接收waitForConfirm什麼的
public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {

            channel = connection.createChannel();

            // 開啟确認模式
            channel.confirmSelect();

            // 聲明隊列
            channel.queueDeclare("confirm", true, false, false, null);

            // 作為接收成功的函數式接口 參數
            ConfirmCallback ackCallback =(deliveryTag, multiple)-> System.out.println("确認的消息: "+deliveryTag);
                // 表示接收成功的回調函數


            // 作為接收失敗的函數式接口 參數
            ConfirmCallback nackCallback = (deliveryTag,multiple)-> System.out.println("接收失敗!");
                // 表示接收失敗的回調函數


            // 這是一個異步的監聽 消息傳回确認資訊的 反應
            channel.addConfirmListener(ackCallback,nackCallback);

            long begin = System.currentTimeMillis();


            // 批量發送消息,每次發送進行确認
            for (int i = 0; i <1000 ; i++) {
                String message = i+"";
                // 釋出單條消息
                channel.basicPublish("", "confirm", null, message.getBytes());

            }

            long end = System.currentTimeMillis();

            System.out.println("發送1000條資料,使用異步釋出确認的時間為:"+(end-begin)+"ms");

        } catch (Exception e){
            e.printStackTrace();
        }
        
        // 執行完不能關閉連接配接,還要繼續監聽确認的資訊
        /**
         finally {
            RabbitMQUtils.close(channel, connection);
        }
         */

    }