天天看點

RabbitMQ實戰(四) - RabbitMQ & Spring整合開發0 相關源碼 1 你将學到2 SpringAMQP使用者管理元件 - RabbitAdmin9 總結參考

相關源碼

1 你将學到

  • RabbitMQ 整合 Spring AMQP實戰
  • RabbitMQ 整合 Spring Boot實戰
  • RabbitMQ 整合 Spring Cloud實戰

2 SpringAMQP使用者管理元件 - RabbitAdmin

RabbitAdmin 類可以很好的操作 rabbitMQ,在 Spring 中直接進行注入即可

autoStartup 必須設定為 true,否則 Spring 容器不會加載它.

2.1 源碼分析

RabbitAdmin 的底層實作

  • 從 Spring 容器中擷取 Exchange、Bingding、Routingkey 以及Queue 的 @Bean 聲明
  • 然後使用 rabbitTemplate 的 execute 方法進行執行對應的聲明、修改、删除等一系列 RabbitMQ 基礎功能操作。例如添加交換機、删除一個綁定、清空一個隊列裡的消息等等
  • 依賴結構

RabbitAdmin實作了4個Interface: AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,InitializingBean。

AmqpAdmin

為AMQP指定一組基本的便攜式AMQP管理操作

ApplicationEventPublisherAware

實作該接口的類,通過函數setApplicationEventPublisher()獲得它執行所在的ApplicationEventPublisher。

ApplicationContextAware

實作該接口的類,通過函數setApplicationContext()獲得它執行所在的ApplicationContext。一般用來初始化object

InitializingBean

若class中實作該接口,在Spring Container中的bean生成之後,自動調用函數afterPropertiesSet()。

因其實作了InitializingBean接口,其中隻有一個方法,且在Bean加載後就執行

該功能可以被用來檢查是否所有的mandatory properties都設定好

  • 以上Interfaces的執行順序

    ApplicationEventPublisherAware -> ApplicationContextAware -> InitializingBean.

RabbitAdmin借助于 ApplicationContextAware 和 InitializingBean來擷取我們在配置類中聲明的exchange, queue, binding beans等資訊并調用channel的相應方法來聲明。

  • 首先,RabbitAdmin借助于ApplicationContextAware來擷取ApplicationContext applicationContext
  • 然後,借助于InitializingBean以及上面的applicationContext來實作rabbitMQ entity的聲明

下面是RabbitAdmin中afterPropertiesSet()函數的代碼片段。這裡在建立connection的時候調用函數initialize()。

于是以此為突破口進行源碼分析

  • RabbitAdmin#afterPropertiesSet

這裡

最後分别調用函數declareExchanges(),declareQueues(),declareBindings()來聲明RabbitMQ Entity

  • 先定義了三個集合,利用applicationContext.getBeansOfType來獲得container中的Exchange,Queue,Binding聲明放入集合中
  • 然後調用filterDeclarables()來過濾不能declareable的bean
  • 按照RabbitMQ的方式拼接
  • 使用rabbitTemplate執行互動

    2.2 實操

回顧一下消費者配置

1. 設定交換機類型

2. 将隊列綁定到交換機

交換機類型:

    FanoutExchange 類型: 将消息分發到所有的綁定隊列,無 routingkey 的概念

    HeadersExchange 類型:通過添加屬性 key-value 比對

    DirectExchange :按照 routingkey 分發到指定隊列

    TopicExchange : 多關鍵字比對           
  • 測試代碼
  • 檢視管控台

    3 SpringAMQP - RabbitMQ聲明式配置使用SpringAMQP 聲明即在 rabbit 基礎 API 裡面聲明一個 exchange、Bingding、queue。使用SpringAMQP 去聲明,就需要使用 @Bean 的聲明方式

  • 3 消息模闆 - RabbitTemplate上節中最後提到,這是與與 SpringAMQP 整合發送消息的關鍵類,它提供了豐富的發送消息方法

包括可靠性投遞消息方法、回調監聽消息接口

ConfirmCallback

、傳回值确認接口

ReturnCallback

等.

同樣我們需要注入到 Spring 容器中,然後直接使用.

RabbitTemplate 在 Spring 整合時需要執行個體化,但是在 Springboot 整合時,在配置檔案裡添加配置即可

  • 先聲明bean
  • 測試4 SpringAMQP消息容器-SimpleMessageListenerContainer這個類非常的強大,我們可以對他進行很多的設定,用對于消費者的配置項,這個類都可以滿足。它有監聽單個或多個隊列、自動啟動、自動聲明功能。
  • 設定事務特性、事務管理器、事務屬性、事務并發、是否開啟事務、復原消息等。但是我們在實際生産中,很少使用事務,基本都是采用補償機制
  • 設定消費者數量、最小最大數量、批量消費
  • 設定消息确認和自動确認模式、是否重回隊列、異常捕獲 Handler 函數
  • 設定消費者标簽生成政策、是否獨占模式、消費者屬性等
  • 設定具體的監聽器、消息轉換器等等。

SimpleMessageListenerContainer 可以進行動态設定,比如在運作中的應用可以動态的修改其消費者數量的大小、接收消息的模式等。很多基于 RabbitMQ 的自制定化後端管控台在進行設定的時候,也是根據這一去實作的

5 SpringAMQP消息擴充卡-MessageListenerAdapter消息監聽擴充卡,通過反射将消息處理委托給目标監聽器的處理方法,并進行靈活的消息類型轉換.

允許監聽器方法對消息内容類型進行操作,完全獨立于RabbitMQ API

預設情況下,傳入Rabbit消息的内容在被傳遞到目标監聽器方法之前被提取,以使目标方法對消息内容類型進行操作以String或者byte類型進行操作,而不是原始Message類型。 (消息轉換器)

消息類型轉換委托給MessageConverter接口的實作類。 預設情況下,将使用SimpleMessageConverter。 (如果您不希望進行這樣的自動消息轉換,

那麼請自己通過#setMessageConverter MessageConverter設定為null)

如果目标監聽器方法傳回一個非空對象(通常是消息内容類型,例如String或byte數組),它将被包裝在一個Rabbit Message 中,并發送使用來自Rabbit ReplyTo屬性或通過#setResponseRoutingKey(String)指定的routingKey的routingKey來傳送消息。(使用rabbitmq 來實作異步rpc功能時候會使用到這個屬性)。

注意:發送響應消息僅在使用ChannelAwareMessageListener入口點(通常通過Spring消息監聽器容器)時可用。 用作MessageListener不支援生成響應消息。

源碼分析

繼承自

AbstractAdaptableMessageListener

類,實作了

MessageListener

ChannelAwareMessageListener

接口

MessageListener

ChannelAwareMessageListener

接口的

onMessage

方法就是具體容器監聽隊列處理隊列消息的方法

實操

  • 委托類MessageDelegate,類中定義的方法也就是目标監聽器的處理方法
  • 配置類代碼
  • 運作測試代碼
  • 結果

從源碼分析小節中的成員變量,我們可以看出使用MessageListenerAdapter處理器進行消息隊列監聽處理

  • 如果容器沒有設定setDefaultListenerMethod

    則處理器中預設的處理方法名是

    handleMessage

  • 如果設定了setDefaultListenerMethod

則處理器中處理消息的方法名就是setDefaultListenerMethod方法參數設定的值

也可以通過setQueueOrTagToMethodName方法為不同的隊列設定不同的消息處理方法。

MessageListenerAdapter

onMessage

方法

  • 如果将參數改為String運作會出錯!應當是位元組數組,這時就需要使用轉換器才能保證正常運作
  • 使用轉換器

測試代碼運作成功!

6 消息轉換器 - MessageConverter我們在進行發送消息的時候,正常情況下消息體為二進制的資料方式進行傳輸,如果希望内部幫我們進行轉換,或者指定自定義的轉換器,就需要用到

MessageConverter

  • 我們自定義常用轉換器,都需要實作這個接口,然後重寫其中的兩個方法

    常見的轉換器

  • Json 轉換器 - jackson2JsonMessageConverter

    Java 對象的轉換功能

  • DefaultJackson2JavaTypeMapper 映射器

    Java對象的映射關系

  • 自定義二進制轉換器

    比如圖檔類型、PDF、PPT、流媒體實操

  • Order類
  • 配置JSON轉換器
  • 配置Java對象轉換器
  • 測試代碼及結果
  • 多個Java對象映射轉換
  • 全局轉換器
  • 圖檔轉換器實作
  • PDF轉換器實作
  • 7 RabbitMQ與SpringBoot2.x整合實戰7.1 配置詳解
  • publisher-confirms

    實作一個監聽器監聽 broker 給我們傳回的确認請求

    RabbitTemplate.ConfirmCallback

  • publisher-returns

    保證消息對 broker 可達,若出現路由鍵不可達情況,則使用監聽器對不可達消息後續處理,保證消息路由成功 -

    RabbitTemplate.ReturnCallback

    在發送消息的時候對 template 進行配置

    mandatory = true

    保證監聽有效
在生産端還可以配置其他屬性,比如發送重試、逾時時間、次數、間隔等Pro
  • 配置檔案
  • 主配置
  • 添加一個自定義的交換機
  • 添加一個Q
  • 建立綁定關系
  • 測試及結果

    Con配置消費端的 RabbitListener 是一個組合注解,裡面可以注解配置 。

@QueueBinding @Queue @Exchange 直接通過這個組合注解一次性搞定消費端交換機、隊列、綁定、路由、并且配置監聽功能等。

  • 将Pro中的綁定全部删除,再啟動Con的sb服務

發送一個 Java 實體對象

  • 在Con聲明隊列、交換機、routingKey基本配置
  • Con

    Payload 注解中的路徑要跟Pro的實體路徑完全一緻,要不然會找到不到該類,這裡為了簡便就不寫一個 common.jar 了,在實際開發裡面,這個 Java Bean 應該放在 common.jar中

  • 注意實體要實作 Serializable 序列化接口,要不然發送消息會失敗
  • Pro 照樣跟着寫一個發消息的方法

8 RabbitMQ & Spring Cloud Stream整合實戰Spring Cloud全家桶在整個中小型網際網路公司異常的火爆,Spring Cloud Stream也就漸漸的被大家所熟知,本小節主要來紹RabbitMQ與Spring Cloud Stream如何內建8.1 程式設計模型要了解程式設計模型,您應該熟悉以下核心概念

  • 目标綁定器

    提供與外部消息傳遞系統內建的元件

  • 目标綁定

    外部消息傳遞系統和應用程式之間的橋接提供的生産者和消費者消息(由目标綁定器建立)

  • 消息

    生産者和消費者用于與目标綁定器(以及通過外部消息傳遞系統的其他應用程式)通信的規範資料結構

8.2 應用模型Spring Cloud Stream應用程式由中間件中立核心組成。該應用程式通過Spring Cloud Stream注入其中的輸入和輸出通道與外界通信。通過中間件特定的Binder實作,通道連接配接到外部代理。

8.3 RabbitMQ綁定概述預設情況下,RabbitMQ Binder實作将每個目标映射到TopicExchange。對于每個使用者組,Queue綁定到該TopicExchange。每個使用者執行個體都為其組的Queue具有相應的RabbitMQ Consumer執行個體。對于分區生成器和使用者,隊列以分區索引為字尾,并使用分區索引作為路由鍵。對于匿名使用者(沒有組屬性的使用者),使用自動删除隊列(具有随機的唯一名稱)。

Barista接口: Barista接口是定義來作為後面類的參數,這一接口定義來通道類型和通道名稱,通道名稱是作為配置用,通道類型則決定了app會使用這一 通道進行發送消息還是從中接收消息

8.4 擴充 - 注解

  • @Output:輸出注解,用于定義發送消息接口
  • @Input:輸入注解,用于定義消息的消費者接口
  • @StreamListener:用于定義監聽方法的注解

使用Spring Cloud Stream非常簡單,隻需要使用好這3個注解即可,在實作高性能消息的生産和消費的場景非常适合,但是使用SpringCloudStream架構有一個非常大的問題就是不能實作可靠性的投遞,也就是沒法保證消息的100%可靠性,會存在少量消息丢失的問題

這個原因是因為SpringCloudStream架構為了和Kafka兼顧是以在實際工作中使用它的目的就是針對高性能的消息通信的!這點就是在目前版本Spring Cloud Stream的定位

8.5 實操

Pro

  • pom核心檔案
  • Sender

注解

@EnableBinding

聲明了這個應用程式綁定了2個通道:INPUT和OUTPUT。這2個通道是在接口

Barista

中定義的(Spring Cloud Stream預設設定)。所有通道都是配置在一個具體的消息中間件或綁定器中

  • Barista接口
  • @Input

    聲明了它是一個輸入類型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。這一名字與上述配置app2的配置檔案中position1應該一緻,表明注入了一個名字叫做input_channel的通道,它的類型是input,訂閱的主題是position2處聲明的mydest這個主題

  • @Output

    聲明了它是一個輸出類型的通道,名字是output_channel。這一名字與app1中通道名一緻,表明注入了一個名字為output_channel的通道,類型是output,釋出的主題名為mydest。

  • Bindings — 聲明輸入和輸出通道的接口集合。
  • Binder — 消息中間件的實作,如Kafka或RabbitMQ
  • Channel — 表示消息中間件和應用程式之間的通信管道
  • StreamListeners — bean中的消息處理方法,在中間件的MessageConverter特定事件中進行對象序列化/反序列化之後,将在信道上的消息上自動調用消息處理方法。
  • Message Schemas — 用于消息的序列化和反序列化,這些模式可以靜态讀取或者動态加載,支援對象類型的演變。

将消息釋出到指定目的地是由釋出訂閱消息模式傳遞。釋出者将消息分類為主題,每個主題由名稱辨別。訂閱方對一個或多個主題表示興趣。中間件過濾消息,将感興趣的主題傳遞給訂閱伺服器。訂閱方可以分組,消費者組是由組ID辨別的一組訂戶或消費者,其中從主題或主題的分區中的消息以負載均衡的方式遞送。

  • Pom核心檔案
  • 應用啟動類
  • 接收
  • 啟動Con服務,檢視管控台
  • 運作Pro測試代碼及結果

9 總結

本文我們學習了Spring AMQP的相關知識,通過實戰對RabbitMQ內建Spring有了直覺的認識,這樣為

我們後續的學習、工作使用都打下了堅實的基礎,最後我們整合了SpringBoot與Spring Cloud Stream,更友善更高效的內建到我們的應用服務中去!

參考

SpringAMQP 使用者管理元件 RabbitAdmin 以及聲明式配置 Spring Boot - RabbitMQ源碼分析 SpringAMQP 之 RabbitTemplate SpringAMQP 消息容器 - SimpleMessageListenerContainer MessageListenerAdapter詳解 SpringAMQP 消息轉換器 - MessageConverter RabbitMQ 與 SpringBoot2.X 整合 Spring Cloud Stream