天天看點

官方文檔中文版!Spring Cloud Stream 快速入門

本文内容翻譯自官方文檔, spring-cloud-stream docs ,對 Spring Cloud Stream的應用入門介紹。

一、Spring Cloud Stream 簡介

官方定義 Spring Cloud Stream 是一個建構消息驅動微服務的架構。

官方文檔中文版!Spring Cloud Stream 快速入門

Spring Cloud Stream建構在SpringBoot之上,提供了Kafka,RabbitMQ等消息中間件的個性化配置,引入了釋出訂閱、消費組和分區的語義概念,有效的簡化了上層研發人員對MQ使用的複雜度,讓開發人員更多的精力投入到核心業務的處理。

在實際開發過程中,服務與服務之間通信經常會使用到消息中間件,而以往使用了哪個中間件比如RabbitMQ,那麼該中間件和系統的耦合性就會非常高,如果我們要替換為Kafka那麼變動會比較大,使用Spring Cloud Stream來整合我們的消息中間件,可以降低系統和中間件的耦合性。

二、Spring Cloud Stream 解決什麼問題

無感覺的使用消息中間件

Stream解決了開發人員無感覺的使用消息中間件的問題,因為Stream對消息中間件的進一步封裝,可以做到代碼層面對中間件的無感覺。

中間件和服務的高度解耦

Spring Cloud Stream進行了配置隔離,隻需要調整配置,開發中可以動态的切換中間件(如rabbitmq切換為kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程。

關注公衆号:架構進化論,獲得第一手的技術資訊和原創文章

三、核心概念和應用模型

主要概念

Spring Cloud Stream 為各大消息中間件産品提供了個性化的自動化配置實作,引用了釋出-訂閱、消費組、分區的三個核心概念。

Spring Cloud Stream提供了很多抽象和基礎元件來簡化消息驅動型微服務應用。包含以下内容:

  • Spring Cloud Stream的應用模型
  • 綁定抽象
  • 持久化釋出/訂閱支援
  • 消費者組支援
  • 分片支援(Partitioning Support)
  • 可插拔API

應用模型

Spring Cloud Stream由一個中立的中間件核心組成。Spring Cloud Stream會注入輸入和輸出的channels,應用程式通過這些channels與外界通信,而channels則是通過一個明确的中間件Binder與外部brokers連接配接。

官方文檔中文版!Spring Cloud Stream 快速入門

各大消息中間件的綁定抽象

Spring Cloud Stream提供對Kafka,Rabbit MQ,Redis,和Gemfire的Binder實作。Spring Cloud Stream還包括了一個TestSupportBinder,TestSupportBinder預留一個未更改的channel以便于直接地、可靠地和channels通信。

內建Kafka

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>           

內建RabbitMQ

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>           

分區支援

Spring Cloud Stream支援在一個應用程式的多個執行個體之間資料分區,在分區的情況下,實體通信媒體(例如,topic代理)被視為多分區結構。一個或多個生産者應用程式執行個體将資料發送給多個消費應用執行個體,并保證共同的特性的資料由相同的消費者執行個體處理。

Spring Cloud Stream提供了一個通用的抽象,用于統一方式進行分區處理,是以分區可以用于自帶分區的代理(如kafka)或者不帶分區的代理(如rabbiemq)

分區在有狀态進行中是一個很重要的概念,其重要性展現在性能和一緻性上,要確定所有相關資料被一并處理,例如,在時間窗平均計算的例子中,給定傳感器測量結果應該都由同一應用執行個體進行計算。

四、程式設計模型

Spring Cloud Stream提供了一些預定義的注解,用于綁定輸入和輸出channels,以及如何監聽channels。

通過@EnableBinding觸發綁定

将@EnableBinding注解添加到應用的配置類,就可以把一個spring應用轉換成Spring Cloud Stream應用,@EnableBinding注解本身就包含@Configuration注解,會觸發Spring Cloud Stream 基本配置。

@Import(...)
@Configuration
@EnableIntegration
public @interface EnableBinding {
    ...
    Class<?>[] value() default {};
}
           

@Input 與 @Output

一個Spring Cloud Stream應用可以有任意數目的input和output通道,後者通過@Input和@Output注解在接口中定義。

@StreamListener

定義在方法中,被修飾的方法注冊為消息中間件上資料流的事件監聽器,注解中屬性值對應了監聽的消息通道名。

Source,Sink和Processor

Spring Cloud Stream提供了三個開箱即用的預定義接口。

Source用于有單個輸出(outbound)通道的應用。

public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();

}
           

Sink用于有單個輸入(inbound)通道的應用。

public interface Sink {

  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();

}
           

Processor用于單個應用同時包含輸入和輸出通道的情況。

public interface Processor extends Source, Sink {
}
           

五、Stream極簡執行個體

下面是一個非常簡單的 SpringBootApplication應用,通過依賴Spring Cloud Stream,從Input通道監聽消息然後傳回應答到Output通道,隻要添加配置檔案就可以應用。

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}           

下面解釋下這個示例中相關注解的應用:

@EnableBinding聲明了這個應用程式綁定了2個通道:INPUT和OUTPUT。這2個通道是在接口Processor中定義的(Spring Cloud Stream預設設定)。所有通道都是配置在一個具體的消息中間件或綁定器中。

@StreamListener(Processor.INPUT)表明這裡在input中提取消息,并且處理。

@SendTo(Processor.OUTPUT)表明在output中傳回消息。

總結

這篇文章根據Spring Cloud Stream的官方文檔,對Stream做了一個整體的介紹,包括設計目标,應用場景,業務模型以及對外開放的注解,後面我會通過一個執行個體,示範 Spring Cloud Stream 的應用。

官方文檔中文版!Spring Cloud Stream 快速入門