介紹Spring Cloud Stream
Spring Cloud Stream是建構消息驅動的微服務應用程式的架構。Spring Cloud Stream基于Spring Boot建立獨立的生産級Spring應用程式,并使用Spring Integration提供與消息代理的連接配接。它提供了來自幾家供應商的中間件的意見配置,介紹了持久釋出訂閱語義,消費者組和分區的概念。
您可以将@EnableBinding注釋添加到應用程式,以便立即連接配接到消息代理,并且可以将@StreamListener添加到方法中,以使其接收流處理的事件。以下是接收外部消息的簡單接收器應用程式。
@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {
public static void main(String[] args) {
SpringApplication.run(VoteRecordingSinkApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void processVote(Vote vote) {
votingService.recordVote(vote);
}
}
@EnableBinding注釋需要一個或多個接口作為參數(在這種情況下,該參數是單個Sink接口)。接口聲明輸入和/或輸出通道。Spring Cloud Stream提供了接口Source,Sink和Processor; 您還可以定義自己的界面。
以下是Sink接口的定義:
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
@Input注釋辨別輸入通道,通過該輸入通道接收到的消息進入應用程式; @Output注釋辨別輸出通道,釋出的消息将通過該通道離開應用程式。@Input和@Output注釋可以使用頻道名稱作為參數; 如果未提供名稱,将使用注釋方法的名稱。
Spring Cloud Stream将為您建立一個界面的實作。您可以在應用程式中通過自動連接配接來使用它,如下面的測試用例示例。
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)
@WebAppConfiguration
@DirtiesContext
public class StreamApplicationTests {
@Autowired
private Sink sink;
@Test
public void contextLoads() {
assertNotNull(this.sink.input());
}
}
程式設計模型
Binder
Binder 是 Spring Cloud Stream 的一個抽象概念,是應用與消息中間件之間的粘合劑。
目前 Spring Cloud Stream 實作了 Kafka 和 Rabbit MQ 的binder。通過 binder ,可以很友善的連接配接中間件,可以動态的改變消息的destinations(對應于 Kafka 的topic,Rabbit MQ 的 exchanges),這些都可以通過外部配置項來做到。甚至可以任意的改變中間件的類型而不需要修改一行代碼。
Publish-Subscribe
消息的釋出(Publish)和訂閱(Subscribe)是事件驅動的經典模式。Spring Cloud Stream 的資料互動也是基于這個思想。生産者把消息通過某個 topic 廣播出去(Spring Cloud Stream 中的 destinations)。其他的微服務,通過訂閱特定 topic 來擷取廣播出來的消息來觸發業務的進行。
這種模式,極大的降低了生産者與消費者之間的耦合。即使有新的應用的引入,也不需要破壞目前系統的整體結構。
Consumer Groups
“Group”, Kafka 中的概念。Spring Cloud Stream 的這個分組概念的意思基本和 Kafka 一緻。
微服務中動态的縮放同一個應用的數量以此來達到更高的處理能力是非常必須的。對于這種情況,同一個事件防止被重複消費,隻要把這些應用放置于同一個 “group” 中,就能夠保證消息隻會被其中一個應用消費一次。
Message
Message,就是所說的消息體,用來承載傳輸的資訊用的。Message分為兩部分,header和payload。header是頭部資訊,用來存儲傳輸的一些特性屬性參數。payload是用來裝載資料的,他可以攜帶的任何Object對象 不同的對象在binder中傳輸 可以指定不同的mini類型 具體參考
http://cloud.spring.io/spring-cloud-static/Edgware.SR4/single/spring-cloud.html#contenttypemanagement可以通過application.yml中設定 輸入input和輸出output的mini類型
spring.cloud.stream.bindings..content-type
MessageChannel
消息管道,生産者生産一個消息到channel,消費者從channel消費一個消息,是以channel可以對消息元件解耦,并且提供一個友善的攔截功能和監控功能。
預設的通道
輸入(SubscribableChannel)和輸出通道(MessageChannel)參考 Processor接口
springcloudstream提供通道的定義 比如自定義通過可以使用接口
public interface OrderChannel {
String INPUT = "input_order";
String OUTPUT="ouput_order";
/**
* input注解制定通道的名稱 将來在yml中配置該通道的實際綁定的topic或者訂閱組
* @return
*/
@Input(INPUT)
SubscribableChannel orderInput();
/**
* output注解指定輸出通道的名稱
* @return
*/
@Output(OUTPUT)
MessageChannel orderOutput();
}
以下 代碼參考 Source Sink Processor接口 将來在yml關于該通道的配置既可以
spring:
cloud:
stream:
bindings:
通道名稱:
destination: mydest
java B2B2C 多租戶電子商城系統