SpringCloud Stream消息驅動

簡介
什麼是SpringCloudStream
- 官方定義 SpringCloud Stream 是一個建構消息驅動微服務對架構。
- 應用程式通過 inputs 或者 outputs 來 與Springcloud Stream 中 binder 對象互動 通過我們配置來 binding(綁定),而 SpringCloud Stream 的 binder 對象負責與消息中間件互動,是以我們隻需要搞清楚如何與 Spring Coud Stream 互動就可以友善使用消息驅動的方式。
- 通過使用SpringIntegration 來連接配接消息代理中間件實作消息事件驅動。
- Spring Cloud Stream 為一些供應商的消息中間件産品提供來個性化的自動化配置實作,引用來釋出-訂閱、消費組、分區的三個核心概念。
- ==目前值支援 RabbitMQ、Kafka==
SpringCloud Stream消息驅動 - 官網 https://spring.io/projects/spring-cloud-stream#overview
Spring Cloud Stream 是用于建構與消息傳遞系統的高度可伸縮的事件驅動微服務架構,該架構提供來一個靈活的程式設計模型,它履歷在已經建立和熟悉的Spring 熟語和最佳實踐上,包括支援持久化的釋出訂閱、消費組以及消息分區這三個核心概念
API:
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/中文指導手冊:
https://m.wang1314.com/doc/webapp/topic/20971999.html設計思想
标準MQ
為什麼用Cloud Stream
- 比方說我們用到了RabbitMQ 和 Kafka ,由于這兩個消息中間件的架構上的不同,像RabbitMQ 有 exchange,Kafka有 Topic 和 Partitions 分區,
- 這些中間件的差異性導緻我們實際項目開發給我們造成一定困擾,我們如果用了兩個消息隊列的其中一種,後面業務需求,我們想往另一個消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都需要推到重新做,因為它跟我們系統耦合度很高,這時候SpringCloud Stream 給我們提供了一種解耦合的方式
SpringCloud Stream消息驅動
stream憑什麼可以統一底層差異
Biinder
- INPUT 對應于消費者
- OUTPUT 對應于生産者
- 在沒有綁定器這個概念的情況下,我們springBoot 應用要直接與消息中間件進行互動的時候,由于消息中間件構造不同,他們的實作細節上會有較大的差異性,通過定義綁定器作為中間層,完美地實作了應用與消息中間件細節之間的隔離。通過向應用程式暴露統一的Channel 通道,使得應用程式不需要再考慮各種不同的消息中間件實作
- 通過綁定器Binder作為中間層,實作了應用程式與消息中間件的隔離
- 通過定義綁定器Binder 作為中間層,實作了應用程式與消息中間件之間細節的隔離。
- Stream 中的消息通信方式遵循了釋出訂閱 Topic主題進行廣播
Spring Cloud Stream标準流程套路
編碼API和常用注解
案例:
消息驅動生産者
- 建立子產品 cloud-stream-rabbitmq-provider8801
- pom
<dependencies>
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--監控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--熱部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #在此處配置要綁定的rabbitmq的服務資訊
defaultRabbit: #表示定義的名稱,用于binding整合
type: rabbit #消息元件類型
environment: #設定rabbitmq的相關環境配置
spring:
rabbitmq:
host: ip端口号 #RabbitMQ在本機的用localhost,在伺服器的用伺服器的ip位址
port: 5672
username: guest
password: guest
bindings: #服務的整合處理
output: #這個名字是一個通道的名稱
destination: studyExchange #表示要使用的Exchange名稱定義
content-type: application/json #設定消息類型,本次為json
binder: defaultRabbit #設定要綁定的消息服務的具體設定(爆紅不影響使用,位置沒錯)
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #設定心跳的時間間隔(預設是30S)
lease-expiration-duration-in-seconds: 5 #如果超過5S間隔就登出節點 預設是90s
instance-id: send-8801.com #在資訊清單時顯示主機名稱
prefer-ip-address: true #通路的路徑變為IP位址
啟動類
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}
- 建立業務類
建立service包 --- IMessageProvider接口 與實作類
public interface IMessageProvider {
public String send();
}
@EnableBinding(Source.class) //定義消息的推送管道(Source是spring的)
public class IMessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; //消息發送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build()); //MessageBuilder是spring的integration.support.MessageBuilder
System.out.println("*******serial: " + serial);
return null;
}
}
@RestController
public class SendMessageController {
@Resource
private IMessageProvider iMessageProvider;
@GetMapping("/sendMessage")
public String sendMessage(){
return iMessageProvider.send();
}
}
消費者
- 建立子產品cloud-stream-rabbitmq-consumer8802
<dependencies>
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--監控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--熱部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
server:
port: 8802
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #在此處配置要綁定的rabbitmq的服務資訊
defaultRabbit: #表示定義的名稱,用于binding整合
type: rabbit #消息元件類型
environment: #設定rabbitmq的相關環境配置
spring:
rabbitmq:
host: ip端口 #RabbitMQ在本機的用localhost,在伺服器的用伺服器的ip位址
port: 5672
username: 使用者名
password: 密碼
bindings: #服務的整合處理
input: #這個名字是一個通道的名稱
destination: studyExchange #表示要使用的Exchange名稱定義
content-type: application/json #設定消息類型,本次為json,本文要設定為“text/plain”
binder: defaultRabbit #設定要綁定的消息服務的具體設定(爆紅不影響使用,位置沒錯)
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #設定心跳的時間間隔(預設是30S)
lease-expiration-duration-in-seconds: 5 #如果超過5S間隔就登出節點 預設是90s
instance-id: receive-8802.com #在資訊清單時顯示主機名稱
prefer-ip-address: true #通路的路徑變為IP位址
- 主啟動類
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class, args);
}
}
- 建立 Controller
@EnableBinding(Sink.class)
@Controller
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT) //監聽
public void input(Message<String> message){
System.out.println("消費者1号------>收到的消息:" + message.getPayload() + "\t port:" + serverPort);
}
}
啟動Eureka 消息驅動生産者,消費者
- http://localhost:8801/sendMessage (8801發送消息)
SpringCloud Stream消息驅動 - 8802接收到消息:
SpringCloud Stream消息驅動
- 個人部落格: http://blog.yanxiaolong.cn/ .