天天看點

SpringCloud學習之(十五)SpringCloud Stream消息驅動

文章目錄

    • (十五)SpringCloud Stream消息驅動
      • 1、消息驅動概述
        • 1.1 什麼是SpringCloudStream
        • 1.2 設計思想
          • 1.2.1 标準的MQ
          • 1.2.2 為什麼用Cloud Stream
          • 1.2.3 stream憑什麼可以統一底層差異
          • 1.2.4 Binder
          • 1.2.5 Stream中的消息通信方式遵循了釋出-訂閱模式
        • 1.3 Spring Cloud Stream标準流程套路
        • 1.4 編碼API和常用注解
      • 2、案例說明
      • 3、消息驅動之生産者
        • 3.1 建立model——cloud-stream-rabbitmq-provider8801
        • 3.2 POM
        • 3.3 YML
        • 3.4 業務類
          • 3.4.1 發送消息接口
          • 3.4.2 發送消息接口的實作類
          • 3.4.3 Conteoller
        • 3.5 主啟動類
        • 3.6 測試
      • 4、消息驅動之消費者
        • 4.1 建立model——cloud-stream-rabbitmq-comsumer8802
        • 4.2 POM
        • 4.3 YML
        • 4.4 業務類
        • 4.5 主啟動類
        • 4.6 測試
      • 5、分組消費與持久化
        • 5.1 依照8802,clone出來一份運作8803
        • 5.2 啟動
        • 5.3 運作後兩個問題
        • 5.4 解決重複消費
        • 5.5 分組
          • 5.5.1 原理:
          • 5.5.2 檢視目前的group:
          • 5.5.3 将8802/8803都變成不同組,group兩個不同
          • 5.5.4 結論:
        • 5.6 解決消息持久化

(十五)SpringCloud Stream消息驅動

1、消息驅動概述

1.1 什麼是SpringCloudStream

官方定義Spring Cloud Stream是一個建構消息驅動微服務的架構。

應用程式通過inputs或者 outputs來與Spring Cloud Stream中binder對象互動。

通過我們配置來binding(綁定),而Spring Cloud Stream 的binder對象負責與消息中間件互動。

是以,我們隻需要搞清楚如何與Spring Cloud Stream互動就可以友善使用消息驅動的方式。

通過使用Spring Integration來連接配接消息代理中間件以實作消息事件驅動。

Spring Cloud Stream為一些供應商的消息中間件産品提供了個性化的自動化配置實作,引用了釋出-訂閱、消費組、分區的三個核心概念。

目前僅支援RabbitMQ、Kafka。

一句話就是:屏蔽底層消息中間件的差異,降低切換版本,統一消息的程式設計模型

官網
  • https://spring.io/projects/spring-cloud-stream#overview

  • https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/

  • Spring Cloud Stream中文指導手冊:
    • https://m.wang1314.com/doc/webapp/topic/20971999.html

1.2 設計思想

1.2.1 标準的MQ
SpringCloud學習之(十五)SpringCloud Stream消息驅動
  • 生産者/消費者之間靠消息媒介傳遞資訊内容——Message
  • 消息必須走特定的通道——消息通道MessageChannel
  • 消息通道裡的消息如何被消費呢,誰負責收發處理——消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器訂閱
1.2.2 為什麼用Cloud Stream

比方說我們用到了RabbitMQ和Kafka,由于這兩個消息中間件的架構上的不同,

像RabbitMQ有exchange,kafka有Topic和Partitions分區,

SpringCloud學習之(十五)SpringCloud Stream消息驅動

這些中間件的差異性導緻我們實際項目開發給我們造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,後面的業務需求,我想往另外一種消息隊列進行遷移,這時候無疑就是一個災難性的, 一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候springcloud Stream給我們提供了一種解耦合的方式。

1.2.3 stream憑什麼可以統一底層差異

在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行資訊互動的時候,由于各消息中間件建構的初衷不同,它們的實作細節上會有較大的差異性。

通過定義綁定器作為中間層,完美地實作了應用程式與消息中間件細節之間的隔離。

通過向應用程式暴露統一的Channel通道, 使得應用程式不需要再考慮各種不同的消息中間件實作。

通過

定義綁定器Binder

作為中間層,實作了應用程式與消息中間件細節之間的隔離。

1.2.4 Binder

Stream對消息中間件的進一 步封裝,可以做到代碼層面對中間件的無感覺,甚至于動态的切換中間件(rabbitmq切換為kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程

SpringCloud學習之(十五)SpringCloud Stream消息驅動

通過定義綁定器Binder作為中間層,實作了應用程式與消息中間件細節之間的隔離。

INPUT:消費者

OUTPUT:生産者

1.2.5 Stream中的消息通信方式遵循了釋出-訂閱模式

Topic主題進行廣播

在RabbitMQ就是Exchange

在kafka中就是Topic

1.3 Spring Cloud Stream标準流程套路

SpringCloud學習之(十五)SpringCloud Stream消息驅動
SpringCloud學習之(十五)SpringCloud Stream消息驅動
  • Binder:連接配接中間件、屏蔽差異
  • Channel:通道,是隊列Queue的一種抽象,在消息通訊系統中就是實作存儲和轉發的媒介,通過對Channel對隊列進行配置
  • Source和Sink:簡單的可了解為參照對象是Spring Cloud Stream自身,從Stream釋出消息就是輸出,接受消息就是輸入

1.4 編碼API和常用注解

SpringCloud學習之(十五)SpringCloud Stream消息驅動

2、案例說明

RabbitMQ環境已經OK

工程中建立三個子子產品:

  • cloud-stream-rabbitmq-provider8801,作為生産者進行發消息子產品
  • cloud-stream-rabbitmq-consumer8802,作為消息接收子產品
  • cloud-stream-rabbitmq-consumer8803,作為消息接收子產品

3、消息驅動之生産者

3.1 建立model——cloud-stream-rabbitmq-provider8801

3.2 POM

新引進的pom

<dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
   </dependency>
           
<dependencies>
        <!--stream-rabbit-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>com.atguigu.springcloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>${project.version}</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
           

3.3 YML

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider  #微服務名稱
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為json,文本則設定“text/plain”
          binder: defaultRabbit  # 設定要綁定的消息服務的具體設定

eureka:
  client: # 用戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: send-8801.com  # 在資訊清單時顯示主機名稱
    prefer-ip-address: true     # 通路的路徑變為IP位址
           

3.4 業務類

3.4.1 發送消息接口
package com.atguigu.springcloud.service;

//消息發送者的接口
public interface IMessageProvider {
    public String send();
}
           
3.4.2 發送消息接口的實作類

新知識:

  • @EnableBinding(Source.class)

    //定義消息的推送廣告
  • private MessageChannel output

    ;//消息發送管道
  • MessageBuilder

package com.atguigu.springcloud.service.impl;
......
@EnableBinding(Source.class)  //定義消息的推送廣告
public class MessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output;//消息發送管道
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*******serial: "+serial);
        return null;
    }
}
           
3.4.3 Conteoller
package com.atguigu.springcloud.controller;

import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class MessageController {
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}
           

3.5 主啟動類

package com.atguigu.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}
           

3.6 測試

  • 啟動7001eureka
  • 啟動rabbitmq
    • http://localhost:15672/

    • SpringCloud學習之(十五)SpringCloud Stream消息驅動
  • 啟動8801
  • 通路

    http://localhost:8801/sendMessage

背景列印:

SpringCloud學習之(十五)SpringCloud Stream消息驅動

RabbitMQ顯示:

SpringCloud學習之(十五)SpringCloud Stream消息驅動

4、消息驅動之消費者

4.1 建立model——cloud-stream-rabbitmq-comsumer8802

4.2 POM

跟生産者一樣

4.3 YML

server:
  port: 8802

spring:
  application:
    name: cloud-stream-comsumer  #微服務名稱
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用于于binding整合
          type: rabbit # 消息元件類型
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定消息類型,本次為json,文本則設定“text/plain”
          binder: defaultRabbit  # 設定要綁定的消息服務的具體設定

eureka:
  client: # 用戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: receive-8802.com  # 在資訊清單時顯示主機名稱
    prefer-ip-address: true     # 通路的路徑變為IP位址
           

4.4 業務類

controller

package com.atguigu.springcloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消費者1号,------接收到的消息為:  "+message.getPayload()+"\t  port: "+serverPort);
    }
}
           

4.5 主啟動類

package com.atguigu.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}

           

4.6 測試

啟動:7001 、 8801 、 8802

通路:

http://localhost:8801/sendMessage

結果:

  • 生産者8801:
    SpringCloud學習之(十五)SpringCloud Stream消息驅動
  • 消費者8802
SpringCloud學習之(十五)SpringCloud Stream消息驅動
  • RabbitMQ:
    SpringCloud學習之(十五)SpringCloud Stream消息驅動

5、分組消費與持久化

5.1 依照8802,clone出來一份運作8803

cloud-stream-rabbitmq-consumer8803

5.2 啟動

  • RabbitMQ
  • 7001 服務注冊
  • 8801 消息生産
  • 8802 消息消費
  • 8803 消息消費

通路兩次8801:

http://localhost:8801/sendMessage

8801:

SpringCloud學習之(十五)SpringCloud Stream消息驅動

8802:

SpringCloud學習之(十五)SpringCloud Stream消息驅動

8803:

SpringCloud學習之(十五)SpringCloud Stream消息驅動

5.3 運作後兩個問題

  • 重複消費
  • 消息持久化

5.4 解決重複消費

目前是8802/8803同時都收到了,存在重複消費問題

如何解決:

分組和持久化屬性

group=

生産實際案例:

比如在如下場景中,訂單系統我們做叢集部署,都會從RabbitMQ中擷取訂單資訊,

那如果一個訂單同時被兩個服務擷取到,那麼就會造成資料錯誤,我們得避免這種情況。

這時我們就可以使用Stream中的

消息分組

來解決

SpringCloud學習之(十五)SpringCloud Stream消息驅動

注意在Stream中處于同一個group中的多個消費者是競争關系,就能夠保證消息隻會被其中一個應用消費一次。

不同組是可以全面消費的(重複消費)

同—組内會發生竟争關系,隻有其中一個可以消費。

首先來了解一下分組

5.5 分組

5.5.1 原理:

微服務應用放置于同一個group中,就能夠保證消息隻會被其中一個應用消費一次。不同的組是可以消費的,同一個組内會發生競争關系,隻有其中一個可以消費。

5.5.2 檢視目前的group:

首先看一下,目前的group:

兩個消費者的group:不一樣

SpringCloud學習之(十五)SpringCloud Stream消息驅動

是以屬于不同的組,就會重複消費

5.5.3 将8802/8803都變成不同組,group兩個不同

group: atguiguA、atguiguB

修改YML:

8802:group: atguiguA

8803:group: atguiguB

SpringCloud學習之(十五)SpringCloud Stream消息驅動
5.5.4 結論:

還是重複消費

解決:

8802/8803實作了輪詢分組,每次隻有一個消費者 8801子產品的發的消息隻能被8802或8803其中一個接收到,這樣避免了重複消費

8802/8803都變成相同組,group兩個相同

都改成

group:atguiguA

測試:8801通路2次

SpringCloud學習之(十五)SpringCloud Stream消息驅動

檢視8802:

SpringCloud學習之(十五)SpringCloud Stream消息驅動

檢視8803:

SpringCloud學習之(十五)SpringCloud Stream消息驅動

同一個組的多個微服務執行個體,每次隻會有一個拿到

是以解決了重複消費

5.6 解決消息持久化

通過上述,解決了重複消費問題,再看看持久化

停止8802/8803并去除掉8802的分組group:atguiguA,8803的分組group:atguiguA沒有去掉

8801先發送4條資訊到rabbitmq

SpringCloud學習之(十五)SpringCloud Stream消息驅動

先啟動8802,無分組屬性配置,背景沒有打出來消息

然後啟動8803,有分組屬性配置,背景打出來了MQ上的消息

SpringCloud學習之(十五)SpringCloud Stream消息驅動
總結:要想在服務停止後也能接收到生産者發來的消息,就配置group屬性