天天看點

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 簡介、建立消息生産者、建立消息消費者、自定義消息通道、分組與持久化、設定 RoutingKey)

1、概念:SpringCloudStream

2、具體内容

2.1、SpringCloudStream 簡介

SpringCloudStream 就是使用了基于消息系統的微服務處理架構。對于消息系統而言一共分為兩類:基于應用标準的 JMS、基于協定标準的 AMQP,在整個 SpringCloud 之中支援有 RabbitMQ、Kafka 元件的消息系統。利用 SpringCloudStream 可以實作更加友善的消息系統的整合處理,但是推薦還是基于 RabbitMQ 實作會更好一些。

為什麼 SpringCloud 中要提供有一個類似于消息驅動的 SpringCloudStream 呢?

如果通過 Java 曆史上的分布式的開發架構大家不難發現,對于消息系統,實際上最初的 SUN 公司是非常看中的,是以在 EJB 的時代裡面專門提供有消息驅動 Bean(Message Driven Bean、MDB)利用消息驅動 Bean 可以進行消息的處理操作。利用消息驅動 bean 的模式可以簡化使用者的操作複雜度,直接傳遞一些各類的資料即可實作業務的處理操作。

于是在 SpringBoot 的之中為了友善開發者去整合消息元件,也提供有一系列的處理支援,但是如果按照這些方式來在 SpringCloud 之中進行消息處理,有些人會認為比較麻煩,是以在 SpringCloud 裡面将消息整合的處理操作進行了進一步的抽象操作, 實作了更加簡化的消息處理。

總結:SpringCloudStream 就是實作了 MDB 功能,同時可以更加簡化友善的整合消息元件。

SpringCloudStream的工作原理:

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 簡介、建立消息生産者、建立消息消費者、自定義消息通道、分組與持久化、設定 RoutingKey)

說明:最底層是消息服務,中間層是綁定層,綁定層和底層的消息服務進行綁定,頂層是消息生産者和消息消費者,頂層可以向綁定層生産消息和和擷取消息消費

 2.2、建立消息生産者

 本次基于 RabbitMQ 實作消息的生産者的微服務操作,在整個的生産者項目之中,首先建立了一個新的 Maven 子產品: microcloud-stream-provider-8401。

 1、 【microcloud-stream-provider-8401】修改 pom.xml 配置檔案,在這個配置檔案之中要追加如下的依賴程式包:

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
           

 2、 【microcloud-stream-provider-8401】修改 application.yml 配置檔案,追加如下的綁定處理配置:

server:
  port: 8401
eureka: 
  client: # 用戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://edmin:[email protected]:7001/eureka,http://edmin:[email protected]:7002/eureka,http://edmin:[email protected]:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: send-8401.com  # 在資訊清單時顯示主機名稱
    prefer-ip-address: true     # 通路的路徑變為IP位址
spring:
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為對象json,如果是文本則設定“text/plain”
          binder: defaultRabbit # 設定要綁定的消息服務的具體設定
  application:
    name: microcloud-stream-provider
           

 3、 修改 hosts 配置檔案,追加主機映射:

 127.0.0.1 send-8401.com

 4、 【microcloud-stream-provider-8401】定義一個消息的發送接口:

package cn.study.microcloud.service;

import cn.study.vo.Company;

public interface IMessageProvider {

    /**
    * 實作消息的發送,本次發送的消息是一個對象(自動變為json)
    * @param company VO對象,該對象不為null*/
      public void send(Company company) ;
}
           

 5、 【microcloud-stream-provider-8401】定義消息發送的實作子類:

package cn.study.microcloud.service.impl;

import javax.annotation.Resource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import cn.study.microcloud.service.IMessageProvider;
import cn.study.vo.Company;

@EnableBinding(Source.class) // 可以了解為是一個消息的發送管道的定義
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output; // 消息的發送管道

    @Override
    public void send(Company company) {
        this.output.send(MessageBuilder.withPayload(company).build()); // 建立并發送消息
    }
}
           

 6、 【microcloud-stream-provider-8401】分析一下 Source 類的源代碼:

public interface Source {

    String OUTPUT = "output"; // 之前所設定的消息發送的管道

    @Output(Source.OUTPUT)
    MessageChannel output();

}
           

 7、 【microcloud-stream-provider-8401】定義程式主類:

package cn.study.microcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableDiscoveryClient
public class StreamProvider_8401_StartSpringCloudApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamProvider_8401_StartSpringCloudApplication.class, args);
    }
}
           

 8、 【microcloud-stream-provider-8401】編寫測試類:

 · 保證你的 pom.xml 檔案之中存在有測試的依賴程式包:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
           

 · 編寫具體的測試程式類:

package cn.study.microcloud.test;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;

import cn.study.microcloud.StreamProvider_8401_StartSpringCloudApplication;
import cn.study.microcloud.service.IMessageProvider;
import cn.study.vo.Company;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = StreamProvider_8401_StartSpringCloudApplication.class)
@WebAppConfiguration
public class TestMessageProvider {
    @Resource
    private IMessageProvider messageProvider;

    @Test
    public void testSend() {
        Company company = new Company();
        company.setTitle("studyjava");
        company.setNote("更多資源請登入:www.study.cn");
        this.messageProvider.send(company); // 消息發送
    }
}
           

 9、 啟動 RabbitMQ 以及相應的微服務進行消息的發送處理,如果可以檢測到 RabbitMQ 上的活動資訊就表示該微服務建立成功。

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 簡介、建立消息生産者、建立消息消費者、自定義消息通道、分組與持久化、設定 RoutingKey)

 2.3、建立消息消費者

 在之前已經成功的實作了消息的發送處理,但是這個消息由于隻是一個臨時消息并且隻是發送到了 RabbitMQ 之中,那麼現在 如果要想進行該消息的接收就必須通過 RabbitMQ 擷取消息内容。

 1、 【microcloud-stream-consumer-8402】通過“microcloud-stream-provider-8401”子產品複制本子產品;

 2、 【microcloud-stream-consumer-8402】一定要保證 pom.xml 檔案之中擁有如下的依賴包:

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
           

 3、 修改 hosts 主機映射,追加新的主機名稱:

 127.0.0.1 receive-8402.com

 4、 【microcloud-stream-consumer-8402】修改 application.yml 配置檔案:

server:
  port: 8402
eureka: 
  client: # 用戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://edmin:[email protected]:7001/eureka,http://edmin:[email protected]:7002/eureka,http://edmin:[email protected]:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: receive-8402.com  # 在資訊清單時顯示主機名稱
    prefer-ip-address: true     # 通路的路徑變為IP位址
spring:
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為對象json,如果是文本則設定“text/plain”
          binder: defaultRabbit # 設定要綁定的消息服務的具體設定
  application:
    name: microcloud-stream-consumer
           

 5、 【microcloud-stream-consumer-8402】定義一個消息的監聽程式類:

package cn.study.microcloud.service;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import cn.study.vo.Company;

@Component
@EnableBinding(Sink.class)
public class MessageListener {
    @StreamListener(Sink.INPUT)
    public void input(Message<Company> message) {
        System.err.println("【*** 消息接收 ***】" + message.getPayload());
    }
}
           

 6、 【microcloud-stream-consumer-8402】觀察 Sink 源代碼:

public interface Sink {

    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();

}
           

 7、 首先啟動消息的消費端,而後再啟動消息的生産者發送消息。

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 簡介、建立消息生産者、建立消息消費者、自定義消息通道、分組與持久化、設定 RoutingKey)

那麼此時實作了了一個基于 RabbitMQ 定義的 SpringCloudStream 基本操作功能。

 2.4、自定義消息通道

 現在已經實作了一個基礎的 SpringCloudStream 處理操作,但是在本次操作之中一直使用的都是系統中提供好的 Source (output)、Sink(input),如果說現在使用者有需要也可以定義自己的通道名稱。

 1、 【micocloud-api】由于現在有兩個子產品都需要使用到自定義消息通道的配置,是以應該将這個配置定義為一個公共的程式處理 類,修改 pom.xml 配置檔案,引入相應的開發包:

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
           

 2、 【micocloud-api】使用一個自定義的通道:

package cn.study.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface DefaultProcess {
    public static final String OUTPUT = "study_output"; // 輸出通道名稱
    public static final String INPUT = "study_input"; // 輸入通道名稱

    @Input(DefaultProcess.INPUT)
    public SubscribableChannel input();

    @Output(DefaultProcess.OUTPUT)
    public MessageChannel output();
}
           

 3、 【microcloud-stream-provider-8401】修改 application.yml 配置檔案:

spring:
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        study_output: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為對象json,如果是文本則設定“text/plain”
          binder: defaultRabbit # 設定要綁定的消息服務的具體設定
           

 4、 【microcloud-stream-consumer-8402】修改 application.yml 配置檔案:

spring:
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        study_input: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為對象json,如果是文本則設定“text/plain”
          binder: defaultRabbit # 設定要綁定的消息服務的具體設定
           

 5、 【microcloud-stream-provider-8401】修改消息的發送子類:

package cn.study.microcloud.service.impl;

import javax.annotation.Resource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import cn.study.channel.DefaultProcess;
import cn.study.microcloud.service.IMessageProvider;
import cn.study.vo.Company;

@EnableBinding(DefaultProcess.class) // 可以了解為是一個消息的發送管道的定義
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output; // 消息的發送管道

    @Override
    public void send(Company company) {
        this.output.send(MessageBuilder.withPayload(company).build()); // 建立并發送消息
    }
}
           

 6、 【microcloud-stream-consumer-8402】修改 MessageListener 程式類:

package cn.study.microcloud.service;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import cn.study.channel.DefaultProcess;
import cn.study.vo.Company;

@Component
@EnableBinding(DefaultProcess.class)
public class MessageListener {
    @StreamListener(Sink.INPUT)
    public void input(Message<Company> message) {
        System.err.println("【*** 消息接收 ***】" + message.getPayload());
    }
}
           

 7、 随後就可以使用自定義的新的通道名稱進行 Stream 處理操作了。

 2.5、分組與持久化

 在上面的程式裡面成功的實作了消息的發送以及接收,但是需要注意一個問題,所發送的消息在預設情況下它都屬于一種臨時消息,也就是說如果現在沒有消費者進行消費處理,那麼該消息是不會被保留的。

 如果要想實作持久化的消息處理,重點在于消息的消費端配置,同時也需要考慮到一個分組的情況(有分組就表示該消息可以進行持久化)。

 1、 【microcloud-stream-consumer-8402】修改 application.yml 配置檔案,追加分組配置:

spring:
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        study_input: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為對象json,如果是文本則設定“text/plain”
          binder: defaultRabbit # 設定要綁定的消息服務的具體設定
          group: study-group # 進行操作的分組,實際上就表示持久化
           

 在 SpringCloudStream 之中如果要設定持久化隊列,則名稱為“destination.group”。此時關閉掉消費端的微服務之後該隊列資訊依然會被保留在 RabbitMQ 之中。而後在關閉消費端的情況下去運作消息生産者,發送完消息後再運作消息的消費端仍然可以接收到之前的消息。

 2.6、設定 RoutingKey

 預設情況下之前的程式都是屬于廣播消息,也就是說所有的消費者都可以接收發送消息内容,在 RabbitMQ 裡面支援有直連消息,而直連消息主要是通過 RoutingKey 來實作,利用直連消息可以實作準确的消息消費端的接收處理。

 1、 【microcloud-stream-consumer-8402】修改 application.yml 配置檔案:

server:
  port: 8402
eureka: 
  client: # 用戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://edmin:[email protected]:7001/eureka,http://edmin:[email protected]:7002/eureka,http://edmin:[email protected]:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: receive-8402.com  # 在資訊清單時顯示主機名稱
    prefer-ip-address: true     # 通路的路徑變為IP位址
spring:
  cloud:
    stream:
      rabbit: # 進行rabbit的相關綁定配置
        bindings:
          study_input:
            consumer: # 進行消費端配置
              bindingRoutingKey: study-key # 設定一個RoutingKey資訊
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        study_input: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: RKExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為對象json,如果是文本則設定“text/plain”
          binder: defaultRabbit # 設定要綁定的消息服務的具體設定
          group: study-group # 進行操作的分組,實際上就表示持久化
          
  application:
    name: microcloud-stream-consumer
           

 2、 【microcloud-stream-provider-8401】定義 RoutingKey 的表達式配置:

server:
  port: 8401
eureka: 
  client: # 用戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://edmin:[email protected]:7001/eureka,http://edmin:[email protected]:7002/eureka,http://edmin:[email protected]:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: send-8401.com  # 在資訊清單時顯示主機名稱
    prefer-ip-address: true     # 通路的路徑變為IP位址
spring:
  cloud:
    stream:
      rabbit: # 進行rabbit的相關綁定配置
        bindings:
          study_output:
            producer: # 進行消費端配置
              routing-key-expression: '''study-key''' #定義 RoutingKey 的表達式配置
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        study_output: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: RKExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為對象json,如果是文本則設定“text/plain”
          binder: defaultRabbit # 設定要綁定的消息服務的具體設定
  application:
    name: microcloud-stream-provider
           

 3、 首先運作消費端程式,随後在運作生産端,隻有 RoutingKey 比對了之後才可以正常進行消息的接收處理。

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 簡介、建立消息生産者、建立消息消費者、自定義消息通道、分組與持久化、設定 RoutingKey)

繼續閱讀