天天看點

SpringBoot消息機制(整合RabbitMQ)

RabbitMQ簡介

核心概念

Broker: 消息代理,這裡指安裝了消息中間件的伺服器,也就是RabbitMQ的服務端,生産者和消費者都需要與服務端建立長連接配接,當消息發送者發送消息後,将由消息代理接管,消息代理保證消息傳遞到指定目的地

Publisher:消息的生産者,也是一個向交換器釋出消息的用戶端應用程式

Message : 消息中包含 routing-key(路由鍵),該路由鍵會與綁定鍵比對,用于判斷該消息會被發送到哪個隊列中

Exchange:交換器,接受生産者發送的消息,并将消息路由到指定隊列

Queue:隊列,一個消息可投入一個或多個隊列中,等待消費者從中擷取消息

Binding:綁定關系,交換器拿到消息後,會把消息發送給隊列,那它會發送到哪個隊列呢?由綁定關系決定

Consumer:消息的消費者,也是一個擷取消息的用戶端應用程式

Connection:無論生産者發送消息,還是消費者接收消息,應用程式都需要與消息伺服器建立連接配接,并且一個用戶端(生産者端,消費者端)隻能與消息伺服器建立一條連接配接(長連接配接)

Channel:信道,每條連接配接可以建立多個信道,用來發送和接收不同類型的消息

Virtual Host:虛拟主機,用于隔離環境,比如生産環境和開發環境,會有不同的交換機隊列綁定關系組成,且兩邊互相隔離互不影響,虛拟主機以路徑辨別,比如生産環境/prod、開發環境/dev

RabbitMQ整體架構

SpringBoot消息機制(整合RabbitMQ)

RabbitMQ工作流程

​ 生産者會與消息伺服器建立一條長連接配接,并在連接配接裡面開辟一條通道來發送資料,資料就是我們所說的消息,每一個消息都必須指定路由鍵,消息會由代理伺服器交給交換機處理,交換機會根據消息中的路由鍵與交換機與隊列之間的綁定關系進行比對,決定把消息放入哪個隊列中,而消費中同樣會與伺服器建立一個長連接配接,同時它會對指定隊列進行監聽,如果發現隊列中存在消息,那消費者會通過連接配接中的隊列擷取消息

Exchange交換器類型

Exchange交換器,在RabbitMQ中占有重要角色,再解釋下它的作用:

​ 生産者産生的消息,首先會發給Broker消息代理伺服器,然後代理伺服器會把消息交給Exchange 交換器,并且交換器與隊列之間會存在綁定關系(Binding),(一個交換器可以綁定多個隊列,同時一個隊列可以被多個交換器綁定),由交換器根據消息中的路由規則及綁定關系決定把消息發送到哪個隊列中

RabbitMQ常用的交換器類型有: fanout 、 direct 、 topic 、 headers 四種。

1)Direct

direct類型的交換器為點對點通信模式,路由規則很簡單,它會把消息路由到那些BindingKey和RoutingKey完全比對的

隊列中,如下圖:

SpringBoot消息機制(整合RabbitMQ)

2)Fanout

廣播模式,會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中,如圖:

SpringBoot消息機制(整合RabbitMQ)

3)Topic

釋出訂閱模式,topic類型的交換器在direct比對規則上進行了擴充,也是将消息路由到BindingKey和RoutingKey

相比對的隊列中,這裡的比對規則稍微不同,可以進行模糊比對,它約定:

綁定鍵(BindingKey)和路由鍵(RoutingKey)的字元串可以是由

"."

分隔成單詞;

路由鍵(RoutingKey)BindingKey中可以存在兩種特殊字元

“*”

“#”

,用于模糊比對,其中

"*"

用于比對一個單詞,

"#"

用于比對0個或多個單詞。

SpringBoot消息機制(整合RabbitMQ)

4)headers

headers類型的交換器性能很差,不實用

RabbitMQ界面操作

概覽界面

安裝完RabbitMQ,通路15672端口即可進入圖形化界面

SpringBoot消息機制(整合RabbitMQ)

其中:

  1. 導入導出配置,可以把目前RabbitMQ伺服器的交換機、隊列、綁定關系等資訊同步到另一台伺服器
  2. 端口資訊如下:
SpringBoot消息機制(整合RabbitMQ)

建立隊列

SpringBoot消息機制(整合RabbitMQ)

參數解釋:

  1. Name:隊列的名字
  2. Durability:是否持久化,有兩個參數可選,Durable持久化、Transient臨時隊列
  3. Auto delete :是否自動删除。如果是,則在連接配接至少一個使用者之後,然後斷開所有使用者的連接配接,隊列将自行删除。
  4. Arguments:隊列可以設定一些參數

建立交換機

SpringBoot消息機制(整合RabbitMQ)

參數解釋:

  1. Name:交換機的名字
  2. Type:交換機類型 fanout 、 direct 、 topic 、 headers
  3. Durability:是否持久化,有兩個參數可選,Durable持久化、Transient臨時隊列
  4. Auto delete :是否自動删除。如果是,則在連接配接至少一個使用者之後,然後斷開所有使用者的連接配接,隊列将自行删除。
  5. Internal:是不是内部的交換機,如果是,則用戶端無法向交換機發送消息
  6. Arguments:交換機建立時可以設定一些參數

點選界面建立的交換機名稱,可以進入建立綁定關系、發送消息、删除交換機頁面,如下圖:

SpringBoot消息機制(整合RabbitMQ)

建立綁定關系

SpringBoot消息機制(整合RabbitMQ)

交換機可以綁定隊列(To queue),也可以綁定交換機(To exchange)

Routing Key:綁定關系,與消息中的路由鍵映射,決定把消息發送到哪個隊列

釋出消息

SpringBoot消息機制(整合RabbitMQ)

如圖所示:有消息中的路由鍵

hbger.msg

與綁定關系

hbger.msg

确定了把消息發送到了

hbger.msg

隊列

SpringBoot消息機制(整合RabbitMQ)

接收消息

SpringBoot消息機制(整合RabbitMQ)

參數解釋:

Ack Mode:回複模式

  1. Nack message requeue true:擷取消息,不告訴RabbitMQ服務端已經收到消息,并且把消息從新放入隊列中
  2. Ack message requeue false:擷取消息,告訴RabbitMQ服務端已經收到消息了,并且把消息從隊列中删除
  3. Reject requeue true:拒絕消息,把消息重新放回隊列
  4. Reject requeue false:拒絕消息,把消息從隊列中删除

SpringBoot整合RabbitMQ

1) 導入amqp依賴

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>
<!--自定義消息轉化器Jackson2JsonMessageConverter所需依賴-->
   <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
   </dependency>
           

RabbitMQ的自動配置類為:

RabbitAutoConfiguration

,引入amqp後這個類會自動生效

通過查閱

RabbitAutoConfiguration

源碼,發現它向容器中注入了RabbitTemplate、AmqpAdmin這兩個類

AmqpAdmin 完成對Exchange,Queue,Binding的操作

RabbitTemplate 類是發送和接收消息的工具類

2) 配置相關屬性,所有的屬性均是與

RabbitProperties

進行綁定的,以

spring.rabbitmq

開頭

# 指定rebbitmq伺服器主機
spring.rabbitmq.host=127.0.0.1
# 用戶端的連接配接端口預設為 5672
spring.rabbitmq.port=5672
# 使用者名
spring.rabbitmq.username=guest
# 密碼
spring.rabbitmq.password=guest
# 虛拟主機
spring.rabbitmq.virtual-host=/
           

3)在主程式類中添加

@EnableRabbit

注解,開啟RabbitMQ功能

@SpringBootApplication
@EnableRabbit   //開啟RabbitMQ功能
public class CrudApplication {

    public static void main(String[] args) {
        SpringApplication.run(CrudApplication.class, args);
    }

}
           

AmqpAdmin

AmqpAdmin主要完成對Exchange,Queue,Binding的操作

建立交換機

@Test
    public void createExchange(){
        /**
         * DirectExchange(String name, boolean durable, 
         							boolean autoDelete, Map<String, Object> arguments)
         * name:交換機名字
         * durable:是否持久化
         * autoDelete:是否自動删除
         * arguments:參數資訊
         */
        DirectExchange directExchange = new DirectExchange("hbger.exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
    }
           

建立隊列

@Test
    public void createQueue(){
        /**
         * Queue(String name, boolean durable, boolean exclusive,
         *                  boolean autoDelete, @Nullable Map<String, Object> arguments)
         * name:隊列名字
         * durable:是否持久化
         * exclusive:是否排他,排他的意思是如果有一條連接配接,連接配接了隊列,那麼其他連接配接就無法連接配接隊列
         * autoDelete:是否自動删除
         * arguments:參數資訊
         */
        Queue queue = new Queue("hbger.queue",true,false,false);
        amqpAdmin.declareQueue(queue);
    }
           

建立綁定關系

@Test
    public void createBinding(){
        /**
         * Binding(String destination, Binding.DestinationType destinationType,
         *                String exchange, String routingKey, @Nullable Map<String, Object> arguments)
         * destination:目的地
         * destinationType:目的地類型
         * exchange:交換機
         * routingKey:綁定關系
         * arguments:參數資訊
         */
        Binding binding = new Binding("hbger.queue", Binding.DestinationType.QUEUE,
                                    "hbger.exchange","hbger",null);
        amqpAdmin.declareBinding(binding);
    }
           

RabbitTemplate

RabbitTemplate類是發送和接收消息的工具類

發送消息

如果發送的消息是一個對象,會使用序列化機制,對象必須實作Serializable接口

@Test
    public  void sendMessage(){
        //發送普通消息
        rabbitTemplate.convertAndSend("hbger.exchange","hbger","HAHA");
        //發送對象消息
        Employee employee = new Employee();
        employee.setId(10086);
        employee.setLastName("HAHA");
        rabbitTemplate.convertAndSend("hbger.exchange","hbger",employee);
    }
           

RabbitMQ預設的消息轉化器是SimpleMessageConverter

若要以Json方式存儲對象,就要自定義消息轉換器

@Configuration
public class RabbitMQConfig {
    @Bean
    public MessageConverter messageConverter() {
        //在容器中導入Json的消息轉換器
        return new Jackson2JsonMessageConverter();
    }
}
           

擷取消息

使用

@RabbitListener

監聽指定隊列,如果隊列中存在消息,就會觸發該方法擷取消息

一個隊列可以被多個方法監聽,但一條消息隻能被一個方法取出,且一次隻能取出一條

方法執行完後才能去接收下一條消息

/**
     * queues : 監聽的隊列
     * 參數可以寫以下類型
     *      Message:原生消息詳細資訊
     *      Employee:發送的消息類型
     *      Channel :目前傳輸資料的通道
     */
    @RabbitListener(queues={"hbger.queue"})
    public void getMessage(Message message, Employee employee, Channel channel){
        //消息體資訊{"id":10086,"lastName":"HAHA"}
        byte[] body = message.getBody();
        //消息屬性資訊
        MessageProperties messageProperties = message.getMessageProperties();
        //消息頭資訊
        Object header = messageProperties.getHeaders();

        System.out.println("接收的消息為------"+messageProperties.toString());
    }
           

@RabbitListener

可以标注在類和方法上,

@RabbitHandler

隻能标注在方法上

這兩個注解可以配合使用,使用重載方法監聽指定隊列擷取不同類型的消息

@Component
@RabbitListener(queues={"hbger.queue"})
public class GetMessage {

    /**
     * queues : 監聽的隊列
     * 參數可以寫以下類型
     *      Message:原生消息詳細資訊
     *      Employee:發送的消息類型
     *      Channel :目前傳輸資料的通道
     */
    @RabbitHandler
    public void getMessage(Message message, Employee employee, Channel channel){
        //消息體資訊{"id":10086,"lastName":"HAHA"}
        byte[] body = message.getBody();
        //消息屬性資訊
        MessageProperties messageProperties = message.getMessageProperties();
        //消息頭資訊
        Object header = messageProperties.getHeaders();

        System.out.println("接收的消息為------"+messageProperties.toString());
    }
    @RabbitHandler
    public void getMessage(Message message, Department department, Channel channel){
        //消息體資訊{"id":10086,"lastName":"HAHA"}
        byte[] body = message.getBody();
        //消息屬性資訊
        MessageProperties messageProperties = message.getMessageProperties();
        //消息頭資訊
        Object header = messageProperties.getHeaders();

        System.out.println("接收的消息為------"+messageProperties.toString());
    }
}
           

消息确認機制

目的是為了保證消息的可靠抵達,防止由于網絡抖動,伺服器當機等原因導緻消息丢失

為了防止消息丢失這種情況發生:

​ 1)我們可以使用事務消息,由于資料是在通道中傳輸的,我可以設定通道為事務模式解決這一問題,但是這種方式在性能方面的開銷比較大,一般也不推薦使用

​ 2)引入特殊時機确認回調,生産者(Publisher)發送消息,代理伺服器(Broker)如果接受到了消息,就會回調confirmCallback方法,Broker使用交換機将消息投遞給隊列,這一過程如果失敗了,則會回調returnCallback 方法;而消費端,會通過ack機制告訴代理伺服器是否擷取到了消息,如果正确接收,告訴伺服器從隊列删除消息,如果未接收,告訴伺服器重新投遞等操作

可以通過結合資料庫事務表的方式,保證消息正确抵達

發送端确認機制

#開啟發送端确認
spring.rabbitmq.publisher-confirms=true
           

ConfirmCallback

@Configuration
public class RabbitMQConfig {

     @Autowired
    RabbitTemplate rabbitTemplate;
	
    @PostConstruct  
    public void initRabbitTemplate(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *  correlationData 用來表示目前消息唯一關聯資料
             *  ack  代理伺服器是否成功收到消息
             *  cause 未收到消息的原因
             * **/
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("代理伺服器成功接收消息");
            }
        });
    }
}    
           

ReturnCallback

#開啟消息未能抵達隊列的回調
spring.rabbitmq.publisher-returns=true
#隻要未能抵達隊列,以異步發送優先回調ReturnCallback
spring.rabbitmq.template.mandatory=true
           
@Configuration
public class RabbitMQConfig {

     @Autowired
    RabbitTemplate rabbitTemplate;
	
    @PostConstruct  
    public void initRabbitTemplate(){
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 若消息沒有抵達消息隊列就會觸發這個回調
             * @param message 投遞失敗的消息詳細資訊
             * @param replyCode 回複的狀态碼
             * @param replyText 回複的文本内容
             * @param exchange 當時這個消息發給哪個交換機
             * @param routingKey 當時這個消息用的路由鍵
             **/
            @Override
            public void returnedMessage(Message message, int replyCode,
                                        String replyText, String exchange, String routingKey) {
                System.out.println("消息失敗回調函數觸發");
            }
        });
    }
}    
           

接收端确認機制

ack機制,可以參考圖形界面,接收消息界面Ack Mode:回複模式參數

預設是自動确認模式,消費端如果正确接收,就會告訴伺服器把消息從隊列删除消息,

比如隊列中有3個消息未被接收,如果消費端此時正在接收消息,當接收完一個消息時伺服器當機了,這時會發現隊列中的消息已經全部被移除,導緻消息丢失

為解決這一問題,我們可以開啟手動确認模式

隻要沒有明确回複伺服器,已經接收到消息,隊列中的消息會一直存在

#手動回複是否簽收消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
           
/**
     * queues : 監聽的隊列
     * 參數可以寫以下類型
     *      Message:原生消息詳細資訊
     *      Employee:發送的消息類型
     *      Channel :目前傳輸資料的通道
     */
    @RabbitHandler
    public void getMessage(Message message, Employee employee, Channel channel){

        //消息屬性資訊
        MessageProperties messageProperties = message.getMessageProperties();
        //通道内按消息順序自增,消息在channel中标記消息用的
        long deliveryTag = messageProperties.getDeliveryTag();
        
        /**
         * 确認接收到了消息
         * void basicAck(long deliveryTag, boolean multiple)
         *      deliveryTag消息辨別
         *      multipl是否批量接收
         */
        try {
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {

        }
        
        /**
         * 拒收消息
         * void basicAck(long deliveryTag, boolean multiple, boolean requeue)
         *         deliveryTag:消息辨別
         *         multipl:是否批量拒收
         *         requeue:是否重新放入隊列,如果為false消息會從隊列中删除
         */
        try {
            channel.basicNack(deliveryTag,false,true);
        } catch (IOException e) {
            
        }
    }
           

繼續閱讀