天天看點

rabbitmq ACK消費者确認機制

生産者可靠性投遞消息後,消費者也可能會産生一些問題,比如:沒有接受到消息,接收消息後在代碼執行過程中出現了異常等。在這種情況下我們需要進行額外的處理,那麼就需要手動進行消息的确認簽收,rabbitmq給我們提供了一個機制:ACK機制。額外的知識:RabbitMQ的死信隊列詳解 - 簡書 (jianshu.com)

ACK機制有三種方式:

  • 自動确認 acknowledge="none"
  • 手動确認 acknowledge="manual"
  • 根據異常情況來确認(暫時不怎麼用) acknowledge="auto"

這篇部落格是在這個基礎上寫的(17條消息) rabbitMQ 可靠性消息投遞_angen2018的部落格-CSDN部落格,具體代碼我會打包好了,放個連結下載下傳。

代碼具體實作:

手動确認,在可靠性投遞時使用

開啟手動确認

spring:
  rabbitmq:
    port: 5672
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    #開啟confirms這個模式
    #springboot2.2.0.RELEASE支援這個
    #publisher-confirm-type: correlated
    publisher-confirms: true
    #開啟return模式
    publisher-returns: true
    listener:
      direct:
        #開啟手動簽收
        acknowledge-mode: manual
           

測試

package com.example.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
@RabbitListener(queues = "queue_test6")
public class MyRabbitListener {
    /**
     *
     * @param message 消息封裝的對象,(包括了消息的序号,消息本身,消費者名稱等)
     * @param channel  連結的通道
     * @param msg  消息本身
     */
    @RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
    public void receiveMessage(Message message, Channel channel,String msg){
        //接收消息
        System.out.println(msg);
        MessageProperties messageProperties = message.getMessageProperties();
        try {

            //模拟業務-100
            System.out.println("消費後減100元");
            //模拟出問題
            int i = 10/0;


            //如果正常就簽收消息
            //參數1,消息的序号
            //參數二,是否批量簽收 true是批量簽收
            channel.basicAck(messageProperties.getDeliveryTag(),true);
        } catch (Exception e) {
            e.printStackTrace();
            //不正常就拒收消息(丢棄了)

            try {
                //參數1 消息序号
                //參數2 是否批量拒絕消息
                //參數3 是否把消息重新回到隊列中
                channel.basicNack(messageProperties.getDeliveryTag(),true,false);
                //不能批量處理拒絕消息。第二個參數,true會重新放回隊列,是以需要自己根據業務邏輯判斷什麼時候使用拒絕
                //channel.basicReject();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }

        }



    }

    //消費者監聽隊列
    /*@RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
    public void receiveMessage(String msg){
        //接收消息
        System.out.println(msg);
        //模拟業務-100
    }*/

    //消費者監聽隊列
    @RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
    public void receiveMessage(Map msg){
        //接收消息
        System.out.println(msg);
        //模拟業務-100
    }
}
           
package com.example.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
@RabbitListener(queues = "queue_test6")
public class MyRabbitListener {
    /**
     *
     * @param message 消息封裝的對象,(包括了消息的序号,消息本身,消費者名稱等)
     * @param channel  連結的通道
     * @param msg  消息本身
     */
    @RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
    public void receiveMessage(Message message, Channel channel,String msg){
        //接收消息
        System.out.println(msg);
        MessageProperties messageProperties = message.getMessageProperties();
        try {

            //模拟業務-100
            System.out.println("消費後減100元");
            //模拟出問題
            int i = 10/0;


            //如果正常就簽收消息
            //參數1,消息的序号
            //參數二,是否批量簽收 true是批量簽收
            channel.basicAck(messageProperties.getDeliveryTag(),true);
        } catch (Exception e) {
            e.printStackTrace();
            //不正常就拒收消息(丢棄了)

            try {
                //如果該消息重回過隊列就不投遞了,避免死循環
                if(messageProperties.getRedelivered()){
                    System.out.println("已經重新投遞過一次了");
                }else{
                    //參數1 消息序号
                    //參數2 是否批量拒絕消息
                    //參數3 是否把消息重新回到隊列中
                    channel.basicNack(messageProperties.getDeliveryTag(),true,true);
                }

                //不能批量處理拒絕消息。第二個參數,true會重新放回隊列,是以需要自己根據業務邏輯判斷什麼時候使用拒絕
                //channel.basicReject();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }

        }



    }

    //消費者監聽隊列
    /*@RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
    public void receiveMessage(String msg){
        //接收消息
        System.out.println(msg);
        //模拟業務-100
    }*/

    //消費者監聽隊列
    @RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
    public void receiveMessage(Map msg){
        //接收消息
        System.out.println(msg);
        //模拟業務-100
    }
}
           

小結:

如何保證消息的高可靠性傳輸(資料一緻)?

  1. 持久化:交換機持久化,隊列持久化,消息持久化
  2. 生産方确認Confirm , Return
  3. 消費方确認 ACK

繼續閱讀