天天看點

經常會有人提及到“消息中間件/消息隊列”,那這到底是個啥呢?為什麼要使用消息隊列?主要的應用場景解耦、異步、削峰

官方:消息中間件是基于隊列與消息傳遞技術,在網絡環境中為應用系統提供同步或異步、可靠的消息傳輸的支撐性軟體系統 。

    • 我個人覺得消息中間件也可以用共享單車模式形容,平台将共享單車放在固定的某些地方,然後大家有需要的自己用對應的app掃描對應的二維碼就可以開鎖使用。
    • 固定地方就像是中間件存放消息的地方Broker,共享單車就是Message消息,平台-Producer生産者,大家-Consumer消費者,而對應的app就像是Topic(如果用美團app掃描,就是使用美團單車,如果用支付寶掃描,就是使用哈羅單車),每個單車的編号其實就是組成Topic的更小單元Queue。

大概的執行邏輯是這樣滴:

經常會有人提及到“消息中間件/消息隊列”,那這到底是個啥呢?為什麼要使用消息隊列?主要的應用場景解耦、異步、削峰

為什麼要使用消息隊列?

主要的應用場景解耦、異步、削峰
經常會有人提及到“消息中間件/消息隊列”,那這到底是個啥呢?為什麼要使用消息隊列?主要的應用場景解耦、異步、削峰

解耦:

    • 傳統模式的系統間耦合性太強,比如系統A在代碼中直接調用系統B和系統C的代碼,如果将來D系統接入,系統A還需要修改代碼,過于麻煩!
    • 中間件模式的優點是将消息寫入消息隊列,需要消息的系統自己從消息隊列中訂閱,進而系統A不需要做任何修改。

異步:

    • 傳統模式的一些非必要的業務邏輯以同步的方式運作,太耗費時間。
    • 中間件模式的優點是将消息寫入消息隊列,非必要的業務邏輯以異步的方式運作,加快響應速度

削峰:

    • 傳統模式并發量大的時候,所有的請求直接怼到資料庫,造成資料庫連接配接異常
    • 中間件模式系統可以慢慢的按照資料庫能處理的并發量,從消息隊列中慢慢拉取消息。在生産中,這個短暫的高峰期積壓是允許的。

上代碼瞅瞅,我之前項目上用過的

1、引入依賴:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.8.Final</version>
</dependency>      

2、yml配置檔案添加生産者屬性:

#配置rocketmq      
rocketmq:
  producer:
  producerId: ***** #生産者id(舊版本是生産者id,新版本是groupid),替換成自己的
  msgTopic: alarmStatus #生産主題,替換成自己的
  accessKey: XXX  #連接配接通道,替換成自己的
  secretKey: XXX  #連接配接秘鑰,替換成自己的
  onsAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet  #生産者ons接入域名,替換成自己的      

3、 初始化生産者:

package com.iwhalecloud.citybrain.ducha.ism.utils;
 
import java.util.Properties;
 
import javax.annotation.PostConstruct;
 
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
 
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
 
/**
 * rocketmq生産者啟動初始化類
 * @author lkf
 * @Date 2021年8月9日
 *
 */
@Component
public class RocketmqProducerInit {
 
   @Value("${rocketmq.producer.producerId}")
   private String producerId;
  
   @Value("${rocketmq.producer.accessKey}")
   private String accessKey;
  
   @Value("${rocketmq.producer.secretKey}")
   private String secretKey;
  
   @Value("${rocketmq.producer.onsAddr}")
   private String ONSAddr;
  
   private static Producer producer;
  
   /* 
   
   //當無法注入執行個體的時候可以使用此方法進行執行個體初始化
   private static class ProducerHolder {
     private static final RocketmqProducerInit INSTANCE = new RocketmqProducerInit();
   }
  
   private RocketmqProducerInit (){
    
   }
  
   public static final RocketmqProducerInit getInstance() {
     return ProducerHolder.INSTANCE;
   }*/
  
   @PostConstruct
   public void init(){
    System.out.println("初始化啟動生産者!");
     // producer 執行個體配置初始化
     Properties properties = new Properties();
     //您在控制台建立的Producer ID
     properties.setProperty(PropertyKeyConst.GROUP_ID, producerId);
     // AccessKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
     properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
     // SecretKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
     properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
     //設定發送逾時時間,機關毫秒
     properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
     // 設定 TCP 接入域名(此處以公共雲生産環境為例),設定 TCP 接入域名,進入 MQ 控制台的消費者管理頁面,在左側操作欄單擊擷取接入點擷取
     properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
     producer = ONSFactory.createProducer(properties);
     // 在發送消息前,初始化調用start方法來啟動Producer,隻需調用一次即可,當項目關閉時,自動shutdown
     producer.start();
   }
  
   /**
    * 初始化生産者
    * @return
    */
   public Producer getProducer(){
     return producer;
   }
}      

4、發送消息:

package com.iwhalecloud.citybrain.ducha.ism.utils;
 
import java.util.Date;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
 
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.sixmonth.rocketmq.common.rocketmq.init.RocketmqProducerInit;
 
/**
 * 消息生産者,可與消費者分離
 * @author lkf
 * @Date 2021年8月9日
 *
 */
@Service
public class RocketmqProducerService {
 
 private Logger logger = LoggerFactory.getLogger(RocketmqProducerService.class);
 
 @Value("${rocketmq.producer.msgTopic}")
 private String msgTopic;
 
 @Autowired
 private RocketmqProducerInit rocketmqProducerInit;
 
 public String tag = "*";//生産标簽,可自定義,預設通配
   
   /**
    * 異步發送消息
    * 可靠異步發送:發送方發出資料後,不等接收方發回響應,接着發送下個資料包的通訊方式;
    * 特點:速度快;有結果回報;資料可靠;
    * 應用場景:異步發送一般用于鍊路耗時較長,對 rt響應時間較為敏感的業務場景,例如使用者視訊上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等;
    * @param msg
    * @return
    */
   public boolean sendMsgAsy(String msg) {
    Long startTime = System.currentTimeMillis();
    Message message = new Message(msgTopic, tag, msg.getBytes());
    rocketmqProducerInit.getProducer().sendAsync(message, new SendCallback() {
   @Override
   public void onSuccess(SendResult sendResult) {
    ///消息發送成功
             System.out.println("send message success. topic=" + sendResult.getMessageId());
   }
 
   @Override
   public void onException(OnExceptionContext context) {
    //消息發送失敗
    System.out.println("send message failed. execption=" + context.getException());
   }
    });
       Long endTime = System.currentTimeMillis();
    System.out.println("單次生産耗時:"+(endTime-startTime)/1000);
    return true;
   }
   
}      

5、消費者初始化:

package com.iwhalecloud.citybrain.ducha.ism.utils;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
@Component
public class RocketMQ3 {
    private static Logger log = LoggerFactory.getLogger(RocketMQ3.class);
    /**
     * 建立的Consumer 對象為線程安全的,可以在多線程間進行共享,避免每個線程建立一個執行個體。
     */
    private static Consumer consumer;
    /**
     * 消費警情資料
     *
     */
    @PostConstruct
    public void init() {
        log.info("初始化啟動消費者");
        Properties p = new Properties();
        // accessKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
        p.setProperty(PropertyKeyConst.AccessKey, "****");
        //您在控制台建立的 CONSUMER_ID
        p.setProperty(PropertyKeyConst.ConsumerId, "****");//CID_alarmInfo_jwzt
        // secretKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立//rYwdAimwMeov5xEKLkOBHq1V3I89lc
        p.setProperty(PropertyKeyConst.SecretKey, "*******");//kbpnLBVkSWUjCr1RPaHLa23MaLcaMQ
        // 設定 TCP 接入域名,進入控制台的執行個體管理頁面的“擷取接入點資訊”區域檢視
//        p.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
        p.setProperty(PropertyKeyConst.NAMESRV_ADDR, "15.***.***.242:8080");
        //設定發送逾時時間,機關毫秒
        p.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        consumer = ONSFactory.createConsumer(p);
        //訂閱多個 Tag  alarmStatus
        consumer.subscribe("alarmStatus", "*", new RocketMQListener());
        consumer.start();
        log.info("消費者啟動成功");
    }
}      

6、消費者消費/處理消息:

package com.iwhalecloud.citybrain.ducha.ism.utils;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.iwhalecloud.citybrain.ducha.ism.mapper.PoliceOrgMapper;
import com.iwhalecloud.citybrain.ducha.ism.service.impl.JWZTUtilsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
@Service
public class RocketMQListener implements MessageListener {
    private static Logger log = LoggerFactory.getLogger(RocketMQListener.class);
    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        log.info("Receive: {}", message);
        String messageBodyStr = new String(message.getBody());
        log.info("message消息為: {}", messageBodyStr);
        System.out.println("message消息為:" + messageBodyStr);
        try {
            JSONObject jsonObject = JSONObject.parseObject(messageBodyStr);
            //資料處理
            //。。。。
            //請求接收日志
        } catch (Exception e) {
            log.error("MQ警情事件資料錄字元串格式異常Exception", e);
            // 記錄字元串格式不正确
            return Action.CommitMessage;
        }
        return Action.CommitMessage;
    }
}      

簡單的使用大概就是這樣的,其他的比如

  1. 消息隊列如何選型?
  2. 如何保證消息隊列是高可用的?
  3. 如何保證消息不被重複消費?
  4. 如何保證消費的可靠性傳輸?
  5. 如何保證消息的順序性?
  6. 生産端和消費端的叢集負載均衡

想了解更多相關知識的,可以去這兒