天天看点

springboot集成阿里云rocketMQ代码示例

集成目标:完成生产者发送消息,消费者接收消息的整个流程

集成步骤:

    1、引入jar包依赖

<!--rocketMq消息队列-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.4.Final</version>
        </dependency>
           

2、初始化生产者连接

package com.gaozhen.webservicedemo.config;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

@Component
public class RocketMqProducerConfiguration {
    @Value("GID_sgcc_1")
    private String producerGroupName;

    @Value("172.16.205.55:9876")
    private String namesrvAddr;

    @Value("36Rl3QPMNNXJifNC")
    private String accessKey;

    @Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
    private String secretKey;

    private static Producer producer;

    @PostConstruct
    public void init() {
        // producer 实例配置初始化
        Properties properties = new Properties();
        //您在控制台创建的Producer ID
       // properties.setProperty(PropertyKeyConst.ProducerId,RocketMqConfig.producerGroupName);
        properties.setProperty(PropertyKeyConst.ProducerId,producerGroupName);
        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
       // properties.setProperty(PropertyKeyConst.AccessKey, RocketMqConfig.accessKey);
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
        //properties.setProperty(PropertyKeyConst.SecretKey, RocketMqConfig.secretKey);
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        //设置发送超时时间,单位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
       // properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, RocketMqConfig.namesrvAddr);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
        producer = ONSFactory.createProducer(properties);
        //在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
        producer.start();
    }

    /**
     * 初始化生产者
     * @return
     */
    public Producer getProducer(){
        return producer;
    }



}
           

3、使用初始化的生产者producer发送消息massage

package com.gaozhen.webservicedemo.controller;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.gaozhen.webservicedemo.config.RocketMqProducerConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
public class TestController {

    @Autowired
    private RocketMqProducerConfiguration rocketMqProducerConfiguration;

    @GetMapping("/sendMsg")
    public String sendMsg(){

        String toTopic = "topic_sx";
        String tag = "tag1";
        Message msg = new Message(toTopic, tag, "topic_sx,tag1发送的信息".getBytes());
        try {
            SendResult result = rocketMqProducerConfiguration.getProducer().send(msg);
            if(result!=null){
                System.out.println(new Date() + " Send mq message success. Topic is:"+ toTopic + " messageId is: " + result.getMessageId());
            } else {
                //logger.warn(".sendResult is null.........");
                System.out.println(".sendResult is null.........");
            }
            return "发送Mq消息成功";
        } catch (Exception e) {
            e.printStackTrace();
            return "发送Mq消息失败:"+ e.getMessage();
        }
    }

}
           

4、初始化消费者监听listener

package com.gaozhen.webservicedemo.config;

import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.gaozhen.webservicedemo.service.RocketMqListener;
import com.gaozhen.webservicedemo.util.UUIDUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

@Component
public class RocketMqConsumerConfiguration {
    @Autowired
    RocketMqListener rocketMqListener;

    @Value("GID_sgcc_1")
    private String consumerGroupName;

    @Value("172.16.205.55:9876")
    private String namesrvAddr;

    @Value("36Rl3QPMNNXJifNC")
    private String accessKey;

    @Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
    private String secretKey;

    public static final String tag = "tag1";

    private static Consumer consumer;

    @PostConstruct
    public void init() {
        // consumer 实例配置初始化
        Properties properties = new Properties();
        //您在控制台创建的consumer ID
        //properties.setProperty(PropertyKeyConst.ConsumerId, RocketMqConfig.consumerGroupName);
        properties.setProperty(PropertyKeyConst.ConsumerId, consumerGroupName);
        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
        //properties.setProperty(PropertyKeyConst.AccessKey, RocketMqConfig.accessKey);
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
        //properties.setProperty(PropertyKeyConst.SecretKey, RocketMqConfig.secretKey);
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        //设置发送超时时间,单位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
        //properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, RocketMqConfig.namesrvAddr);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
        properties.setProperty(PropertyKeyConst.InstanceName, UUIDUtil.getUUID32());
        consumer = ONSFactory.createConsumer(properties);
        //------------------------------订阅topic-------------------------------------------------
        consumer.subscribe("topic_sx",tag, rocketMqListener);//监听第一个topic,new对应的监听器
        // 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
        consumer.start();
        System.out.println("ConsumerConfig start success.");
    }

    /**
     * 初始化消费者
     * @return
     */
    public Consumer getconsumer(){
        return consumer;
    }
}
           

5、其中的rocketMqListener实现MessageListener的自定义接收消息的监听类

package com.gaozhen.webservicedemo.service;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.springframework.stereotype.Service;
@Service
public class RocketMqListener implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        try {
            System.out.println("MessageListener.consume ok:" + message);
            byte[] body = message.getBody();
            String messageBody = new String(body);// 获取到接收的消息,由于接收到的是byte数组,所以需要转换成字符串
            System.out.println("收到发送的信息: " + messageBody);

        } catch (Exception e) {
            System.out.println("MessageListener.consume error:" + e.getMessage() );
        }
        System.out.println("MessageListener.Receive message");
        // 如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
        return Action.CommitMessage;
    }

}
           

最后,当访问sendMsg接口,生产者讲发送一个条消息到制定的topic和tag中去,消费者也必须用相同的topic和tag来接收,其中topic和tag可以理解为消息的一级标题和二级标签,如果不清楚tag可以用通配符“*”或者null来接收全部topic的消息,groupid可以一致也可以不一致,具体三者的区别和用法,我将另外写一篇文章重点介绍