天天看點

SpringCloud 2020.0.4 系列之 Stream 延遲消息 的實作

1. 概述

老話說的好:對待工作要有責任心,不僅要完成自己的部分,還要定期了解整體的進展。

言歸正傳,我們在開發産品時,常常會遇到一段時間後檢查狀态的場景,例如:使用者下單場景,如果訂單生成30分鐘後,使用者還沒有完成支付,則系統自動将訂單關閉。

在沒有消息中間件之前,常常是啟動一個定時程式,固定間隔的去檢查,不僅耗費系統資源,還會有較大的時間誤差。

今天我們就來聊一下 RabbitMQ 的 延遲消息 功能的使用。

RabbitMQ 鏡像模式叢集的搭建,可參見我的另一篇文章《RabbitMQ 3.9.7 鏡像模式叢集的搭建》(https://www.cnblogs.com/w84422/p/15356202.html)

在早期的 SpringCloud 版本中常使用 @Input、@Output、@EnableBinding 和 @StreamListener 注解開發生産者與消費者。

官方原文:Deprecated as of 3.1 in favor of functional programming model。

SpringCloud 2020.0.4 版本中,已經不推薦這麼開發了,是以這裡我們也使用新的寫法(函數式程式設計方式) 開發。

閑話不多說,直接上代碼。

2. 延遲消息插件的安裝

2.1 下載下傳插件 

可以到 RabbitMQ 的官網下載下傳,選擇與安裝的 RabbitMQ 對應的版本

官網位址:https://www.rabbitmq.com/community-plugins.html

SpringCloud 2020.0.4 系列之 Stream 延遲消息 的實作
SpringCloud 2020.0.4 系列之 Stream 延遲消息 的實作

2.2  解壓插件

将插件 .ez 字尾名改為 .zip,然後使用 unzip 指令解壓。

2.3  拷貝插件到 RabbitMQ 的 plugins 檔案夾下

如果是用 rpm 包的方式安裝的 RabbitMQ,路徑為: /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.7/plugins

如果是 Docker 安裝,則 Docker 容器内的路徑為:/plugins

2.4 啟用延遲消息插件

# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2.5 重新開機 RabbitMQ

# /sbin/service rabbitmq-server stop

# /sbin/service rabbitmq-server start

3. 延遲消息發送DEMO

3.1 主要依賴 

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 健康檢查 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>      

3.2 消息實體類

@Setter
@Getter
public class MyMessage implements java.io.Serializable {

    // 消息體
    private String payload;
}      

3.3 生産者

// 發送延遲消息
    @PostMapping("/delayed")
    public String sendDelayedMessage(@RequestParam("body") String body,
                                     @RequestParam("seconds") Integer seconds) {

        MyMessage myMessage = new MyMessage();
        myMessage.setPayload(body);

        // 生産消息
        // 第一個參數是綁定名稱,格式為:自定義的綁定名稱-out-0,myDelayed是自定義的綁定名稱,out代表生産者,0是固定寫法
        // 自定義的綁定名稱必須與消費方法的方法名保持一緻
        // 第二個參數是發送的消息實體
        streamBridge.send("myDelayed-out-0",
                MessageBuilder.withPayload(myMessage)
                    .setHeader("x-delay", seconds * 1000)
                    .build()
        );
        log.info("發送延遲消息成功");
        return "SUCCESS";
    }      

3.4 消費者

// 消費延遲消息
    @Bean
    public Consumer<Message<MyMessage>> myDelayed() {  // 方法名必須與生産消息時自定義的綁定名稱一緻

        return message -> {
            log.info("接收延遲消息:{}", message.getPayload().getPayload());
        };
    }      

3.5 application.yml 配置

spring:
  application:
    name: my-stream-new
  rabbitmq:   # RabbitMQ 配置   
    addresses: 192.168.1.12:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 16000
  cloud:
    function:
      # 定義消費者,多個用分号分隔,當存在大于1個的消費者時,不定義不會生效
      definition: myDelayed
    stream:
      bindings:
        # 延遲消息
        myDelayed-in-0: # 消費者綁定名稱,myDelayed是自定義的綁定名稱,in代表消費者,0是固定寫法
          destination: my-delayed-topic   # 對應的真實的 RabbitMQ Exchange
        myDelayed-out-0: # 生産者綁定名稱,myDelayed是自定義的綁定名稱,out代表生産者,0是固定寫法
          destination: my-delayed-topic   # 對應的真實的 RabbitMQ Exchange

      rabbit:
        bindings:
          myDelayed-in-0:
            consumer:
              delayedExchange: true  # 開啟延遲功能
          myDelayed-out-0:
            producer:
              delayedExchange: true  # 開啟延遲功能      

3.6 驗證延遲消息

發送延遲消息接口:

POST http://localhost:49000/stream/delayed?body=這是一條延遲消息!&seconds=10

自動生成的 Exchange

SpringCloud 2020.0.4 系列之 Stream 延遲消息 的實作

自動生成的 Queue

SpringCloud 2020.0.4 系列之 Stream 延遲消息 的實作

消費情況

SpringCloud 2020.0.4 系列之 Stream 延遲消息 的實作

4. 綜述

今天聊了一下 SpringCloud Stream 元件 延遲消息 的實作 ,希望可以對大家的工作有所幫助。

歡迎幫忙點贊、評論、轉發、加關注 :)

關注追風人聊Java,每天更新Java幹貨。

5. 個人公衆号

追風人聊Java,歡迎大家關注

SpringCloud 2020.0.4 系列之 Stream 延遲消息 的實作