1、先安裝 RocketMQ 並配置環境變量(安裝步驟此處從略)。在 cmd 指令行進入 bin 目錄,分别用如下兩個指令啟動 name-server 和 broker:
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
不要關閉這兩個指令行視窗;當然也可以改用背景運作的方式啟動這兩個程式。
2、建立springboot項目,下面是依賴包
<!-- Hot reload during development only; not packaged into the runtime artifact of dependants. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
</dependency>
<!-- Lombok supplies @Slf4j / @Getter used by the classes below. -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!-- RocketMQ client (com.alibaba.rocketmq.* packages). -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!--
  No explicit version: like the other Spring Boot artifacts above, the version is taken from the
  project's Spring Boot dependency management. The former <version>RELEASE</version> was a floating
  metaversion that made builds non-reproducible and could resolve to a Spring Boot release that does
  not match the rest of the project.
-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
3、配置檔案 application.properties
###producer
# Whether this application enables the producer.
# NOTE(review): none of the classes shown in this tutorial reads this flag.
rocketmq.producer.isOnOff=on
# Producers that send the same kind of message should share one group name, which must be unique.
# The original note says the setting is optional and that RocketMQ falls back to a default identifier;
# that identifier was garbled by e-mail obfuscation in the source text (presumably "ip@pid") - TODO confirm.
rocketmq.producer.groupName=hpGroup
# Address of the RocketMQ name server
rocketmq.producer.namesrvAddr=127.0.0.1:9876
# Maximum message size, passed to DefaultMQProducer.setMaxMessageSize by ProducerConfig.
# NOTE(review): the original comment claimed "default 1024*4 (4M)", but 1024*4 = 4096; if the unit is
# bytes this is 4 KB, not 4 MB - confirm the unit and raise the value if larger messages must be sent.
rocketmq.producer.maxMessageSize=4096
# Send timeout; the original comment states the default is 3000
rocketmq.producer.sendMsgTimeout=3000
# Number of retries when sending fails; the original comment states the default is 2
rocketmq.producer.retryTimesWhenSendFailed=2
###consumer
## Whether this application enables the consumer.
## NOTE(review): none of the classes shown in this tutorial reads this flag.
rocketmq.consumer.isOnOff=on
rocketmq.consumer.groupName=hpGroup
# Address of the RocketMQ name server
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
# Topics and tags this consumer subscribes to ("*" subscribes to every tag of the topic).
# Format: topic~tag1||tag2||tag3;topic2~*;
rocketmq.consumer.topics=rocketTopic~*
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
# Number of messages handed to the listener per call; the original comment states the default is 1
rocketmq.consumer.consumeMessageBatchMaxSize=1
# Values injected into ParamConfigService and used by RocketMqServiceImpl when sending
rocket.group=rocketGroup
rocket.topic=rocketTopic
rocket.tag=rocketTag
注:要保證 rocketmq.consumer.topics 中 ~ 之前的主題名稱(此處為 rocketTopic)和 rocket.topic 的值一緻,消費者才能收到生産者發送的消息;~ 隻是主題與 tag 之間的分隔符,也可以選擇其他分隔符(需同步修改 ConsumerConfig 中 split 使用的字元)。
4、
生産者配置類代碼:
package com.hp.rocket.rocket;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that builds and starts the application's RocketMQ producer.
 *
 * <p>All settings are injected from the {@code rocketmq.producer.*} properties.
 */
@Configuration
public class ProducerConfig {

    private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class);

    @Value("${rocketmq.producer.groupName}")
    private String groupName;

    @Value("${rocketmq.producer.namesrvAddr}")
    private String namesrvAddr;

    @Value("${rocketmq.producer.maxMessageSize}")
    private Integer maxMessageSize;

    @Value("${rocketmq.producer.sendMsgTimeout}")
    private Integer sendMsgTimeout;

    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
    private Integer retryTimesWhenSendFailed;

    /**
     * Creates, configures and starts the {@link DefaultMQProducer} bean.
     *
     * @return the started producer
     * @throws IllegalStateException if the producer cannot be started; failing here keeps an
     *         unusable producer from being handed to the rest of the application
     */
    @Bean
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        // If several producers in one JVM must send to different MQ clusters,
        // each of them needs its own instanceName.
        if (this.maxMessageSize != null) {
            producer.setMaxMessageSize(this.maxMessageSize);
        }
        if (this.sendMsgTimeout != null) {
            producer.setSendMsgTimeout(this.sendMsgTimeout);
        }
        // Number of retries after a failed send.
        if (this.retryTimesWhenSendFailed != null) {
            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        }
        try {
            producer.start();
            LOG.info("producer started, groupName:{}, namesrvAddr:{}", groupName, namesrvAddr);
        } catch (MQClientException e) {
            LOG.error("producer failed to start, groupName:{}, namesrvAddr:{}", groupName, namesrvAddr, e);
            throw new IllegalStateException(
                    "Failed to start RocketMQ producer for group '" + groupName + "'", e);
        }
        return producer;
    }
}
5、
消費者配置類代碼
package com.hp.rocket.rocket;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.hp.rocket.common.CodeMsg;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.util.StringUtils;
@Slf4j
@SpringBootConfiguration
public class ConsumerConfig {
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;
@Value("${rocketmq.consumer.topics}")
private String topics;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;
@Autowired
private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer() throws Exception {
if (StringUtils.isEmpty(groupName)){
throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg());
}
if (StringUtils.isEmpty(namesrvAddr)){
throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg());
}
if(StringUtils.isEmpty(topics)){
throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg());
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.registerMessageListener(mqMessageListenerProcessor);
/**
* 設定Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/**
* 設定消費模型,叢集還是廣播,預設為叢集
*/
//consumer.setMessageModel(MessageModel.CLUSTERING);
/**
* 設定一次消費消息的條數,預設為1條
*/
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
try {
/**
* 設定該消費者訂閱的主題和tag,如果是訂閱該主題下的所有tag,則tag使用*;如果需要指定訂閱該主題下的某些tag,則使用||分割,例如tag1||tag2||tag3
*/
String[] topicTagsArr = topics.split(";");
for (String topicTags : topicTagsArr) {
String[] topicTag = topicTags.split("~");
consumer.subscribe(topicTag[0],topicTag[1]);
}
consumer.start();
log.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
}catch (MQClientException e){
log.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
throw new Exception(e);
}
return consumer;
}
}
7、消費者的監聽器代碼
package com.hp.rocket.rocket;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.hp.rocket.entity.MessageBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
 * Concurrent message listener registered on the push consumer. Logs every received message and
 * is the place where topic/tag specific business logic is meant to be added.
 */
@Slf4j
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {

    /** Redelivery count from which a message is acknowledged without being processed again. */
    private static final int MAX_RECONSUME_TIMES = 3;

    /**
     * Handles one batch of messages.
     *
     * @param list the delivered messages; may hold more than one element when
     *        {@code consumeMessageBatchMaxSize} is greater than 1
     * @param consumeConcurrentlyContext consumption context supplied by the client (unused)
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}; any other status makes
     *         the consumer redeliver the batch until success is returned
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            log.info("接受到的消息為空,不處理,直接傳回成功");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // Handle every message of the batch, not only the first one; otherwise the remaining
        // messages would be acknowledged without ever being looked at.
        for (MessageExt messageExt : list) {
            handleMessage(messageExt);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Logs a single message and dispatches it by topic and tag. */
    private void handleMessage(MessageExt messageExt) {
        // Decode with an explicit charset so the result does not depend on the platform default.
        log.info("接受到的消息為:{}", new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        log.info("接受到的消息為:{}", messageExt);
        // Constant-first equals: a message sent without tags must not cause a NullPointerException.
        if ("你的Topic".equals(messageExt.getTopic()) && "你的Tag".equals(messageExt.getTags())) {
            // Already redelivered the maximum number of times: give up on it.
            if (messageExt.getReconsumeTimes() >= MAX_RECONSUME_TIMES) {
                return;
            }
            //TODO handle the business logic for this topic/tag
        }
    }
}
8、通用配置參數
package com.hp.rocket.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
 * Holds the {@code rocket.*} values from the application properties.
 * The fields are public and are read directly by RocketMqServiceImpl.
 */
@Service
public class ParamConfigService {
/** Value of the {@code rocket.group} property. */
@Value("${rocket.group}")
public String rocketGroup ;
/** Value of the {@code rocket.topic} property. */
@Value("${rocket.topic}")
public String rocketTopic ;
/** Value of the {@code rocket.tag} property. */
@Value("${rocket.tag}")
public String rocketTag ;
}
9、service層
package com.hp.rocket.service;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.hp.rocket.entity.MessageBack;
import java.util.List;
/**
 * Service for publishing messages to RocketMQ.
 */
public interface RocketMqService {
/**
 * Sends the given text as a message.
 *
 * @param msgInfo the message payload
 * @return the send result; the implementation shown in this tutorial returns {@code null}
 *         when sending fails, so callers should check for it
 */
SendResult openAccountMsg(String msgInfo);
}
impl實作類
package com.hp.rocket.service;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
@Slf4j
public class RocketMqServiceImpl implements RocketMqService{
@Resource
private DefaultMQProducer defaultMQProducer;
@Resource
private ParamConfigService paramConfigService ;
@Override
public SendResult openAccountMsg(String msgInfo) {
// 可以不使用Config中的Group
defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup);
log.info("開始發送消息:"+msgInfo);
SendResult sendResult = null;
try {
Message sendMsg = new Message(paramConfigService.rocketTopic,
paramConfigService.rocketTag,
"open_account_key", msgInfo.getBytes());
sendResult = defaultMQProducer.send(sendMsg);
log.info("消息發送響應資訊:"+sendResult.toString());
} catch (Exception e) {
e.printStackTrace();
}
return sendResult ;
}
}
10、controller層
友善浏覽器測試,用的get方法
package com.hp.rocket.controller;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.hp.rocket.common.CodeMsg;
import com.hp.rocket.common.HPResponse;
import com.hp.rocket.service.RocketMqService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
@RequestMapping("rocket")
@RestController
public class RocketController {
@Autowired
RocketMqService rocketMqService;
@GetMapping(value = "getResult/{msg}")
public HPResponse<SendResult> getResult(@PathVariable("msg") String msg){
if(StringUtils.isEmpty(msg)){
return HPResponse.error(CodeMsg.SYSTEM_ERROR.fillArgs("參數不能為空"));
}
SendResult result = rocketMqService.openAccountMsg(msg);
return HPResponse.success(result);
}
}
11、
response傳回類也貼上來,友善新手直接拿來用
package com.hp.rocket.common;
public class HPResponse<T> {
private int code;
private String msg;
private T data;
public static <T> HPResponse<T> success(T data){
return new HPResponse<T>(data);
}
public static <T> HPResponse<T> success(){
return new HPResponse<T>();
}
public static <T> HPResponse<T> error(CodeMsg codeMsg){
return new HPResponse<T>(codeMsg);
}
private HPResponse(T data) {
this.code = 200;
this.msg = "success";
this.data = data;
}
private HPResponse() {
this.code = 200;
this.msg = "success";
}
private HPResponse(CodeMsg codeMsg) {
if(codeMsg == null) {
return;
}
this.code = codeMsg.getCode();
this.msg = codeMsg.getMsg();
}
public int getCode() {
return code;
}
public String getMsg() {
return msg;
}
public T getData() {
return data;
}
}
12、自定義傳回錯誤碼工具類
package com.hp.rocket.common;
import lombok.Getter;
/**
 * Immutable pair of error code and message used to build error responses.
 * Getters are generated by Lombok.
 */
@Getter
public class CodeMsg {

    /** Generic server-side failure. */
    public static final CodeMsg SERVER_ERROR = new CodeMsg(50000, "服務端異常");

    /** Template error whose message is a {@code %s} placeholder; fill it with {@link #fillArgs(Object...)}. */
    public static final CodeMsg SYSTEM_ERROR = new CodeMsg(40000, "%s");

    private final int code;
    private final String msg;

    private CodeMsg(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }

    /**
     * Returns a copy of this code whose message is this message formatted with {@code args}.
     * The shared constant itself is never modified.
     *
     * @param args values for the format placeholders of the message
     * @return a new {@code CodeMsg} with the same code and the formatted message
     */
    public CodeMsg fillArgs(Object... args) {
        String message = String.format(this.msg, args);
        return new CodeMsg(this.code, message);
    }
}
啟動類我就不貼了,springboot項目自帶,覺得好用的就在我的GitHub點個星☆,謝謝
源碼:https://github.com/hlposvseq/springboot-rocketmq-