天天看點

Spring Cloud Alibaba - Spring Cloud Stream 整合 RocketMQ

Spring Cloud Stream

在微服務的開發過程中,可能會經常用到消息中間件,通過消息中間件在服務與服務之間傳遞消息,不管你使用的是哪款消息中間件,比如RabbitMQ、Kafka和RocketMQ,那麼消息中間件和服務之間都有一點耦合性,這個耦合性就是指如果我原來使用的RabbitMQ,現在要替換為RocketMQ,那麼我們的微服務都需要修改,變動會比較大,因為這兩款消息中間件有一些差別,如果我們使用Spring Cloud Stream來整合我們的消息中間件,那麼這樣就可以降低微服務和消息中間件的耦合性,做到輕松在不同消息中間件間切換,當然Spring Cloud Stream官方隻支援rabbitmq 和 kafka,spring cloud alibaba新寫了一個starter可以支援RocketMQ;

按照官方的定義,Spring Cloud Stream 是一個建構消息驅動微服務的架構;

Spring Cloud Stream解決了開發人員無感覺的使用消息中間件的問題,因為Spring Cloud Stream對消息中間件的進一步封裝,可以做到代碼層面對消息中間件的無感覺,甚至于動态的切換中間件(rabbitmq切換為rocketmq或者kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程;

Spring Cloud Alibaba - Spring Cloud Stream 整合 RocketMQ

Spring Cloud Stream 内部有幾個概念:Binder 、Binding、input、output;

1、Binder: 跟外部消息中間件內建的元件,用來建立Binding,各消息中間件都有自己的 Binder 實作;

比如 Kafka 的實作 KafkaMessageChannelBinder,RabbitMQ 的實作 RabbitMessageChannelBinder 以及 RocketMQ 的實作 RocketMQMessageChannelBinder;

2、Binding: 包括 Input Binding 和 Output Binding;

Binding 在消息中間件與應用程式提供的 Provider 和 Consumer 之間提供了一個橋梁,實作了開發者隻需使用應用程式的 Provider 或 Consumer 生産或消費資料即可,屏蔽了開發者與底層消息中間件的接觸;

3、input

應用程式通過input(相當于消費者consumer)與Spring Cloud Stream中Binder互動,而Binder負責與消息中間件互動,是以,我們隻需關注如何與Binder互動即可,而無需關注與具體消息中間件的互動。

4、Output

output(相當于生産者producer)與Spring Cloud Stream中Binder互動;

組成

說明

Binder

Binder是應用與消息中間件之間的封裝,目前實作了Kafka和RabbitMQ的Binder,通過Binder可以很友善的連接配接中間件,可以動态的改變消息類型(對應于Kafka的topic,RabbitMQ的exchange),這些都可以通過配置檔案來實作;

@Input

該注解辨別輸入通道,通過該輸入通道接收消息進入應用程式

@Output

該注解辨別輸出通道,釋出的消息将通過該通道離開應用程式

@StreamListener

監聽隊列,用于消費者的隊列的消息接收

@EnableBinding

将信道channel和exchange、topic綁定在一起

消息生産者

1、建立SpringBoot應用31-rocket-spring-cloud-stream;

2、添加依賴:

配置檔案

Spring Cloud Alibaba - Spring Cloud Stream 整合 RocketMQ

相容性問題:

注意版本需要使用springboot2.2.5

消息發送:

消息接收:

可以通過調用SenderService中的方法進行發送資訊,也可以通過在啟動類中的Main方法中進行調用SenderService的方法進行發送資訊:

Spring Cloud Alibaba - Spring Cloud Stream 整合 RocketMQ

在前面的案例中,我們已經實作了一個基礎的 Spring Cloud Stream 消息傳遞處理操作,但在操作之中使用的是系統提供的 Source (output)、Sink(input),接下來我們來看一下自定義信道名稱;

Apache RocketMQ在4.3.0版中已經支援分布式事務消息,這裡RocketMQ采用了2PC的思想來實作了送出事務消息,同時增加一個補償邏輯來處理二階段逾時或者失敗的消息,如下圖所示:

Spring Cloud Alibaba - Spring Cloud Stream 整合 RocketMQ

上圖說明了事務消息的大緻方案,其中分為兩個流程:正常事務消息的發送及送出、事務消息的補償流程;

1.事務消息發送及送出:

(1) 發送消息(half消息);

(2) 服務端響應消息寫入結果;

(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行);

(4) 根據本地事務狀态執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)

2.補償流程:

(1) 對沒有Commit/Rollback的事務消息(pending狀态的消息),從服務端發起一次“回查”;

(2) Producer收到回查消息,檢查回查消息對應的本地事務的狀态;

(3) 根據本地事務狀态,重新Commit或者Rollback;

其中,補償階段用于解決消息Commit或者Rollback發生逾時或者失敗的情況;

事務消息一共有三種狀态:送出狀态、復原狀态、中間狀态;

TransactionStatus.CommitTransaction: 送出事務,代表消費者可以消費此消息;

TransactionStatus.RollbackTransaction: 復原事務,代表消息将被删除,不能被消費;

TransactionStatus.Unknown: 中間狀态,代表需要檢查消息隊列來确定狀态;

MQ内部邏輯:

自定義信道-重寫Source

自定義信道-重寫Sink

消費者接收消息:

*spring.cloud.stream.rocketmq.binder.name-server*

RocketMQ NameServer 位址(老版本使用 namesrv-addr 配置項);

Default: 127.0.0.1:9876.

*spring.cloud.stream.rocketmq.binder.access-key*

阿裡雲賬号 AccessKey。

Default: null.

*spring.cloud.stream.rocketmq.binder.secret-key*

阿裡雲賬号 SecretKey。

*spring.cloud.stream.rocketmq.binder.enable-msg-trace*

是否為 Producer 和 Consumer 開啟消息軌迹功能

Default: true.

*spring.cloud.stream.rocketmq.binder.customized-trace-topic*

消息軌迹開啟後存儲的 topic 名稱。

Default: RMQ_SYS_TRACE_TOPIC.

下面的這些配置是以 spring.cloud.stream.rocketmq.bindings..consumer. 為字首的 RocketMQ Consumer 相關的配置。

*enable*

是否啟用 Consumer;

預設值: true.

*tags*

Consumer 基于 TAGS 訂閱,多個 tag 以 || 分割;

預設值: empty.

*sql*

Consumer 基于 SQL 訂閱;

*broadcasting*

Consumer 是否是廣播消費模式。如果想讓所有的訂閱者都能接收到消息,可以使用廣播模式;

預設值: false.

*orderly*

Consumer 是否同步消費消息模式;

*delayLevelWhenNextConsume*

異步消費消息模式下消費失敗重試政策:

-1,不重複,直接放入死信隊列

0,broker 控制重試政策

>0,client 控制重試政策

預設值: 0.

*suspendCurrentQueueTimeMillis*

同步消費消息模式下消費失敗後再次消費的時間間隔;

預設值: 1000.

下面的這些配置是以 spring.cloud.stream.rocketmq.bindings..producer. 為字首的 RocketMQ Producer 相關的配置;

是否啟用 Producer;

*group*

Producer group name;

*maxMessageSize*

消息發送的最大位元組數;

預設值: 8249344.

*transactional*

是否發送事務消息;

*sync*

是否使用同步得方式發送消息;

*vipChannelEnabled*

是否在 Vip Channel 上發送消息;

*sendMessageTimeout*

發送消息的逾時時間(毫秒);

預設值: 3000.

*compressMessageBodyThreshold*

消息體壓縮閥值(當消息體超過 4k 的時候會被壓縮);

預設值: 4096.

*retryTimesWhenSendFailed*

在同步發送消息的模式下,消息發送失敗的重試次數;

預設值: 2.

*retryTimesWhenSendAsyncFailed*

在異步發送消息的模式下,消息發送失敗的重試次數;

*retryNextServer*

消息發送失敗的情況下是否重試其它的 broker;