天天看點

Spring boot實戰項目整合阿裡雲RocketMQ 消息隊列實作發送普通消息,延時消息 --附代碼

一.為什麼選擇RocketMQ消息隊列?(可跳過看三的整合代碼執行個體)

  • 首先RocketMQ是阿裡巴巴自研出來的,也已開源。其性能和穩定性從雙11就能看出來,借用阿裡的一句官方介紹:曆年雙 11 購物狂歡節零點千萬級 TPS、萬億級資料洪峰,創造了全球最大的業務消息并發以及流轉紀錄(日志類消息除外); 
  • 在始終保證高性能前提下,支援億級消息堆積,不影響叢集的正常服務,在削峰填谷(蓄洪)、微服務解耦的場景下尤為重要;這,就能說明RocketMQ的強大。

二.RocketMQ的特點和優勢(可跳過看三的整合代碼執行個體)

  • 削峰填谷(主要解決諸如秒殺、搶紅包、企業開門紅等大型活動時皆會帶來較高的流量脈沖,或因沒做相應的保護而導緻系統超負荷甚至崩潰,或因限制太過導緻請求大量失敗而影響使用者體驗,海量消息堆積能力強)
  • Spring boot實戰項目整合阿裡雲RocketMQ 消息隊列實作發送普通消息,延時消息 --附代碼
  • 異步解耦(高可用松耦合架構設計,對高依賴的項目之間進行解耦,當下遊系統出現當機,不會影響上遊系統的正常運作,或者雪崩)  
  • Spring boot實戰項目整合阿裡雲RocketMQ 消息隊列實作發送普通消息,延時消息 --附代碼
  • 順序消息(順序消息即保證消息的先進先出,比如證券交易過程時間優先原則,交易系統中的訂單建立、支付、退款等流程,航班中的旅客登機消息處理等)
    Spring boot實戰項目整合阿裡雲RocketMQ 消息隊列實作發送普通消息,延時消息 --附代碼
  • 分布式事務消息(確定資料的最終一緻性,大量引入 MQ 的分布式事務,既可以實作系統之間的解耦,又可以保證最終的資料一緻性,減少系統間的互動)
    Spring boot實戰項目整合阿裡雲RocketMQ 消息隊列實作發送普通消息,延時消息 --附代碼

三.SpringBoot 整合RocketMQ(商業雲端版)

  • 首先去阿裡雲控制台建立所需消息隊列資源,包括消息隊列 RocketMQ 的執行個體、Topic、Group ID (GID),以及鑒權需要的 AccessKey(AK)。
  • 在springboot項目pom.xml添加需要的依賴 ons-client v1.8.0.Final
    <!-- RocketMQ -->
     <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>ons-client</artifactId>
        <version>1.8.0.Final</version>
     </dependency>        
  • 在對應環境的application-xx.properties檔案配置參數
    ##-------鑒權需要的 AccessKey(AK)(實際項目,這裡填寫阿裡雲自己的賬号資訊)---
    rocketmq.accessKey=xxAxxxxxxxxxx
    rocketmq.secretKey=xxxxxxxxxiHxxxxxxxxxxxxxx
    ## 執行個體TCP 協定公網接入位址(實際項目,填寫自己阿裡雲MQ的公網位址)
    rocketmq.nameSrvAddr=http://MQ_INST_***********85_BbM********************yuncs.com:80
    #普通消息topic (實際項目,填寫自己阿裡雲MQ中的topic名稱和groupid)
    rocketmq.topic=common
    rocketmq.groupId=GID-message
    rocketmq.tag=*
    #定時/延時消息
    rocketmq.timeTopic=time-lapse
    rocketmq.timeGroupId=GID-message
    rocketmq.timeTag=*      
  • 封裝MQ配置類:MqConfig
    Spring boot實戰項目整合阿裡雲RocketMQ 消息隊列實作發送普通消息,延時消息 --附代碼
    Spring boot實戰項目整合阿裡雲RocketMQ 消息隊列實作發送普通消息,延時消息 --附代碼
    /**
     * MQ配置加載
     * @author laifuwei
     */
    @Configuration
    @ConfigurationProperties(prefix = "rocketmq")
    public class MqConfig {
    
        private String accessKey;
        private String secretKey;
        private String nameSrvAddr;
        private String topic;
        private String groupId;
        private String tag;
        private String timeTopic;
        private String timeGroupId;
        private String timeTag;
    
        public Properties getMqPropertie() {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
            properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
            properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
            //設定發送逾時時間,機關毫秒
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "4000");
            return properties;
        }
    
        public String getAccessKey() {
            return accessKey;
        }
    
        public void setAccessKey(String accessKey) {
            this.accessKey = accessKey;
        }
    
        public String getSecretKey() {
            return secretKey;
        }
    
        public void setSecretKey(String secretKey) {
            this.secretKey = secretKey;
        }
    
        public String getNameSrvAddr() {
            return nameSrvAddr;
        }
    
        public void setNameSrvAddr(String nameSrvAddr) {
            this.nameSrvAddr = nameSrvAddr;
        }
    
        public String getTopic() {
            return topic;
        }
    
        public void setTopic(String topic) {
            this.topic = topic;
        }
    
        public String getGroupId() {
            return groupId;
        }
    
        public void setGroupId(String groupId) {
            this.groupId = groupId;
        }
    
        public String getTag() {
            return tag;
        }
    
        public void setTag(String tag) {
            this.tag = tag;
        }
    
        public String getTimeTopic() {
            return timeTopic;
        }
    
        public void setTimeTopic(String timeTopic) {
            this.timeTopic = timeTopic;
        }
    
        public String getTimeGroupId() {
            return timeGroupId;
        }
    
        public void setTimeGroupId(String timeGroupId) {
            this.timeGroupId = timeGroupId;
        }
    
        public String getTimeTag() {
            return timeTag;
        }
    
        public void setTimeTag(String timeTag) {
            this.timeTag = timeTag;
        }
    
    }      
    View Code
  • 給消息生産者注入配置資訊,

    ProducerBean

    用于将

    Producer

    內建至Spring Bean中
    /**
     * MQ配置注入生成消息執行個體
     */
    @Configuration
    public class ProducerClient {
    
        @Autowired
        private MqConfig mqConfig;
        
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public ProducerBean buildProducer() {
            //ProducerBean用于将Producer內建至Spring Bean中
            ProducerBean producer = new ProducerBean();
            producer.setProperties(mqConfig.getMqPropertie());
            return producer;
        }
    }      
  • 為了友善使用,我封裝了一個發送消息的類,消息的Message參數和配置,看代碼注釋,很容易了解
    /**
     * MQ發送消息助手
     * @author laifuwei
     */
    @Component
    public class ProducerUtil {
        
        
        private Logger logger = LoggerFactory.getLogger(ProducerUtil.class);
        
        @Autowired
        private MqConfig config;
        
        @Autowired
        private ProducerBean producer;
        
        
        /**
         * 同步發送消息
         * @param msgTag 标簽,可用于消息小分類标注
         * @param messageBody 消息body内容,生産者自定義内容
         * @param msgKey 消息key值,建議設定全局唯一,可不傳,不影響消息投遞
         * @return success:SendResult or error:null
         */
        public SendResult sendMsg(String msgTag,byte[] messageBody,String msgKey) {
            Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);
            return this.send(msg,Boolean.FALSE);
        }
        /**
         * 同步發送定時/延時消息
         * @param msgTag 标簽,可用于消息小分類标注,對消息進行再歸類
         * @param messageBody 消息body内容,生産者自定義内容,二進制形式的資料
         * @param msgKey 消息key值,建議設定全局唯一值,可不設定,不影響消息收發
         * @param delayTime 服務端發送消息時間,立即發送輸入0或比更早的時間
         * @return success:SendResult or error:null
         */
        public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) {
            Message msg = new Message(config.getTimeTopic(),msgTag,msgKey,messageBody);
            msg.setStartDeliverTime(delayTime);
            return this.send(msg,Boolean.FALSE);
        }
        /**
         * 發送單向消息
         */
        public void sendOneWayMsg(String msgTag,byte[] messageBody,String msgKey) {
            Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);
            this.send(msg,Boolean.TRUE);
        }
        
        /**
         * 普通消息發送發放
         * @param msg 消息
         * @param isOneWay 是否單向發送
         */
        private SendResult send(Message msg,Boolean isOneWay) {
            try {
                if(isOneWay) {
                    //由于在 oneway 方式發送消息時沒有請求應答處理,一旦出現消息發送失敗,則會因為沒有重試而導緻資料丢失。
                    //若資料不可丢,建議選用同步或異步發送方式。
                    producer.sendOneway(msg);
                    success(msg, "單向消息MsgId不傳回");
                    return null;
                }else {
                    //可靠同步發送
                    SendResult sendResult = producer.send(msg);
                       //擷取發送結果,不抛異常即發送成功
                    if (sendResult != null) {
                       success(msg, sendResult.getMessageId());
                       return sendResult;
                    }else {
                       error(msg,null);
                       return null;
                    }
                }
            } catch (Exception e) {
                error(msg,e);
                return null;
            }
        }
        
        //對于使用異步接口,可設定單獨的回調處理線程池,擁有更靈活的配置和監控能力。
        //根據項目需要,伺服器配置合理設定線程數,線程太多有OOM 風險,
        private ExecutorService threads = Executors.newFixedThreadPool(3);
        //僅建議執行輕量級的Callback任務,避免阻塞公共線程池 引起其它鍊路逾時。
        
        /**
         * 異步發送普通消息
         * @param msgTag
         * @param messageBody
         * @param msgKey
         */
        public void sendAsyncMsg(String msgTag,byte[] messageBody,String msgKey) {
            producer.setCallbackExecutor(threads);
            
            Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);
             try {
                 producer.sendAsync(msg, new SendCallback() {
                     @Override
                     public void onSuccess(final SendResult sendResult) {
                         assert sendResult != null;
                         success(msg, sendResult.getMessageId());
                     }
                     @Override
                     public void onException(final OnExceptionContext context) {
                         //出現異常意味着發送失敗,為了避免消息丢失,建議緩存該消息然後進行重試。
                         error(msg,context.getException());
                     }
                 });
             } catch (ONSClientException e) {
                 error(msg,e);
             }
        }
        
        
        //--------------日志列印----------
        private void error(Message msg,Exception e) {
            logger.error("發送MQ消息失敗-- Topic:{}, Key:{}, tag:{}, body:{}"
                     ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()));
            logger.error("errorMsg --- {}",e.getMessage());
        }
        private void success(Message msg,String messageId) {
            logger.info("發送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
                    ,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody()));
        }
        
    }      
  • 注入封裝的發送消息util類,在業務系統需要的地方調用來發送消息即可
    //這裡直接使用上面封裝的發送消息util類
        @Autowired
        private ProducerUtil producer;
    
    
    
       /**
         * 示範方法,可在自己的業務系統方法中進行發送消息
         */
        public String mqTest() {
            /*  使用前面封裝的方法,傳入對應的參數即可發送消息
             *  msgTag 标簽,可用于消息小分類标注
             *  messageBody 消息body内容,生産者自定義内容,任何二進制資料,生産者和消費者協定資料的序列化和反序列化
             *  msgKey 消息key值,建議設定全局唯一,比如訂單号,使用者id這種,可不傳,不影響消息投遞
             */
            //body内容自定義
            JSONObject body = new JSONObject();
            body.put("userId", "this is userId");
            body.put("notice", "同步消息");
            //同步發送消息
            producer.sendMsg("userMessage", body.toJSONString().getBytes(), "messageId");
            //單向消息
            producer.sendOneWayMsg("userMessage", "單向消息".getBytes(), "messageId");
            //異步消息
            producer.sendAsyncMsg("userMessage", "異步消息".getBytes(), "messageId");
            //定時/延時消息,目前時間的30秒後推送。時間自己定義
            producer.sendTimeMsg("userMessage", "延時消息".getBytes(), "messageId", System.currentTimeMillis()+30000);
            //順序消息(全局順序 / 分區順序)、分布式事務消息 目前沒用到,可看官網說明操作
            return "ok";
        }      
  • 接下來是消息消費者的配置和接收消息(一般在下遊系統或者相關聯的系統),接收消息的項目照舊,添加依賴jar包 ons-client v1.8.0.Final 、配置mq參數連結(mq的配置檔案參數要和生産者項目配置的一樣)、添加MqConfig類(上面有寫)
  • 注入配置、訂閱消息、添加消息處理的方法
    @Configuration
    public class ConsumerClient {
    
        @Autowired
        private MqConfig mqConfig;
    
        //普通消息監聽器,Consumer注冊消息監聽器來訂閱消息. 
        @Autowired
        private MqMessageListener messageListener;
        
        //定時消息監聽器,Consumer注冊消息監聽器來訂閱消息. 
        @Autowired
        private MqTimeMessageListener timeMessageListener;
    
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public ConsumerBean buildConsumer() {
            ConsumerBean consumerBean = new ConsumerBean();
            //配置檔案
            Properties properties = mqConfig.getMqPropertie();
            properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
            //将消費者線程數固定為20個 20為預設值
            properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
            consumerBean.setProperties(properties);
            //訂閱消息
            Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
            //訂閱普通消息
            Subscription subscription = new Subscription();
            subscription.setTopic(mqConfig.getTopic());
            subscription.setExpression(mqConfig.getTag());
            subscriptionTable.put(subscription, messageListener);
            //訂閱定時/延時消息
            Subscription subscriptionTime = new Subscription();
            subscriptionTime.setTopic(mqConfig.getTimeTopic());
            subscriptionTime.setExpression(mqConfig.getTimeTag());
            subscriptionTable.put(subscriptionTime, timeMessageListener);
    
            consumerBean.setSubscriptionTable(subscriptionTable);
            return consumerBean;
        }
    
    }      
  • 對定時/延時消息監聽類進行實作,處理接收到的消息
    /**
     * 定時/延時MQ消息監聽消費
     * @author laifuwei
     */
    @Component
    public class MqTimeMessageListener implements MessageListener {
    
        private Logger logger = LoggerFactory.getLogger(this.getClass());
        
        //實作MessageListtener監聽器的消費方法
        @Override
        public Action consume(Message message, ConsumeContext context) {      

         logger.info("接收到MQ消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",

                     message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody()));

      try {

    String msgTag = message.getTag();//消息類型
                String msgKey = message.getKey();//業務唯一id
                switch (msgTag) {
                //----通過生産者傳的tag标簽進行消息分類和過濾處理
                case "userMessage":
                    //通過唯一key的,比如前面key傳的值是訂單号或者使用者id這種唯一值,來進行資料的查詢或處理
                    //由于RocketMQ能重複推送消息,處理消息的時候做好資料的幂等,防止重複處理
                    if(//如訂單系統需要判斷訂單是否被處理過等,通過傳的msgKey即訂單号去查詢資料庫進行判斷) {
    break;
                    }
                    //驗證通過,處理業務
                    //do something
                    break;
                }
                //消費成功,繼續消費下一條消息
                return Action.CommitMessage;
            } catch (Exception e) {
                logger.error("消費MQ消息失敗! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
                //消費失敗,告知伺服器稍後再投遞這條消息,繼續消費其他消息
                return Action.ReconsumeLater;
            }
        }
        
    }      
  • 對普通消息進行監聽,消費消息
    /**
     * 普通(預設同步)MQ消息監聽消費
     * @author laifuwei
     */
    @Component
    public class MqMessageListener implements MessageListener {
    
        private Logger logger = LoggerFactory.getLogger(this.getClass());
        
        @Override
        public Action consume(Message message, ConsumeContext context) {
            logger.info("接收到MQ消息. Topic :" + message.getTopic() + ", tag :" + message.getTag()+ " msgId : " + message.getMsgID()+", Key :" + message.getKey()+", body:" + new String(message.getBody()));
            try {
                String msgTag = message.getTag();//消息類型
                String msgKey = message.getKey();//唯一key
                switch (msgTag) {
                //--------普通通知
                case "userMessage":
    
                    break;
                }
                return Action.CommitMessage;
            } catch (Exception e) {
                logger.error("消費MQ消息失敗! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
                return Action.ReconsumeLater;
            }
        }
        
    }      

 四.最後運作消費者項目和生産者項目,調用生産者項目發送消息驗證效果:

  • 生産者發送消息結果日志:消息發送正常
    2019-08-17 15:11:06.837 INFO 9996 --- [nio-8080-exec-9] com.dyj.shop.mq.ProducerUtil : 發送MQ消息成功 -- Topic:common ,msgId:C0A86532270C2A139A5555A7E5DD0000 , Key:messageId, tag:userMessage, body:{"userId":"this is userId","notice":"同步消息"}
    2019-08-17 15:11:06.841 INFO 9996 --- [nio-8080-exec-9] com.dyj.shop.mq.ProducerUtil : 發送MQ消息成功 -- Topic:common ,msgId:單向消息MsgId不傳回 , Key:messageId, tag:userMessage, body:單向消息
    2019-08-17 15:11:06.901 INFO 9996 --- [pool-6-thread-1] com.dyj.shop.mq.ProducerUtil : 發送MQ消息成功 -- Topic:common ,msgId:C0A86532270C2A139A5555A7E6630004 , Key:messageId, tag:userMessage, body:異步消息
    2019-08-17 15:11:07.060 INFO 9996 --- [nio-8080-exec-9] com.dyj.shop.mq.ProducerUtil : 發送MQ消息成功 -- Topic:time-lapse ,msgId:C0A86532270C2A139A5555A7E69F0006 , Key:messageId, tag:userMessage, body:定時/延時消息       
  • 消費者接收到消息,可以看到普通消息的發送時間和接收到消息的時間,就相差幾毫秒,值得注意的是:延時消息按照生産者定義的30秒後消費者才收到。這就是延時消息的好玩之處
    2019-08-17 15:11:06.881 INFO 10942 --- [MessageThread_7] com.dyj.timer.mq.MqMessageListener : 接收到MQ消息. Topic :common, tag :userMessage msgId : C0A86532270C2A139A5555A7E5DD0000, Key :messageId, body:{"userId":"this is userId","notice":"同步消息"}
    2019-08-17 15:11:06.934 INFO 10942 --- [MessageThread_8] com.dyj.timer.mq.MqMessageListener : 接收到MQ消息. Topic :common, tag :userMessage msgId : C0A86532270C2A139A5555A7E6550002, Key :messageId, body:單向消息
    2019-08-17 15:11:06.947 INFO 10942 --- [MessageThread_9] com.dyj.timer.mq.MqMessageListener : 接收到MQ消息. Topic :common, tag :userMessage msgId : C0A86532270C2A139A5555A7E6630004, Key :messageId, body:異步消息
    2019-08-17 15:11:36.996 INFO 10942 --- [essageThread_10] com.dyj.timer.mq.MqTimeMessageListener : 接收到MQ消息. Topic :time-lapse, tag :userMessage msgId : cd900e16f7cba68369ec498ae2f9dd6c, Key :messageId, body:定時/延時消      

寫在最後:有不妥或有興趣的可以下方留言,多謝指教(#^-^#)