天天看點

Spring Cloud Stream 實作消息服務

Spring Cloud Stream 實作消息服務

  • ​​說明​​
  • ​​消息中間件​​
  • ​​建立項目​​
  • ​​依賴​​
  • ​​生産者​​
  • ​​消費者​​
  • ​​測試​​

說明

  • 項目運作的前提條件:服務注冊中心已經啟動。
  • 本項目的服務注冊與發現是基于 Spring Cloud Consul 實作。具體内容部落客寫在另一篇部落格:
    • ​​Spring Cloud Consul 服務注冊中心​​
  • 本部落格是 ​​Spring Cloud 微服務化(精簡完整版)​​的一部分。

消息中間件

  • 部落客選擇的是 Kafka。安裝教程在另外一篇部落格
    • ​​Kafka 安裝​​

建立項目

  • 建立兩個項目:
    • 消息生産者:awesome-message-producer-server
      • Spring Cloud Stream 實作消息服務
      • 主要有四部分關鍵内容:
        • 依賴
        • 配置
        • 生成消息接口類(核心内容):MessageSource.java
        • 控制器類(核心内容):SmsController.java
    • 消息消費者:awesome-message-consumer-server
      • Spring Cloud Stream 實作消息服務
      • 主要有四部分關鍵内容
        • 依賴
        • 配置
        • 監聽消息接口類(核心内容):MessageSink.java
        • 消息消費者類(核心内容): SmsConsumer.java

依賴

  • 兩個項目需要引入的依賴完全一樣。
  • ​<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> ​

生産者

  • 消息生産者:awesome-message-producer-server
    • application.yml
    • ​spring: cloud: stream: default-binder: kafka bindings: sms_message_producer: destination: sms-message ​

  • 生成消息接口類:
    • ​package awesome.message.producer.server.source;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;/** * @author: Andy * @time: 2019/4/20 19:53 * @since */ public interface MessageSource { String SMS_MESSAGE_PRODUCER = "sms_message_producer"; @Output(SMS_MESSAGE_PRODUCER) MessageChannel smsMessage();}​

  • 控制器類:
    • ​package awesome.message.producer.server.controller;import awesome.message.producer.server.bean.SmsMessage;import awesome.message.producer.server.source.MessageSource;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.http.HttpStatus;import org.springframework.http.ResponseEntity;import org.springframework.integration.support.MessageBuilder;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RestController;/** * @author: Andy * @time: 2019/4/20 19:23 * @since */ @RestController@EnableBinding(MessageSource.class)public class SmsController { @Autowired private MessageSource messageSource; /** * 發送短信消息 * * * @param message * @return: {@link ResponseEntity<?> } * @author: Andy * @time: 2019/4/20 20:35 * @since */ @PostMapping("/api/message/sms_message") public ResponseEntity<?> sendSmsMessage(@RequestBody SmsMessage message){ try { messageSource.smsMessage().send(MessageBuilder.withPayload(message).build()); } catch (Exception e) { //todo: do something e.printStackTrace(); return new ResponseEntity<>("消息 SMS 發送失敗!", HttpStatus.INTERNAL_SERVER_ERROR); } return new ResponseEntity<>("消息 SMS 發送成功!", HttpStatus.OK); }}​

消費者

  • 消息消費者:awesome-message-consumer-server
    • application.yml
    • ​spring: cloud: stream: default-binder: kafka bindings: sms_message_producer: destination: sms-message group: message-sever-consumer ​

  • 監聽消息接口類:
    • ​package awesome.message.consumer.server.sink;import org.springframework.cloud.stream.annotation.Input;import org.springframework.messaging.SubscribableChannel;public interface MessageSink { String SMS_MESSAGE_CONSUMER = "sms_message_consumer"; @Input(SMS_MESSAGE_CONSUMER) SubscribableChannel smsMessages();}​

  • 消息消費者類:
    • ​package awesome.message.consumer.server.consumer;import awesome.message.consumer.server.bean.SmsMessage;import awesome.message.consumer.server.sink.MessageSink;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.messaging.Message;/** * @author: Andy * @time: 2019/4/20 20:39 * @since */ @EnableBinding(MessageSink.class)public class SmsConsumer { @StreamListener(MessageSink.SMS_MESSAGE_CONSUMER) public void consumeSmsMessage(Message<SmsMessage> message){ System.out.println(String.format("Consumes message:[%s] ", message.getPayload().getContent())); }}​

測試

  • 生産者生成消息:
  • 消費者消費消息:
  • Spring Cloud Stream 實作消息服務