天天看點

《springcloud 五》springcloud stream

什麼是消息驅動?

SpringCloud Stream消息驅動可以簡化開發人員對消息中間件的使用複雜度,讓系統開發人員更多盡力專注與核心業務邏輯的開發。SpringCloud Stream基于SpringBoot實作,自動配置化的功能可以幫助我們快速上手學習,類似與我們之前學習的orm架構,可以平滑的切換多種不同的資料庫。

目前SpringCloud Stream 目前隻支援 rabbitMQ和kafka

消息驅動原理

綁定器

通過定義綁定器作為中間層,實作了應用程式與消息中間件細節之間的隔離。通過向應用程式暴露統一的Channel通過,是的應用程式不需要再考慮各種不同的消息中間件的實作。當需要更新消息中間件,或者是更換其他消息中間件産品時,我們需要做的就是更換對應的Binder綁定器而不需要修改任何應用邏輯 。

《springcloud 五》springcloud stream

在該模型圖上有如下幾個核心概念:

  • Source: 當需要發送消息時,我們就需要通過Source,Source将會把我們所要發送的消息(POJO對象)進行序列化(預設轉換成JSON格式字元串),然後将這些資料發送到Channel中;
  • Sink: 當我們需要監聽消息時就需要通過Sink來,Sink負責從消息通道中擷取消息,并将消息反序列化成消息對象(POJO對象),然後交給具體的消息監聽處理進行業務處理;
  • Channel: 消息通道是Stream的抽象之一。通常我們向消息中間件發送消息或者監聽消息時需要指定主題(Topic)/消息隊列名稱,但這樣一旦我們需要變更主題名稱的時候需要修改消息發送或者消息監聽的代碼,但是通過Channel抽象,我們的業務代碼隻需要對Channel就可以了,具體這個Channel對應的是那個主題,就可以在配置檔案中來指定,這樣當主題變更的時候我們就不用對代碼做任何修改,進而實作了與具體消息中間件的解耦;
  • Binder: Stream中另外一個抽象層。通過不同的Binder可以實作與不同消息中間件的整合,比如上面的示例我們所使用的就是針對Kafka的Binder,通過Binder提供統一的消息收發接口,進而使得我們可以根據實際需要部署不同的消息中間件,或者根據實際生産中所部署的消息中間件來調整我們的配置。

消息驅動環境搭建

生産者環境

Maven依賴資訊

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <!-- SpringBoot整合Web元件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
    </dependencies>      

application.yml資訊

server:
  port: 9000
spring:
  application:
    name: spingcloud-stream-producer
#  rabbitmq:
#    host: 192.168.174.128
#    port: 5672
#    username: guest
#    password: guest      

建立管道

// 建立管道接口
public interface SendMessageInterface {

    // 建立一個輸出管道,用于發送消息
    @Output("my_msg")
    SubscribableChannel sendMsg();

}      

發送消息

@RestController
public class SendMsgController {
    @Autowired
    private SendMessageInterface sendMessageInterface;

    @RequestMapping("/sendMsg")
    public String sendMsg() {
        String msg = UUID.randomUUID().toString();
        System.out.println("生産者發送内容msg:" + msg);
        Message build = MessageBuilder.withPayload(msg.getBytes()).build();
        sendMessageInterface.sendMsg().send(build);
        return "success";
    }

}      

啟動服務

@SpringBootApplication
@EnableBinding(SendMessageInterface.class) // 開啟綁定
public class AppProducer {

    public static void main(String[] args) {
        SpringApplication.run(AppProducer.class, args);
    }

}      

消費者環境

Maven

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <!-- SpringBoot整合Web元件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
    </dependencies>      

application.yml

server:
  port: 9000
spring:
  application:
    name: spingcloud-stream-consumer
#  rabbitmq:
#    host: 192.168.174.128
#    port: 5672
#    username: guest
#    password: guest      

管道中綁定消息

public interface RedMsgInterface {

    // 從管道中擷取消息
    @Input("my_msg")
    SubscribableChannel redMsg();
}      

消費者擷取消息

@Component
public class Consumer {

    @StreamListener("my_msg")
    public void listener(String msg) {
        System.out.println("消費者擷取生産消息:" + msg);
    }

}      

啟動消費者

@SpringBootApplication
@EnableBinding(RedMsgInterface.class)
public class AppConsumer {

    public static void main(String[] args) {
        SpringApplication.run(AppConsumer.class, args);
    }

}      

消費組

server:
  port: 8001
spring:
  application:
    name: spring-cloud-stream
#  rabbitmq:
#    host: 192.168.174.128
#    port: 5672
#    username: guest
#    password: guest
  cloud:
    stream:
      bindings:
        mymsg: ###指定 管道名稱
          #指定該應用執行個體屬于 stream 消費組
          group: stream      

修改消費者

@Component
public class Consumer {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener("my_msg")
    public void listener(String msg) {
        System.out.println("消費者擷取生産消息:" + msg + ",端口号:" + serverPort);
    }

}      

更改環境為kafka

Maven依賴

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>      

生産者配置

server:
  port: 9000
spring:
  cloud:
    stream:
      # 設定成使用kafka
      kafka:
        binder:
          # Kafka的服務端清單,預設localhost
          brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092
          # Kafka服務端連接配接的ZooKeeper節點清單,預設localhost
          zkNodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181
          minPartitionCount: 1
          autoCreateTopics: true
          autoAddPartitions: true      

消費者配置

server:
  port: 8000
spring:
  application:
    name: springcloud_kafka_consumer
  cloud:
     instance-count: 1
     instance-index: 0
     stream:
        kafka:
          binder:
            brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092
            zk-nodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181
            auto-add-partitions: true
            auto-create-topics: true
            min-partition-count: 1
        bindings:
          input:
            destination: my_msg
            group: s1
            consumer:
              autoCommitOffset: false
              concurrency: 1
              partitioned: false      

繼續閱讀