天天看點

Spring Boot與RabbitMQSpring Boot與消息

Spring Boot與消息

大多應用中, 可通過消息服務中間件來提升系統異步通信, 擴充解耦能力

消息服務中有消息代理(message broker) 和 目的地(destination) 兩個重要概念

當消息發送者發送消息以後, 将由消息代理接管, 消息代理保證消息傳遞到指定目的地

消息隊列

消息隊列主要有兩種形式的目的地
  1. 隊列(queue): 點對點消息通信 (point-to-point)
  2. 主題(topic): 釋出(publish) / 訂閱(subscribe) 消息通信

點對點式

概念: 消息發送者發送消息, 消息代理将其放入一個隊列中, 消息接收者從隊列中擷取消息内容, 消息讀取後被移出隊列;

消息隻有唯一的發送者和接收者, 但并不是說隻有一個接收者, 當一條消息被其中一個接收者接收後, 其餘接收者則接收不到該消息。

釋出訂閱式

概念: 發送者(釋出者) 發送消息到主題, 多個接收者(訂閱者) 監聽(訂閱)這個主題, 那麼就會在消息到達時同時收到消息

消息服務規範

JMS

Java Message Service(Java 消息服務): 基于JVM消息代理的規範, ActiveMQ, HometMQ 是JMS實作。

AMQP

Advanced Message Queuing Protocol(進階消息隊列協定) 相容JMS, RabbitMQ 是AMQP的實作。

Spring 支援

spring-jms

提供了對JMS的支援

spring-rabbit

提供了對AMQP的支援

需要

ConnectionFactory

的實作來連接配接消息代理

@JmsListener

(JMS) /

@RabbitLisrener

(AMQP) 注解在方法上監聽消息代理釋出的消息

@EnableJms

/

@EnableRabbit

開啟支援

RabbitMQ

RabbitMQ是一個由erlang開發的AMQP的開源實作。

核心概念

Message

消息, 消息是不具名的, 它由消息頭和消息體組成. 消息體是不透明的, 而消息頭則由一系列的可選屬性組成, 這些屬性包括

routing-key

(路由鍵),

priority

(相對于其他消息的優先權),

delivery-mode

(指出該消息可能需要持久性存儲)等…

Publisher

消息的生産者, 也是一個向交換器釋出消息的用戶端應用程式

Exchange

交換器, 用來接收生産者發送的消息并将這些消息路由給伺服器中的隊列.

Exchange有4種類型:

direct

(預設: 可實作點對點),

fanout

,

topic

, 和

headers

, 不同的類型的Exchange轉發消息的政策有所差別

Queue

消息隊列, 用來儲存消息直到發送給消費者. 它是消息的容器, 也是消息的終點. 一個消息可投入一個或多個隊列. 消息一直在隊列裡面, 等待消費者連接配接到這個隊列将其取走.

Binding

綁定, 用于消息隊列的交換器之間的關聯. 一個綁定就是基于路由鍵将交換器和消息隊列連接配接起來的路由規則, 是以可以将交換器了解成一個由綁定構成的路由表.

Exchange 和 Queue 的綁定可以是多對多的關系

Connection

網絡連接配接, 比如一個TCP連接配接

Channel

信道, 多路複用連接配接中的一條獨立的雙向資料流通道. 信道是建立在真實的TCP連接配接内的虛拟連接配接, AMQP指令都是通過信道發出去的, 不管是釋出消息, 訂閱隊列, 還是接收消息, 這些動作都是通過信道完成. 因為對于作業系統來說建立和銷毀TCP都是非常昂貴的開銷, 是以引入了信道的概念, 以複用一條TCP連接配接

Consumer

消息的消費者, 表示一個從消息隊列中取得消息的用戶端應用程式

Virtual Host

虛拟主機, 表示一批交換器, 消息隊列 和相關對象. 虛拟主機是共享相同的身份認證和加密環境的獨立伺服器域.

每個

vhost

本質上是一個mini版的RabbitMQ伺服器, 擁有自己的隊列, 交換器, 綁定, 和權限機制.

vhost

是AMQP概念的基礎, 必須在連接配接時指定, RabbitMQ預設的

vhost

/

.

Broler

表示消息隊列伺服器實體

流程圖:

Spring Boot與RabbitMQSpring Boot與消息

運作機制

AMQP中的消息路由

AMQP中消息路由過程和Java開發者屬性的JMS存在一些差别, AMQP中增加了

Exchange

Binding

的角色. 生産者把消息釋出到

Exchange

上, 消息最終到達隊列并被消費者接收, 而

Binding

決定交換器的消息應該發送到哪個隊列.

Spring Boot與RabbitMQSpring Boot與消息

Exchange類型

Exchange

分發消息時更具類型的不同分發政策有差別, 目前共四種類型:

direct

,

fanout

,

topic

,

headers

.

headers

比對AMQP消息的

header

而不是路由鍵,

header

交換器和

direct

交換器完全一緻, 但性能差很多, 目前幾乎用不到了, 是以直接使用另外三種類型.

direct Exchange

直連式交換器, 消息中的路由鍵

routing key

如果和

Binding

中的

binding key

一緻, 交換器就會将消息發送到對應的隊列中, 路由鍵與隊列名完全比對.

Spring Boot與RabbitMQSpring Boot與消息

Fanout Exchange

每個發到

fanout

類型交換器的消息都會分到所有綁定的隊列上去.

fanout

交換器不處理路由鍵, 隻是簡單的将隊列綁定到交換器上, 每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上. 類似子網廣播, 每台子網内的主機都獲得了一份複制的消息.

fanout

類型轉發消息式最快的.

Spring Boot與RabbitMQSpring Boot與消息

Topic Exchange

topic

交換器通過模式比對配置設定消息的路由鍵屬性, 将路由鍵和某個模式進行比對, 此時隊列需要綁定到一個模式上. 它将路由鍵和綁定鍵的字元串切分成單詞, 這些單詞之間用點隔開. 它同樣也會識别兩個通配符:

#

,

*

.

#

比對零個或多個單詞,

*

比對一個單詞.

Spring Boot與RabbitMQSpring Boot與消息

RabbitMQ整合

安裝RabbitMQ

在Docker中下載下傳RabbitMQ鏡像, 鏡像版本中帶有[

management

]字尾的為有管理頁面的版本

啟動時映射端口, 預設端口

5672

, 管理頁面端口

15672

啟動後可通過浏覽器通路其管理頁面, 預設使用者名/密碼: guest/guest

Spring Boot與RabbitMQSpring Boot與消息

引入依賴坐标

spring-boot-starter-amqp

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
           

配置RabbitMQ

在主配置檔案中配置

# 配置RabbitMQ
spring:
  rabbitmq:
    host: 172.20.10.2
    port: 5672
    username: guest
    password: guest
    virtual-host: /
           

RabbitTemplate

RabbitTemplate

可實作對消息的發送和接收, 以及對對象的自動序列化操作,

RabbitAutoConfigurtion

已經自動配置了

RabbitTemplate

, 隻需要在容器中擷取

@SpringBootTest
class AmqpApplicationTests {

   @Autowired
   RabbitTemplate template;

   @Test
   void contextLoads() {
      ...
   }
}
           

使用

RabbitTemplate

convertAndSend()

方法可以将對象實作java的序列化并發送

@Test
void contextLoads() {
   UserInfo userInfo = new UserInfo("001", "陳曉龍", "12345678");
   //對象被預設序列化後發送
   template.convertAndSend("exchange.direct", "study.news", userInfo);
}
           

在隊列中檢視其消息結果如圖:

Spring Boot與RabbitMQSpring Boot與消息

使用

RabbitTemplate

receiveAndConvert()

方法可接收隊列中的消息, 并反序列化為java對象

@Test
public void receive(){
   //接收資料
   Object o = template.receiveAndConvert("study.news");
   System.out.println(o.getClass());
   System.out.println(o);
}
           
Spring Boot與RabbitMQSpring Boot與消息

序列化為Json資料

編寫配置類, 使用

Jackson2JsonMessageConverter

替換原有的序列化, 并注入到容器中

@Configuration
public class MyAmqpConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
}
           

其在隊列中的結果為:

Spring Boot與RabbitMQSpring Boot與消息

AmqpAdmin

使用

AmqpAdmin

可以通過java代碼來實作

Exchange

,

Binding

,

Queue

的建立和删除

@SpringBootTest
class AmqpApplicationTests {

   @Autowired
   AmqpAdmin amqpAdmin;

   /**
    * 建立交換器
    */
   @Test
   public void createExchange(){
      amqpAdmin.declareExchange(new DirectExchange("exchange.amqpAdmin"));
   }

   /**
    * 建立隊列
    */
   @Test
   public void createQueue(){
      amqpAdmin.declareQueue(new Queue("queue.amqpAdmin", true));
   }

   /**
    * 綁定規則
    */
   @Test
   public void creatBinding(){
      amqpAdmin.declareBinding(new Binding("queue.amqpAdmin", Binding.DestinationType.QUEUE, "exchange.amqpAdmin", "queue.#", null));
   }

}
           

繼續閱讀