天天看點

springboot + rocketmq實作簡單消息隊列

1、先安裝rocketmq,配置環境變量,這裡就不寫怎麼安裝了,cmd指令行進入bin目錄,運作name-server和broker,分别用如下兩個指令行

start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
           

不要關閉指令行視窗,當然也可以用背景運作的方式運作這兩個檔案

2、建立springboot項目,下面是依賴包

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>

		<!-- RocketMQ client used by the producer/consumer configuration classes -->
		<dependency>
			<groupId>com.alibaba.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>3.2.6</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!--
			No explicit version: like the other Spring Boot artifacts above, the version
			is left to the project's dependency management. The previous "RELEASE"
			meta-version is deprecated in Maven 3 and can resolve to a Spring Boot
			version that differs from the rest of the build.
		-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
           

3、配置檔案 application.properties

###producer
#該應用是否啟用生産者
rocketmq.producer.isOnOff=on
#發送同一類消息的設定為同一個group,保證唯一,預設不需要設定,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一标示
rocketmq.producer.groupName=hpGroup
#mq的nameServer位址
rocketmq.producer.namesrvAddr=127.0.0.1:9876
#消息最大長度,機關為位元組;這裡設為1024*4=4096,即4K(不是4M)
rocketmq.producer.maxMessageSize=4096
#發送消息逾時時間,預設3000
rocketmq.producer.sendMsgTimeout=3000
#發送消息失敗重試次數,預設2
rocketmq.producer.retryTimesWhenSendFailed=2

###consumer
##該應用是否啟用消費者
rocketmq.consumer.isOnOff=on
rocketmq.consumer.groupName=hpGroup
#mq的nameServer位址
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
#該消費者訂閱的主題和tags("*"号表示訂閱該主題下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
rocketmq.consumer.topics=rocketTopic~*
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
#設定一次消費消息的條數,預設為1條
rocketmq.consumer.consumeMessageBatchMaxSize=1


rocket.group=rocketGroup
rocket.topic=rocketTopic
rocket.tag=rocketTag
           

注:要保證rocketmq.consumer.topics中 ~ 之前的主題名和rocket.topic的值一緻(~ 之後是tag表達式)。~隻是分隔符,也可以選擇其他分隔符,但需要同步修改ConsumerConfig中split("~")所用的字元。

4、

生産者配置類代碼:

package com.hp.rocket.rocket;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that creates, configures and starts the RocketMQ
 * producer bean from the {@code rocketmq.producer.*} properties.
 */
@Configuration
public class ProducerConfig {

    private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class);

    @Value("${rocketmq.producer.groupName}")
    private String groupName;
    @Value("${rocketmq.producer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.producer.maxMessageSize}")
    private Integer maxMessageSize;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private Integer sendMsgTimeout;
    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
    private Integer retryTimesWhenSendFailed;

    /**
     * Builds the producer, applies the optional tuning values that are present
     * and starts it.
     *
     * @return the started producer
     * @throws IllegalStateException if the producer cannot be started; the
     *         original {@link MQClientException} is kept as the cause
     */
    @Bean
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        // If several producers in the same JVM must send to different MQ clusters,
        // each of them needs its own instanceName.
        if (this.maxMessageSize != null) {
            producer.setMaxMessageSize(this.maxMessageSize);
        }
        if (this.sendMsgTimeout != null) {
            producer.setSendMsgTimeout(this.sendMsgTimeout);
        }
        // Number of retries after a failed send.
        if (this.retryTimesWhenSendFailed != null) {
            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        }
        try {
            producer.start();
        } catch (MQClientException e) {
            // Surface the failure instead of handing out a producer that never started.
            LOG.error("producer failed to start, groupName:{},namesrvAddr:{}", groupName, namesrvAddr, e);
            throw new IllegalStateException("RocketMQ producer failed to start", e);
        }
        return producer;
    }
}

           

5、

消費者配置類代碼

package com.hp.rocket.rocket;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.hp.rocket.common.CodeMsg;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.util.StringUtils;

@Slf4j
@SpringBootConfiguration
public class ConsumerConfig {

    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;
    @Value("${rocketmq.consumer.topics}")
    private String topics;
    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
    private int consumeMessageBatchMaxSize;
    @Autowired
    private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;

    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() throws Exception {
        if (StringUtils.isEmpty(groupName)){
            throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg());
        }
        if (StringUtils.isEmpty(namesrvAddr)){
            throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg());
        }
        if(StringUtils.isEmpty(topics)){
            throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg());
        }
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.registerMessageListener(mqMessageListenerProcessor);
        /**
         * 設定Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
         * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        /**
         * 設定消費模型,叢集還是廣播,預設為叢集
         */
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        /**
         * 設定一次消費消息的條數,預設為1條
         */
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        try {
            /**
             * 設定該消費者訂閱的主題和tag,如果是訂閱該主題下的所有tag,則tag使用*;如果需要指定訂閱該主題下的某些tag,則使用||分割,例如tag1||tag2||tag3
             */
            String[] topicTagsArr = topics.split(";");
            for (String topicTags : topicTagsArr) {
                String[] topicTag = topicTags.split("~");
                consumer.subscribe(topicTag[0],topicTag[1]);
            }
            consumer.start();
            log.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
        }catch (MQClientException e){
            log.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
            throw new Exception(e);
        }
        return consumer;
    }

}

           

7、消費者的監聽器代碼

package com.hp.rocket.rocket;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.hp.rocket.entity.MessageBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
/**
 * Concurrent message listener registered on the push consumer. Logs every
 * received message and dispatches the ones matching the expected topic/tag.
 */
@Slf4j
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {

    /** Redelivery count from which a message is no longer processed. */
    private static final int MAX_RECONSUME_TIMES = 3;

    /**
     * Handles every message of the delivered batch.
     *
     * @param list messages delivered in this call; may hold more than one
     *        when the consumer's batch size is above 1
     * @param consumeConcurrentlyContext consumption context (unused)
     * @return always {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            log.info("接受到的消息為空,不處理,直接傳回成功");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // Walk the whole batch: acknowledging the batch after looking only at
        // the first element would silently drop the remaining messages.
        for (MessageExt messageExt : list) {
            handle(messageExt);
        }
        // If CONSUME_SUCCESS is not returned, the consumer re-consumes the
        // message until it is.
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Logs one message and runs the business logic when topic and tag match. */
    private void handle(MessageExt messageExt) {
        // Decode with an explicit charset instead of the platform default.
        log.info("接受到的消息為:{}", new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        log.info("接受到的消息為:{}", messageExt);
        // Constant-first equals: tags may be null for untagged messages.
        if (!"你的Topic".equals(messageExt.getTopic()) || !"你的Tag".equals(messageExt.getTags())) {
            return;
        }
        // Stop processing once the message has been retried MAX_RECONSUME_TIMES
        // times or more; it is then simply acknowledged.
        if (messageExt.getReconsumeTimes() >= MAX_RECONSUME_TIMES) {
            return;
        }
        //TODO handle the corresponding business logic
    }
}

           

8、通用配置參數

package com.hp.rocket.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Holder bean exposing the {@code rocket.*} properties as public fields.
 */
@Service
public class ParamConfigService {

    /** Value of the {@code rocket.group} property. */
    @Value("${rocket.group}")
    public String rocketGroup;

    /** Value of the {@code rocket.topic} property. */
    @Value("${rocket.topic}")
    public String rocketTopic;

    /** Value of the {@code rocket.tag} property. */
    @Value("${rocket.tag}")
    public String rocketTag;
}
           

9、service層

package com.hp.rocket.service;

import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.hp.rocket.entity.MessageBack;

import java.util.List;

/**
 * Service contract for publishing messages to RocketMQ.
 */
public interface RocketMqService {

    /**
     * Publishes the given text as a message.
     *
     * @param msgInfo message payload
     * @return the broker's send result
     */
    SendResult openAccountMsg(String msgInfo);
}
           

impl實作類

package com.hp.rocket.service;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
@Slf4j
public class RocketMqServiceImpl implements RocketMqService{

    @Resource
    private DefaultMQProducer defaultMQProducer;
    @Resource
    private ParamConfigService paramConfigService ;

    @Override
    public SendResult openAccountMsg(String msgInfo) {
        // 可以不使用Config中的Group
        defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup);
        log.info("開始發送消息:"+msgInfo);
        SendResult sendResult = null;
        try {
            Message sendMsg = new Message(paramConfigService.rocketTopic,
                    paramConfigService.rocketTag,
                    "open_account_key", msgInfo.getBytes());
            sendResult = defaultMQProducer.send(sendMsg);
            log.info("消息發送響應資訊:"+sendResult.toString());

        } catch (Exception e) {
            e.printStackTrace();
        }
        return sendResult ;
    }

}
           

10、controller層

友善浏覽器測試,用的get方法

package com.hp.rocket.controller;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.hp.rocket.common.CodeMsg;
import com.hp.rocket.common.HPResponse;
import com.hp.rocket.service.RocketMqService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RequestMapping("rocket")
@RestController
public class RocketController {

    @Autowired
    RocketMqService rocketMqService;

    @GetMapping(value = "getResult/{msg}")
    public HPResponse<SendResult> getResult(@PathVariable("msg") String msg){
        if(StringUtils.isEmpty(msg)){
            return HPResponse.error(CodeMsg.SYSTEM_ERROR.fillArgs("參數不能為空"));
        }
        SendResult result = rocketMqService.openAccountMsg(msg);
        return HPResponse.success(result);
    }
}
           

11、

response傳回類也貼上來,友善新手直接拿來用

package com.hp.rocket.common;

public class HPResponse<T> {


    private int code;
    private String msg;
    private T data;
    public static <T> HPResponse<T> success(T data){
        return new HPResponse<T>(data);
    }

    public static <T> HPResponse<T> success(){
        return new HPResponse<T>();
    }

    public static <T> HPResponse<T> error(CodeMsg codeMsg){
        return new  HPResponse<T>(codeMsg);
    }
    private HPResponse(T data) {
        this.code = 200;
        this.msg = "success";
        this.data = data;
    }

    private HPResponse() {
        this.code = 200;
        this.msg = "success";
    }

    private HPResponse(CodeMsg codeMsg) {
        if(codeMsg == null) {
            return;
        }
        this.code = codeMsg.getCode();
        this.msg = codeMsg.getMsg();
    }
    public int getCode() {
        return code;
    }
    public String getMsg() {
        return msg;
    }
    public T getData() {
        return data;
    }


}

           

12、自定義傳回錯誤碼工具類

package com.hp.rocket.common;

import lombok.Getter;

/**
 * Immutable pair of error code and message. Predefined instances are exposed
 * as constants; messages containing format specifiers are completed through
 * {@link #fillArgs(Object...)}.
 */
@Getter
public class CodeMsg {

    // final: shared constants must not be reassignable by other classes.
    public static final CodeMsg SERVER_ERROR = new CodeMsg(50000, "服務端異常");
    /** Template whose message is the bare placeholder "%s"; fill it with {@link #fillArgs(Object...)}. */
    public static final CodeMsg SYSTEM_ERROR = new CodeMsg(40000, "%s");

    private final int code;
    private final String msg;

    private CodeMsg(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }

    /**
     * Creates a copy whose message is this message formatted with
     * {@code args} via {@link String#format(String, Object...)}.
     *
     * @param args values for the format specifiers in the message
     * @return a new instance with the same code and the formatted message
     */
    public CodeMsg fillArgs(Object... args) {
        String message = String.format(this.msg, args);
        return new CodeMsg(this.code, message);
    }


}

           

啟動類我就不貼了,springboot項目自帶,覺得好用的就在我的GitHub點個星☆,謝謝

源碼:https://github.com/hlposvseq/springboot-rocketmq-