RocketMQ出了4的版本,而且本身這個mq有事務消息,在分布式的場景中有很好的啟發性和作用,而且本身它也是阿裡開源到apache的一個項目,從出身還是實力來說都很不錯的。
1、建立項目sc-rocketmq,對應的pom.xml如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>spring-cloud</groupId>
<artifactId>sc-rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>sc-rocketmq</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
Producer單從分類producer的官網doc來看主要分成3種:
DefaultMQProducer
TransactionMQProducer
messagingAccessPoint.createProducer()
本文主要說的是DefaultMQProducer和TransactionMQProducer
預設的producer是DefaultMQProducer,從官方的文檔來看,前四個都是對這個producer的運用隻是set的值不同而已,而且是很細微的變化而已。

2、建立配置檔案application.yml
server:
port: 8182
spring:
application:
name: sc-rocketmq
rocketmq:
consumer:
groupName: consumerGroup # 消費者的組名
consumeThreadMin: 2
consumeThreadMax: 5
consumeMessageBatchMaxSize: 10
topics: rocketTopic,rocketTag
producer:
groupName: producerGroup # 生産者的組名
maxMessageSize: 100
sendMsgTimeout: 1000
retryTimesWhenSendFailed: 3
namesrvAddr: 127.0.0.1:9876 # NameServer位址
3、建立消息生産者類
讀取application.yml配置:
package sc.rocketmq.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ConfigurationProperties(prefix = "rocketmq.producer")
@Configuration
public class ProducerConfig {
private String namesrvAddr;
private String groupName;
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
@Override
public String toString() {
return "ProducerConfig [namesrvAddr=" + namesrvAddr + ", groupName=" + groupName + "]";
}
}
消息生産者:
package sc.rocketmq.config;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ProducerConfigure {
Logger log = LoggerFactory.getLogger(ProducerConfigure.class);
@Autowired
private ProducerConfig producerConfigure;
/**
* 建立普通消息發送者執行個體
*
* @return
* @throws MQClientException
*/
@Bean
// @ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true")
public DefaultMQProducer defaultProducer() throws MQClientException {
log.info(producerConfigure.toString());
log.info("defaultProducer 正在建立---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(producerConfigure.getGroupName());
producer.setNamesrvAddr(producerConfigure.getNamesrvAddr());
producer.setVipChannelEnabled(false);
producer.setRetryTimesWhenSendAsyncFailed(10);
producer.start();
log.info("rocketmq producer server開啟成功---------------------------------.");
return producer;
}
}
4、建立消息消費者類
package sc.rocketmq.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
public class ConsumerConfig {
private String groupName;
private String namesrvAddr;
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
@Override
public String toString() {
return "ConsumerConfig [groupName=" + groupName + ", namesrvAddr=" + namesrvAddr + "]";
}
}
消息消費者類(抽象類):
package sc.rocketmq.config;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@Configuration
public abstract class DefaultConsumerConfigure {
Logger log = LoggerFactory.getLogger(DefaultConsumerConfigure.class);
@Autowired
private ConsumerConfig consumerConfig;
// 開啟消費者監聽服務
public void listener(String topic, String tag) throws MQClientException {
log.info("開啟" + topic + ":" + tag + "消費者-------------------");
log.info(consumerConfig.toString());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());
consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());
consumer.subscribe(topic, tag);
// 開啟内部類實作監聽
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return DefaultConsumerConfigure.this.dealBody(msgs);
}
});
consumer.start();
log.info("rocketmq啟動成功---------------------------------------");
}
// 處理body的業務
public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs);
}
具體消息消費者類:
package sc.rocketmq.service;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import sc.rocketmq.config.DefaultConsumerConfigure;
@Configuration
public class CustomConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent> {
Logger log = LoggerFactory.getLogger(CustomConsumer.class);
@Override
public void onApplicationEvent(ContextRefreshedEvent arg0) {
try {
super.listener("t_TopicTest", "Tag1");
} catch (MQClientException e) {
log.error("消費者監聽器啟動失敗", e);
}
}
@Override
public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs) {
int num = 1;
log.info("進入");
for (MessageExt msg : msgs) {
log.info("第" + num + "次消息");
try {
String msgStr = new String(msg.getBody(), "utf-8");
log.info(msgStr);
} catch (UnsupportedEncodingException e) {
log.error("body轉字元串解析失敗");
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
這個CustomConsumer類實作了ApplicationListener,讓他在啟動的時候就開啟執行DefaultConsumerConfigure的listener方法
5、建立springboot啟動類RocketMqApplication.java
package sc.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMqApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMqApplication.class, args);
}
}
6、建立一個Controller,引入消息生産者
package sc.rocketmq.controller;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import sc.rocketmq.service.CustomConsumer;
@RestController
public class ProducerController {
Logger log = LoggerFactory.getLogger(CustomConsumer.class);
@Autowired
private DefaultMQProducer producer;
// @Autowired
// private TransactionMQProducer producer;
// @Autowired
// private TestTransactionListener testTransactionListener;
@GetMapping("/msg/product")
public void test(String info) throws Exception {
Message message = new Message("TopicTest", "Tag1", "12345", "rocketmq測試成功".getBytes());
// 這裡用到了這個mq的異步處理,類似ajax,可以得到發送到mq的情況,并做相應的處理
// 不過要注意的是這個是異步的
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("傳輸成功");
log.info(JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("傳輸失敗", e);
}
});
}
}
7、驗證是否成功
通路http://127.0.0.1:8080/msg/product
可以看到controller産生消息,然後CustomConsumer類的dealBody方法消息消息。
源碼:
https://gitee.com/hjj520/spring-cloud-2.x/tree/master/sc-apache-rocketmq
本文作者: java樂園
本文來自雲栖社群合作夥伴“
JAVA樂園”,了解相關資訊可以關注“
”