文章目錄
-
- (十五)SpringCloud Stream消息驅動
-
- 1、消息驅動概述
-
- 1.1 什麼是SpringCloudStream
- 1.2 設計思想
-
- 1.2.1 标準的MQ
- 1.2.2 為什麼用Cloud Stream
- 1.2.3 stream憑什麼可以統一底層差異
- 1.2.4 Binder
- 1.2.5 Stream中的消息通信方式遵循了釋出-訂閱模式
- 1.3 Spring Cloud Stream标準流程套路
- 1.4 編碼API和常用注解
- 2、案例說明
- 3、消息驅動之生産者
-
- 3.1 建立model——cloud-stream-rabbitmq-provider8801
- 3.2 POM
- 3.3 YML
- 3.4 業務類
-
- 3.4.1 發送消息接口
- 3.4.2 發送消息接口的實作類
- 3.4.3 Conteoller
- 3.5 主啟動類
- 3.6 測試
- 4、消息驅動之消費者
-
- 4.1 建立model——cloud-stream-rabbitmq-comsumer8802
- 4.2 POM
- 4.3 YML
- 4.4 業務類
- 4.5 主啟動類
- 4.6 測試
- 5、分組消費與持久化
-
- 5.1 依照8802,clone出來一份運作8803
- 5.2 啟動
- 5.3 運作後兩個問題
- 5.4 解決重複消費
- 5.5 分組
-
- 5.5.1 原理:
- 5.5.2 檢視目前的group:
- 5.5.3 将8802/8803都變成不同組,group兩個不同
- 5.5.4 結論:
- 5.6 解決消息持久化
(十五)SpringCloud Stream消息驅動
1、消息驅動概述
1.1 什麼是SpringCloudStream
官方定義Spring Cloud Stream是一個建構消息驅動微服務的架構。
應用程式通過inputs或者 outputs來與Spring Cloud Stream中binder對象互動。
通過我們配置來binding(綁定),而Spring Cloud Stream 的binder對象負責與消息中間件互動。
是以,我們隻需要搞清楚如何與Spring Cloud Stream互動就可以友善使用消息驅動的方式。
通過使用Spring Integration來連接配接消息代理中間件以實作消息事件驅動。
Spring Cloud Stream為一些供應商的消息中間件産品提供了個性化的自動化配置實作,引用了釋出-訂閱、消費組、分區的三個核心概念。
目前僅支援RabbitMQ、Kafka。
一句話就是:屏蔽底層消息中間件的差異,降低切換版本,統一消息的程式設計模型
官網
-
https://spring.io/projects/spring-cloud-stream#overview
-
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/
- Spring Cloud Stream中文指導手冊:
-
https://m.wang1314.com/doc/webapp/topic/20971999.html
-
1.2 設計思想
1.2.1 标準的MQ
- 生産者/消費者之間靠消息媒介傳遞資訊内容——Message
- 消息必須走特定的通道——消息通道MessageChannel
- 消息通道裡的消息如何被消費呢,誰負責收發處理——消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器訂閱
1.2.2 為什麼用Cloud Stream
比方說我們用到了RabbitMQ和Kafka,由于這兩個消息中間件的架構上的不同,
像RabbitMQ有exchange,kafka有Topic和Partitions分區,
這些中間件的差異性導緻我們實際項目開發給我們造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,後面的業務需求,我想往另外一種消息隊列進行遷移,這時候無疑就是一個災難性的, 一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候springcloud Stream給我們提供了一種解耦合的方式。
1.2.3 stream憑什麼可以統一底層差異
在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行資訊互動的時候,由于各消息中間件建構的初衷不同,它們的實作細節上會有較大的差異性。
通過定義綁定器作為中間層,完美地實作了應用程式與消息中間件細節之間的隔離。
通過向應用程式暴露統一的Channel通道, 使得應用程式不需要再考慮各種不同的消息中間件實作。
通過
定義綁定器Binder
作為中間層,實作了應用程式與消息中間件細節之間的隔離。
1.2.4 Binder
Stream對消息中間件的進一 步封裝,可以做到代碼層面對中間件的無感覺,甚至于動态的切換中間件(rabbitmq切換為kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程
通過定義綁定器Binder作為中間層,實作了應用程式與消息中間件細節之間的隔離。
INPUT:消費者
OUTPUT:生産者
1.2.5 Stream中的消息通信方式遵循了釋出-訂閱模式
Topic主題進行廣播
在RabbitMQ就是Exchange
在kafka中就是Topic
1.3 Spring Cloud Stream标準流程套路
- Binder:連接配接中間件、屏蔽差異
- Channel:通道,是隊列Queue的一種抽象,在消息通訊系統中就是實作存儲和轉發的媒介,通過對Channel對隊列進行配置
- Source和Sink:簡單的可了解為參照對象是Spring Cloud Stream自身,從Stream釋出消息就是輸出,接受消息就是輸入
1.4 編碼API和常用注解
2、案例說明
RabbitMQ環境已經OK
工程中建立三個子子產品:
- cloud-stream-rabbitmq-provider8801,作為生産者進行發消息子產品
- cloud-stream-rabbitmq-consumer8802,作為消息接收子產品
- cloud-stream-rabbitmq-consumer8803,作為消息接收子產品
3、消息驅動之生産者
3.1 建立model——cloud-stream-rabbitmq-provider8801
3.2 POM
新引進的pom
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
<dependencies>
<!--stream-rabbit-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.atguigu.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3.3 YML
server:
port: 8801
spring:
application:
name: cloud-stream-provider #微服務名稱
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息元件類型
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定消息類型,本次為json,文本則設定“text/plain”
binder: defaultRabbit # 設定要綁定的消息服務的具體設定
eureka:
client: # 用戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
instance-id: send-8801.com # 在資訊清單時顯示主機名稱
prefer-ip-address: true # 通路的路徑變為IP位址
3.4 業務類
3.4.1 發送消息接口
package com.atguigu.springcloud.service;
//消息發送者的接口
public interface IMessageProvider {
public String send();
}
3.4.2 發送消息接口的實作類
新知識:
-
//定義消息的推送廣告@EnableBinding(Source.class)
-
;//消息發送管道private MessageChannel output
-
MessageBuilder
package com.atguigu.springcloud.service.impl;
......
@EnableBinding(Source.class) //定義消息的推送廣告
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output;//消息發送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*******serial: "+serial);
return null;
}
}
3.4.3 Conteoller
package com.atguigu.springcloud.controller;
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class MessageController {
@Resource
private IMessageProvider messageProvider;
@GetMapping("/sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}
3.5 主啟動類
package com.atguigu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class,args);
}
}
3.6 測試
- 啟動7001eureka
- 啟動rabbitmq
-
http://localhost:15672/
-
- 啟動8801
- 通路
http://localhost:8801/sendMessage
背景列印:
RabbitMQ顯示:
4、消息驅動之消費者
4.1 建立model——cloud-stream-rabbitmq-comsumer8802
4.2 POM
跟生産者一樣
4.3 YML
server:
port: 8802
spring:
application:
name: cloud-stream-comsumer #微服務名稱
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息元件類型
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定消息類型,本次為json,文本則設定“text/plain”
binder: defaultRabbit # 設定要綁定的消息服務的具體設定
eureka:
client: # 用戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
instance-id: receive-8802.com # 在資訊清單時顯示主機名稱
prefer-ip-address: true # 通路的路徑變為IP位址
4.4 業務類
controller
package com.atguigu.springcloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消費者1号,------接收到的消息為: "+message.getPayload()+"\t port: "+serverPort);
}
}
4.5 主啟動類
package com.atguigu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class,args);
}
}
4.6 測試
啟動:7001 、 8801 、 8802
通路:
http://localhost:8801/sendMessage
結果:
- 生産者8801:
- 消費者8802
- RabbitMQ:
5、分組消費與持久化
5.1 依照8802,clone出來一份運作8803
cloud-stream-rabbitmq-consumer8803
5.2 啟動
- RabbitMQ
- 7001 服務注冊
- 8801 消息生産
- 8802 消息消費
- 8803 消息消費
通路兩次8801:
http://localhost:8801/sendMessage
8801:
8802:
8803:
5.3 運作後兩個問題
- 重複消費
- 消息持久化
5.4 解決重複消費
目前是8802/8803同時都收到了,存在重複消費問題
如何解決:
分組和持久化屬性
group=
生産實際案例:
比如在如下場景中,訂單系統我們做叢集部署,都會從RabbitMQ中擷取訂單資訊,
那如果一個訂單同時被兩個服務擷取到,那麼就會造成資料錯誤,我們得避免這種情況。
這時我們就可以使用Stream中的
消息分組
來解決
注意在Stream中處于同一個group中的多個消費者是競争關系,就能夠保證消息隻會被其中一個應用消費一次。
不同組是可以全面消費的(重複消費)
同—組内會發生竟争關系,隻有其中一個可以消費。
首先來了解一下分組
5.5 分組
5.5.1 原理:
微服務應用放置于同一個group中,就能夠保證消息隻會被其中一個應用消費一次。不同的組是可以消費的,同一個組内會發生競争關系,隻有其中一個可以消費。
5.5.2 檢視目前的group:
首先看一下,目前的group:
兩個消費者的group:不一樣
是以屬于不同的組,就會重複消費
5.5.3 将8802/8803都變成不同組,group兩個不同
group: atguiguA、atguiguB
修改YML:
8802:group: atguiguA
8803:group: atguiguB
5.5.4 結論:
還是重複消費
解決:
8802/8803實作了輪詢分組,每次隻有一個消費者 8801子產品的發的消息隻能被8802或8803其中一個接收到,這樣避免了重複消費
8802/8803都變成相同組,group兩個相同
都改成
group:atguiguA
測試:8801通路2次
檢視8802:
檢視8803:
同一個組的多個微服務執行個體,每次隻會有一個拿到
是以解決了重複消費
5.6 解決消息持久化
通過上述,解決了重複消費問題,再看看持久化
停止8802/8803并去除掉8802的分組group:atguiguA,8803的分組group:atguiguA沒有去掉
8801先發送4條資訊到rabbitmq
先啟動8802,無分組屬性配置,背景沒有打出來消息
然後啟動8803,有分組屬性配置,背景打出來了MQ上的消息
總結:要想在服務停止後也能接收到生産者發來的消息,就配置group屬性