Spring Cloud Stream 實作消息服務
-
- 說明
- 消息中間件
- 建立項目
- 依賴
- 生産者
- 消費者
- 測試
說明
-
- 項目運作的前提條件:服務注冊中心已經啟動。
- 本項目的服務注冊與發現是基于 Spring Cloud Consul 實作。具體内容部落客寫在另一篇部落格:
-
- Spring Cloud Consul 服務注冊中心
- 本部落格是 Spring Cloud 微服務化(精簡完整版)的一部分。
消息中間件
- 部落客選擇的是 Kafka。安裝教程在另外一篇部落格
-
- Kafka 安裝
建立項目
- 建立兩個項目:
-
-
- 消息生産者:awesome-message-producer-server
-
-
-
Spring Cloud Stream 實作消息服務 - 主要有四部分關鍵内容:
-
-
- 依賴
- 配置
- 生成消息接口類(核心内容):MessageSource.java
- 控制器類(核心内容):SmsController.java
-
-
- 消息消費者:awesome-message-consumer-server
-
-
-
Spring Cloud Stream 實作消息服務 - 主要有四部分關鍵内容
-
-
- 依賴
- 配置
- 監聽消息接口類(核心内容):MessageSink.java
- 消息消費者類(核心内容): SmsConsumer.java
-
-
-
依賴
-
- 兩個項目需要引入的依賴完全一樣。
-
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
生産者
-
- 消息生産者:awesome-message-producer-server
-
-
- application.yml
-
spring: cloud: stream: default-binder: kafka bindings: sms_message_producer: destination: sms-message
-
- 生成消息接口類:
-
-
package awesome.message.producer.server.source;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;/** * @author: Andy * @time: 2019/4/20 19:53 * @since */ public interface MessageSource { String SMS_MESSAGE_PRODUCER = "sms_message_producer"; @Output(SMS_MESSAGE_PRODUCER) MessageChannel smsMessage();}
-
- 控制器類:
-
-
package awesome.message.producer.server.controller;import awesome.message.producer.server.bean.SmsMessage;import awesome.message.producer.server.source.MessageSource;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.http.HttpStatus;import org.springframework.http.ResponseEntity;import org.springframework.integration.support.MessageBuilder;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RestController;/** * @author: Andy * @time: 2019/4/20 19:23 * @since */ @RestController@EnableBinding(MessageSource.class)public class SmsController { @Autowired private MessageSource messageSource; /** * 發送短信消息 * * * @param message * @return: {@link ResponseEntity<?> } * @author: Andy * @time: 2019/4/20 20:35 * @since */ @PostMapping("/api/message/sms_message") public ResponseEntity<?> sendSmsMessage(@RequestBody SmsMessage message){ try { messageSource.smsMessage().send(MessageBuilder.withPayload(message).build()); } catch (Exception e) { //todo: do something e.printStackTrace(); return new ResponseEntity<>("消息 SMS 發送失敗!", HttpStatus.INTERNAL_SERVER_ERROR); } return new ResponseEntity<>("消息 SMS 發送成功!", HttpStatus.OK); }}
-
消費者
-
- 消息消費者:awesome-message-consumer-server
-
-
- application.yml
-
spring: cloud: stream: default-binder: kafka bindings: sms_message_producer: destination: sms-message group: message-sever-consumer
-
- 監聽消息接口類:
-
-
package awesome.message.consumer.server.sink;import org.springframework.cloud.stream.annotation.Input;import org.springframework.messaging.SubscribableChannel;public interface MessageSink { String SMS_MESSAGE_CONSUMER = "sms_message_consumer"; @Input(SMS_MESSAGE_CONSUMER) SubscribableChannel smsMessages();}
-
- 消息消費者類:
-
-
package awesome.message.consumer.server.consumer;import awesome.message.consumer.server.bean.SmsMessage;import awesome.message.consumer.server.sink.MessageSink;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.messaging.Message;/** * @author: Andy * @time: 2019/4/20 20:39 * @since */ @EnableBinding(MessageSink.class)public class SmsConsumer { @StreamListener(MessageSink.SMS_MESSAGE_CONSUMER) public void consumeSmsMessage(Message<SmsMessage> message){ System.out.println(String.format("Consumes message:[%s] ", message.getPayload().getContent())); }}
-
測試
-
- 生産者生成消息:
- 消費者消費消息:
-
Spring Cloud Stream 實作消息服務