Spring Cloud Stream是一個建構消息驅動微服務的架構,抽象了MQ的使用方式, 提供統一的API操作。Spring Cloud Stream通過 Binder(綁定器) inputs/outputs
、
channel完成應用程式和MQ的解耦。Spring Cloud Stream的模型如下圖:

- Binder
負責綁定應用程式和MQ中間件,即指定應用程式是和KafKa互動還是和RabbitMQ互動或者和其他的MQ中間件互動
- inputs/outputs channel
inputs/outputs channel抽象釋出訂閱消息的方式,即無論是什麼類型的MQ應用程式都通過統一的方式釋出訂閱消息
Spring Cloud Stream主要配置
-
binder
綁定MQ中間件及配置
-
bindings
管理所有的Topic
-
指定釋出訂閱的Topicdes1tination
-
指定釋出訂閱消息的格式contentType
-
指定消費者組,(一條消息隻能被一組消息者中的一個消息者消費)group
示例
高版本的Spring Cloud Stream提供兩種使用方式,一種是使用yml配置的方式綁定
生産/消費
者,另一種是通過
Function
的方式綁定
生産/消費
者。以下代碼為使用
Function
生産/消費
者
- 引入依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> <version>3.1.0</version> </dependency>
- 配置yml
spring: cloud: stream: kafka: binder: auto-create-topics: true # 自動建立topics brokers: ***.****.***.***:9092 bindings: logP-out-0: # 對用在ProducersConfig中的生産函數logP destination: log # logP将資料發送的topic contentType: application/json logC-in-0: # 對用在ConsumersConfig中的生産函數logC destination: log group: log_group addAgeC-in-0: destination: addAge group: addAge_group function: definition: logP;logC;addAgeC # 指定對應的函數為Spring Cloud Stream中的生産消費通道
-
編寫生産者
方式1
方式2@Configuration public class ProducersConfig { private BlockingQueue<Person> unbounded = new LinkedBlockingQueue<>(); /** * 對應yml中配置的logP-out-0通道,即topic log * @return java.util.function.Supplier<com.example.kafka.entity.Person> * @Date 2020-12-27 **/ @Bean public Supplier<Person> logP(){ return () -> unbounded.poll(); } /** * 調用本方法向log topic發送消息 * * @param person: * @return void * @Date 2020-12-27 **/ public void log(Person person){ unbounded.offer(person); } }
@RestController public class UserController { @Autowired private StreamBridge streamBridge; @PostMapping("/addAge") public boolean addAge(@RequestBody Person person){ person.setAge(RandomUtil.randomInt(10, 90)); person.setSuccess(RandomUtil.randomBoolean()); person.setBirthday(new Date()); // 通過streamBridge直接對應的topic發送消息 return streamBridge.send("addAge", person); } }
- 編寫消費者
@Configuration public class ConsumersConfig { /** * 對應yml中配置的logC-in-0通道,即topic log。 * 消費topic log中的消息 * * @return java.util.function.Consumer<com.example.kafka.entity.Person> * @Date 2020-12-27 **/ @Bean public Consumer<Person> logC() { return person -> { System.out.println("Received: " + person); }; } /** * 對應yml中配置的addAgeC-in-0通道,即topic addAge。 * 消費topic addAge中的消息 * * @return java.util.function.Consumer<com.example.kafka.entity.Person> * @Date 2020-12-27 **/ @Bean public Consumer<Person> addAgeC(){ return person -> { person.setAge(person.getAge() + 10); System.out.println("Consumer addAge: " + person.toString()); }; } }
- 發送消息
@RestController public class UserController { @Autowired private StreamBridge streamBridge; @Autowired private ProducersConfig producersConfig; @PostMapping("/log") public void log(@RequestBody Person person){ producersConfig.log(person); } @PostMapping("/addAge") public boolean addAge(@RequestBody Person person){ person.setAge(RandomUtil.randomInt(10, 90)); person.setSuccess(RandomUtil.randomBoolean()); person.setBirthday(new Date()); System.out.println("Producer addAge: " + person.toString()); return streamBridge.send("addAge", person); } }