天天看点

SpringCloud 2020.0.4 系列之 Stream 延迟消息 的实现

1. 概述

老话说的好:对待工作要有责任心,不仅要完成自己的部分,还要定期了解整体的进展。

言归正传,我们在开发产品时,常常会遇到一段时间后检查状态的场景,例如:用户下单场景,如果订单生成30分钟后,用户还没有完成支付,则系统自动将订单关闭。

在没有消息中间件之前,常常是启动一个定时程序,固定间隔的去检查,不仅耗费系统资源,还会有较大的时间误差。

今天我们就来聊一下 RabbitMQ 的 延迟消息 功能的使用。

RabbitMQ 镜像模式集群的搭建,可参见我的另一篇文章《RabbitMQ 3.9.7 镜像模式集群的搭建》(https://www.cnblogs.com/w84422/p/15356202.html)

在早期的 SpringCloud 版本中常使用 @Input、@Output、@EnableBinding 和 @StreamListener 注解开发生产者与消费者。

官方原文:Deprecated as of 3.1 in favor of functional programming model。

SpringCloud 2020.0.4 版本中,已经不推荐这么开发了,因此这里我们也使用新的写法(函数式编程方式) 开发。

闲话不多说,直接上代码。

2. 延迟消息插件的安装

2.1 下载插件 

可以到 RabbitMQ 的官网下载,选择与安装的 RabbitMQ 对应的版本

官网地址:https://www.rabbitmq.com/community-plugins.html

SpringCloud 2020.0.4 系列之 Stream 延迟消息 的实现
SpringCloud 2020.0.4 系列之 Stream 延迟消息 的实现

2.2  解压插件

将插件 .ez 后缀名改为 .zip,然后使用 unzip 命令解压。

2.3  拷贝插件到 RabbitMQ 的 plugins 文件夹下

如果是用 rpm 包的方式安装的 RabbitMQ,路径为: /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.7/plugins

如果是 Docker 安装,则 Docker 容器内的路径为:/plugins

2.4 启用延迟消息插件

# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2.5 重启 RabbitMQ

# /sbin/service rabbitmq-server stop

# /sbin/service rabbitmq-server start

3. 延迟消息发送DEMO

3.1 主要依赖 

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 健康检查 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>      

3.2 消息实体类

@Setter
@Getter
public class MyMessage implements java.io.Serializable {

    // 消息体
    private String payload;
}      

3.3 生产者

// 发送延迟消息
    @PostMapping("/delayed")
    public String sendDelayedMessage(@RequestParam("body") String body,
                                     @RequestParam("seconds") Integer seconds) {

        MyMessage myMessage = new MyMessage();
        myMessage.setPayload(body);

        // 生产消息
        // 第一个参数是绑定名称,格式为:自定义的绑定名称-out-0,myDelayed是自定义的绑定名称,out代表生产者,0是固定写法
        // 自定义的绑定名称必须与消费方法的方法名保持一致
        // 第二个参数是发送的消息实体
        streamBridge.send("myDelayed-out-0",
                MessageBuilder.withPayload(myMessage)
                    .setHeader("x-delay", seconds * 1000)
                    .build()
        );
        log.info("发送延迟消息成功");
        return "SUCCESS";
    }      

3.4 消费者

// 消费延迟消息
    @Bean
    public Consumer<Message<MyMessage>> myDelayed() {  // 方法名必须与生产消息时自定义的绑定名称一致

        return message -> {
            log.info("接收延迟消息:{}", message.getPayload().getPayload());
        };
    }      

3.5 application.yml 配置

spring:
  application:
    name: my-stream-new
  rabbitmq:   # RabbitMQ 配置   
    addresses: 192.168.1.12:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 16000
  cloud:
    function:
      # 定义消费者,多个用分号分隔,当存在大于1个的消费者时,不定义不会生效
      definition: myDelayed
    stream:
      bindings:
        # 延迟消息
        myDelayed-in-0: # 消费者绑定名称,myDelayed是自定义的绑定名称,in代表消费者,0是固定写法
          destination: my-delayed-topic   # 对应的真实的 RabbitMQ Exchange
        myDelayed-out-0: # 生产者绑定名称,myDelayed是自定义的绑定名称,out代表生产者,0是固定写法
          destination: my-delayed-topic   # 对应的真实的 RabbitMQ Exchange

      rabbit:
        bindings:
          myDelayed-in-0:
            consumer:
              delayedExchange: true  # 开启延迟功能
          myDelayed-out-0:
            producer:
              delayedExchange: true  # 开启延迟功能      

3.6 验证延迟消息

发送延迟消息接口:

POST http://localhost:49000/stream/delayed?body=这是一条延迟消息!&seconds=10

自动生成的 Exchange

SpringCloud 2020.0.4 系列之 Stream 延迟消息 的实现

自动生成的 Queue

SpringCloud 2020.0.4 系列之 Stream 延迟消息 的实现

消费情况

SpringCloud 2020.0.4 系列之 Stream 延迟消息 的实现

4. 综述

今天聊了一下 SpringCloud Stream 组件 延迟消息 的实现 ,希望可以对大家的工作有所帮助。

欢迎帮忙点赞、评论、转发、加关注 :)

关注追风人聊Java,每天更新Java干货。

5. 个人公众号

追风人聊Java,欢迎大家关注

SpringCloud 2020.0.4 系列之 Stream 延迟消息 的实现