天天看点

RocketMQ 集成 SpringBootRocketMQ 集成 SpringBoot总结

RocketMQ

  • RocketMQ 集成 SpringBoot
      • 1. 项目配置
        • 1.1 新建项目
        • 1.2 引入依赖
      • 2. 生产者实现
      • 3. 消费者实现
  • 总结

RocketMQ 集成 SpringBoot

1. 项目配置

1.1 新建项目

新建两个 SpringBoot 项目,项目名分别为:springboot-rocketmq-consumer、springboot-rocketmq-producter;

1.2 引入依赖

两个项目都导入下面 jar 包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
           

2. 生产者实现

  • springboot-rocketmq-producter 的配置文件 application.properties
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.group=my-group
    server.port=8081
               
  • 生产者实现代码
    @RestController
    public class HelloController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        @RequestMapping("01-hello")
        public String sendMsg(String message,String age) throws Exception{
            /**
             * 第一个参数 : topic ":" tag,主题后面是标签
             * 第二个参数 : message 消息内容
             */
            //底层是对 原生API 的封装, msg不需要转换成byte[],底层转
            // syncSend 同步请求 ,其他的就是方法名的改变
       		SendResult sendResult = rocketMQTemplate.syncSend("01-boot:", message);
            System.out.println(sendResult.getMsgId());
            System.out.println(sendResult.getSendStatus());
            return "success";
        }
    }
               

3. 消费者实现

  • springboot-rocketmq-consumer 的配置文件 application.properties
    rocketmq.name-server=127.0.0.1:9876
    server.port=8082
               
  • 生产者实现代码
    @Component
    @RocketMQMessageListener(
            topic = "01-boot",  	//获取 msg 的主题,对应msg所在的主题		
            consumerGroup = "wolfcode-consumer"		//消费者分组
    )
    public class HelloConsumer implements RocketMQListener<MessageExt> {
        /**
         * 消费者会一直监听指定主题下面的消息,如果消息来了就执行 onMessage 方法
         * 参数是消息内容
         * @param msg 消息内容
         */
        @Override
        public void onMessage(MessageExt messageExt) {
             System.out.println("消费消息"+messageExt);
        }
    }
               

总结

以上就是配置 Nacos 注册中心的总结了,代码仅供参考,欢迎讨论交流。

上篇博客:消息中间件 - RocketMQ 详解(从软件安装到案例实现)

继续阅读