什麼是消息驅動?
SpringCloud Stream消息驅動可以簡化開發人員對消息中間件的使用複雜度,讓系統開發人員更多盡力專注與核心業務邏輯的開發。SpringCloud Stream基于SpringBoot實作,自動配置化的功能可以幫助我們快速上手學習,類似與我們之前學習的orm架構,可以平滑的切換多種不同的資料庫。
目前SpringCloud Stream 目前隻支援 rabbitMQ和kafka
消息驅動原理
綁定器
通過定義綁定器作為中間層,實作了應用程式與消息中間件細節之間的隔離。通過向應用程式暴露統一的Channel通過,是的應用程式不需要再考慮各種不同的消息中間件的實作。當需要更新消息中間件,或者是更換其他消息中間件産品時,我們需要做的就是更換對應的Binder綁定器而不需要修改任何應用邏輯 。

在該模型圖上有如下幾個核心概念:
- Source: 當需要發送消息時,我們就需要通過Source,Source将會把我們所要發送的消息(POJO對象)進行序列化(預設轉換成JSON格式字元串),然後将這些資料發送到Channel中;
- Sink: 當我們需要監聽消息時就需要通過Sink來,Sink負責從消息通道中擷取消息,并将消息反序列化成消息對象(POJO對象),然後交給具體的消息監聽處理進行業務處理;
- Channel: 消息通道是Stream的抽象之一。通常我們向消息中間件發送消息或者監聽消息時需要指定主題(Topic)/消息隊列名稱,但這樣一旦我們需要變更主題名稱的時候需要修改消息發送或者消息監聽的代碼,但是通過Channel抽象,我們的業務代碼隻需要對Channel就可以了,具體這個Channel對應的是那個主題,就可以在配置檔案中來指定,這樣當主題變更的時候我們就不用對代碼做任何修改,進而實作了與具體消息中間件的解耦;
- Binder: Stream中另外一個抽象層。通過不同的Binder可以實作與不同消息中間件的整合,比如上面的示例我們所使用的就是針對Kafka的Binder,通過Binder提供統一的消息收發接口,進而使得我們可以根據實際需要部署不同的消息中間件,或者根據實際生産中所部署的消息中間件來調整我們的配置。
消息驅動環境搭建
生産者環境
Maven依賴資訊
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<!-- SpringBoot整合Web元件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
</dependencies>
application.yml資訊
server:
port: 9000
spring:
application:
name: spingcloud-stream-producer
# rabbitmq:
# host: 192.168.174.128
# port: 5672
# username: guest
# password: guest
建立管道
// 建立管道接口
public interface SendMessageInterface {
// 建立一個輸出管道,用于發送消息
@Output("my_msg")
SubscribableChannel sendMsg();
}
發送消息
@RestController
public class SendMsgController {
@Autowired
private SendMessageInterface sendMessageInterface;
@RequestMapping("/sendMsg")
public String sendMsg() {
String msg = UUID.randomUUID().toString();
System.out.println("生産者發送内容msg:" + msg);
Message build = MessageBuilder.withPayload(msg.getBytes()).build();
sendMessageInterface.sendMsg().send(build);
return "success";
}
}
啟動服務
@SpringBootApplication
@EnableBinding(SendMessageInterface.class) // 開啟綁定
public class AppProducer {
public static void main(String[] args) {
SpringApplication.run(AppProducer.class, args);
}
}
消費者環境
Maven
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<!-- SpringBoot整合Web元件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
</dependencies>
application.yml
server:
port: 9000
spring:
application:
name: spingcloud-stream-consumer
# rabbitmq:
# host: 192.168.174.128
# port: 5672
# username: guest
# password: guest
管道中綁定消息
public interface RedMsgInterface {
// 從管道中擷取消息
@Input("my_msg")
SubscribableChannel redMsg();
}
消費者擷取消息
@Component
public class Consumer {
@StreamListener("my_msg")
public void listener(String msg) {
System.out.println("消費者擷取生産消息:" + msg);
}
}
啟動消費者
@SpringBootApplication
@EnableBinding(RedMsgInterface.class)
public class AppConsumer {
public static void main(String[] args) {
SpringApplication.run(AppConsumer.class, args);
}
}
消費組
server:
port: 8001
spring:
application:
name: spring-cloud-stream
# rabbitmq:
# host: 192.168.174.128
# port: 5672
# username: guest
# password: guest
cloud:
stream:
bindings:
mymsg: ###指定 管道名稱
#指定該應用執行個體屬于 stream 消費組
group: stream
修改消費者
@Component
public class Consumer {
@Value("${server.port}")
private String serverPort;
@StreamListener("my_msg")
public void listener(String msg) {
System.out.println("消費者擷取生産消息:" + msg + ",端口号:" + serverPort);
}
}
更改環境為kafka
Maven依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
生産者配置
server:
port: 9000
spring:
cloud:
stream:
# 設定成使用kafka
kafka:
binder:
# Kafka的服務端清單,預設localhost
brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092
# Kafka服務端連接配接的ZooKeeper節點清單,預設localhost
zkNodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181
minPartitionCount: 1
autoCreateTopics: true
autoAddPartitions: true
消費者配置
server:
port: 8000
spring:
application:
name: springcloud_kafka_consumer
cloud:
instance-count: 1
instance-index: 0
stream:
kafka:
binder:
brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092
zk-nodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181
auto-add-partitions: true
auto-create-topics: true
min-partition-count: 1
bindings:
input:
destination: my_msg
group: s1
consumer:
autoCommitOffset: false
concurrency: 1
partitioned: false