Spring Cloud Stream
在微服務的開發過程中,可能會經常用到消息中間件,通過消息中間件在服務與服務之間傳遞消息,不管你使用的是哪款消息中間件,比如RabbitMQ、Kafka和RocketMQ,那麼消息中間件和服務之間都有一點耦合性,這個耦合性就是指如果我原來使用的RabbitMQ,現在要替換為RocketMQ,那麼我們的微服務都需要修改,變動會比較大,因為這兩款消息中間件有一些差別,如果我們使用Spring Cloud Stream來整合我們的消息中間件,那麼這樣就可以降低微服務和消息中間件的耦合性,做到輕松在不同消息中間件間切換,當然Spring Cloud Stream官方隻支援rabbitmq 和 kafka,spring cloud alibaba新寫了一個starter可以支援RocketMQ;
按照官方的定義,Spring Cloud Stream 是一個建構消息驅動微服務的架構;
Spring Cloud Stream解決了開發人員無感覺的使用消息中間件的問題,因為Spring Cloud Stream對消息中間件的進一步封裝,可以做到代碼層面對消息中間件的無感覺,甚至于動态的切換中間件(rabbitmq切換為rocketmq或者kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程;
Spring Cloud Stream 内部有幾個概念:Binder 、Binding、input、output;
1、Binder: 跟外部消息中間件內建的元件,用來建立Binding,各消息中間件都有自己的 Binder 實作;
比如 Kafka 的實作 KafkaMessageChannelBinder,RabbitMQ 的實作 RabbitMessageChannelBinder 以及 RocketMQ 的實作 RocketMQMessageChannelBinder;
2、Binding: 包括 Input Binding 和 Output Binding;
Binding 在消息中間件與應用程式提供的 Provider 和 Consumer 之間提供了一個橋梁,實作了開發者隻需使用應用程式的 Provider 或 Consumer 生産或消費資料即可,屏蔽了開發者與底層消息中間件的接觸;
3、input
應用程式通過input(相當于消費者consumer)與Spring Cloud Stream中Binder互動,而Binder負責與消息中間件互動,是以,我們隻需關注如何與Binder互動即可,而無需關注與具體消息中間件的互動。
4、Output
output(相當于生産者producer)與Spring Cloud Stream中Binder互動;
組成
說明
Binder
Binder是應用與消息中間件之間的封裝,目前實作了Kafka和RabbitMQ的Binder,通過Binder可以很友善的連接配接中間件,可以動态的改變消息類型(對應于Kafka的topic,RabbitMQ的exchange),這些都可以通過配置檔案來實作;
@Input
該注解辨別輸入通道,通過該輸入通道接收消息進入應用程式
@Output
該注解辨別輸出通道,釋出的消息将通過該通道離開應用程式
@StreamListener
監聽隊列,用于消費者的隊列的消息接收
@EnableBinding
将信道channel和exchange、topic綁定在一起
消息生産者
1、建立SpringBoot應用31-rocket-spring-cloud-stream;
2、添加依賴:
配置檔案
相容性問題:
注意版本需要使用springboot2.2.5
消息發送:
消息接收:
可以通過調用SenderService中的方法進行發送資訊,也可以通過在啟動類中的Main方法中進行調用SenderService的方法進行發送資訊:
在前面的案例中,我們已經實作了一個基礎的 Spring Cloud Stream 消息傳遞處理操作,但在操作之中使用的是系統提供的 Source (output)、Sink(input),接下來我們來看一下自定義信道名稱;
Apache RocketMQ在4.3.0版中已經支援分布式事務消息,這裡RocketMQ采用了2PC的思想來實作了送出事務消息,同時增加一個補償邏輯來處理二階段逾時或者失敗的消息,如下圖所示:
上圖說明了事務消息的大緻方案,其中分為兩個流程:正常事務消息的發送及送出、事務消息的補償流程;
1.事務消息發送及送出:
(1) 發送消息(half消息);
(2) 服務端響應消息寫入結果;
(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行);
(4) 根據本地事務狀态執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)
2.補償流程:
(1) 對沒有Commit/Rollback的事務消息(pending狀态的消息),從服務端發起一次“回查”;
(2) Producer收到回查消息,檢查回查消息對應的本地事務的狀态;
(3) 根據本地事務狀态,重新Commit或者Rollback;
其中,補償階段用于解決消息Commit或者Rollback發生逾時或者失敗的情況;
事務消息一共有三種狀态:送出狀态、復原狀态、中間狀态;
TransactionStatus.CommitTransaction: 送出事務,代表消費者可以消費此消息;
TransactionStatus.RollbackTransaction: 復原事務,代表消息将被删除,不能被消費;
TransactionStatus.Unknown: 中間狀态,代表需要檢查消息隊列來确定狀态;
MQ内部邏輯:
自定義信道-重寫Source
自定義信道-重寫Sink
消費者接收消息:
*spring.cloud.stream.rocketmq.binder.name-server*
RocketMQ NameServer 位址(老版本使用 namesrv-addr 配置項);
Default: 127.0.0.1:9876.
*spring.cloud.stream.rocketmq.binder.access-key*
阿裡雲賬号 AccessKey。
Default: null.
*spring.cloud.stream.rocketmq.binder.secret-key*
阿裡雲賬号 SecretKey。
*spring.cloud.stream.rocketmq.binder.enable-msg-trace*
是否為 Producer 和 Consumer 開啟消息軌迹功能
Default: true.
*spring.cloud.stream.rocketmq.binder.customized-trace-topic*
消息軌迹開啟後存儲的 topic 名稱。
Default: RMQ_SYS_TRACE_TOPIC.
下面的這些配置是以 spring.cloud.stream.rocketmq.bindings..consumer. 為字首的 RocketMQ Consumer 相關的配置。
*enable*
是否啟用 Consumer;
預設值: true.
*tags*
Consumer 基于 TAGS 訂閱,多個 tag 以 || 分割;
預設值: empty.
*sql*
Consumer 基于 SQL 訂閱;
*broadcasting*
Consumer 是否是廣播消費模式。如果想讓所有的訂閱者都能接收到消息,可以使用廣播模式;
預設值: false.
*orderly*
Consumer 是否同步消費消息模式;
*delayLevelWhenNextConsume*
異步消費消息模式下消費失敗重試政策:
-1,不重複,直接放入死信隊列
0,broker 控制重試政策
>0,client 控制重試政策
預設值: 0.
*suspendCurrentQueueTimeMillis*
同步消費消息模式下消費失敗後再次消費的時間間隔;
預設值: 1000.
下面的這些配置是以 spring.cloud.stream.rocketmq.bindings..producer. 為字首的 RocketMQ Producer 相關的配置;
是否啟用 Producer;
*group*
Producer group name;
*maxMessageSize*
消息發送的最大位元組數;
預設值: 8249344.
*transactional*
是否發送事務消息;
*sync*
是否使用同步得方式發送消息;
*vipChannelEnabled*
是否在 Vip Channel 上發送消息;
*sendMessageTimeout*
發送消息的逾時時間(毫秒);
預設值: 3000.
*compressMessageBodyThreshold*
消息體壓縮閥值(當消息體超過 4k 的時候會被壓縮);
預設值: 4096.
*retryTimesWhenSendFailed*
在同步發送消息的模式下,消息發送失敗的重試次數;
預設值: 2.
*retryTimesWhenSendAsyncFailed*
在異步發送消息的模式下,消息發送失敗的重試次數;
*retryNextServer*
消息發送失敗的情況下是否重試其它的 broker;