官方:消息中間件是基于隊列與消息傳遞技術,在網絡環境中為應用系統提供同步或異步、可靠的消息傳輸的支撐性軟體系統 。
-
- 我個人覺得消息中間件也可以用共享單車模式形容,平台将共享單車放在固定的某些地方,然後大家有需要的自己用對應的app掃描對應的二維碼就可以開鎖使用。
- 固定地方就像是中間件存放消息的地方Broker,共享單車就是Message消息,平台-Producer生産者,大家-Consumer消費者,而對應的app就像是Topic(如果用美團app掃描,就是使用美團單車,如果用支付寶掃描,就是使用哈羅單車),每個單車的編号其實就是組成Topic的更小單元Queue。
大概的執行邏輯是這樣滴:

為什麼要使用消息隊列?
主要的應用場景解耦、異步、削峰
經常會有人提及到“消息中間件/消息隊列”,那這到底是個啥呢?為什麼要使用消息隊列?主要的應用場景解耦、異步、削峰
解耦:
-
- 傳統模式的系統間耦合性太強,比如系統A在代碼中直接調用系統B和系統C的代碼,如果将來D系統接入,系統A還需要修改代碼,過于麻煩!
- 中間件模式的優點是将消息寫入消息隊列,需要消息的系統自己從消息隊列中訂閱,進而系統A不需要做任何修改。
異步:
-
- 傳統模式的一些非必要的業務邏輯以同步的方式運作,太耗費時間。
- 中間件模式的優點是将消息寫入消息隊列,非必要的業務邏輯以異步的方式運作,加快響應速度
削峰:
-
- 傳統模式并發量大的時候,所有的請求直接怼到資料庫,造成資料庫連接配接異常
- 中間件模式系統可以慢慢的按照資料庫能處理的并發量,從消息隊列中慢慢拉取消息。在生産中,這個短暫的高峰期積壓是允許的。
上代碼瞅瞅,我之前項目上用過的
1、引入依賴:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.8.Final</version>
</dependency>
2、yml配置檔案添加生産者屬性:
#配置rocketmq
rocketmq:
producer:
producerId: ***** #生産者id(舊版本是生産者id,新版本是groupid),替換成自己的
msgTopic: alarmStatus #生産主題,替換成自己的
accessKey: XXX #連接配接通道,替換成自己的
secretKey: XXX #連接配接秘鑰,替換成自己的
onsAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet #生産者ons接入域名,替換成自己的
3、 初始化生産者:
package com.iwhalecloud.citybrain.ducha.ism.utils;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
/**
* rocketmq生産者啟動初始化類
* @author lkf
* @Date 2021年8月9日
*
*/
@Component
public class RocketmqProducerInit {
@Value("${rocketmq.producer.producerId}")
private String producerId;
@Value("${rocketmq.producer.accessKey}")
private String accessKey;
@Value("${rocketmq.producer.secretKey}")
private String secretKey;
@Value("${rocketmq.producer.onsAddr}")
private String ONSAddr;
private static Producer producer;
/*
//當無法注入執行個體的時候可以使用此方法進行執行個體初始化
private static class ProducerHolder {
private static final RocketmqProducerInit INSTANCE = new RocketmqProducerInit();
}
private RocketmqProducerInit (){
}
public static final RocketmqProducerInit getInstance() {
return ProducerHolder.INSTANCE;
}*/
@PostConstruct
public void init(){
System.out.println("初始化啟動生産者!");
// producer 執行個體配置初始化
Properties properties = new Properties();
//您在控制台建立的Producer ID
properties.setProperty(PropertyKeyConst.GROUP_ID, producerId);
// AccessKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
// SecretKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
//設定發送逾時時間,機關毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設定 TCP 接入域名(此處以公共雲生産環境為例),設定 TCP 接入域名,進入 MQ 控制台的消費者管理頁面,在左側操作欄單擊擷取接入點擷取
properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
producer = ONSFactory.createProducer(properties);
// 在發送消息前,初始化調用start方法來啟動Producer,隻需調用一次即可,當項目關閉時,自動shutdown
producer.start();
}
/**
* 初始化生産者
* @return
*/
public Producer getProducer(){
return producer;
}
}
4、發送消息:
package com.iwhalecloud.citybrain.ducha.ism.utils;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.sixmonth.rocketmq.common.rocketmq.init.RocketmqProducerInit;
/**
* 消息生産者,可與消費者分離
* @author lkf
* @Date 2021年8月9日
*
*/
@Service
public class RocketmqProducerService {
private Logger logger = LoggerFactory.getLogger(RocketmqProducerService.class);
@Value("${rocketmq.producer.msgTopic}")
private String msgTopic;
@Autowired
private RocketmqProducerInit rocketmqProducerInit;
public String tag = "*";//生産标簽,可自定義,預設通配
/**
* 異步發送消息
* 可靠異步發送:發送方發出資料後,不等接收方發回響應,接着發送下個資料包的通訊方式;
* 特點:速度快;有結果回報;資料可靠;
* 應用場景:異步發送一般用于鍊路耗時較長,對 rt響應時間較為敏感的業務場景,例如使用者視訊上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等;
* @param msg
* @return
*/
public boolean sendMsgAsy(String msg) {
Long startTime = System.currentTimeMillis();
Message message = new Message(msgTopic, tag, msg.getBytes());
rocketmqProducerInit.getProducer().sendAsync(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
///消息發送成功
System.out.println("send message success. topic=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
//消息發送失敗
System.out.println("send message failed. execption=" + context.getException());
}
});
Long endTime = System.currentTimeMillis();
System.out.println("單次生産耗時:"+(endTime-startTime)/1000);
return true;
}
}
5、消費者初始化:
package com.iwhalecloud.citybrain.ducha.ism.utils;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
@Component
public class RocketMQ3 {
private static Logger log = LoggerFactory.getLogger(RocketMQ3.class);
/**
* 建立的Consumer 對象為線程安全的,可以在多線程間進行共享,避免每個線程建立一個執行個體。
*/
private static Consumer consumer;
/**
* 消費警情資料
*
*/
@PostConstruct
public void init() {
log.info("初始化啟動消費者");
Properties p = new Properties();
// accessKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立
p.setProperty(PropertyKeyConst.AccessKey, "****");
//您在控制台建立的 CONSUMER_ID
p.setProperty(PropertyKeyConst.ConsumerId, "****");//CID_alarmInfo_jwzt
// secretKey 阿裡雲身份驗證,在阿裡雲伺服器管理控制台建立//rYwdAimwMeov5xEKLkOBHq1V3I89lc
p.setProperty(PropertyKeyConst.SecretKey, "*******");//kbpnLBVkSWUjCr1RPaHLa23MaLcaMQ
// 設定 TCP 接入域名,進入控制台的執行個體管理頁面的“擷取接入點資訊”區域檢視
// p.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
p.setProperty(PropertyKeyConst.NAMESRV_ADDR, "15.***.***.242:8080");
//設定發送逾時時間,機關毫秒
p.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
consumer = ONSFactory.createConsumer(p);
//訂閱多個 Tag alarmStatus
consumer.subscribe("alarmStatus", "*", new RocketMQListener());
consumer.start();
log.info("消費者啟動成功");
}
}
6、消費者消費/處理消息:
package com.iwhalecloud.citybrain.ducha.ism.utils;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.iwhalecloud.citybrain.ducha.ism.mapper.PoliceOrgMapper;
import com.iwhalecloud.citybrain.ducha.ism.service.impl.JWZTUtilsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
@Service
public class RocketMQListener implements MessageListener {
private static Logger log = LoggerFactory.getLogger(RocketMQListener.class);
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
log.info("Receive: {}", message);
String messageBodyStr = new String(message.getBody());
log.info("message消息為: {}", messageBodyStr);
System.out.println("message消息為:" + messageBodyStr);
try {
JSONObject jsonObject = JSONObject.parseObject(messageBodyStr);
//資料處理
//。。。。
//請求接收日志
} catch (Exception e) {
log.error("MQ警情事件資料錄字元串格式異常Exception", e);
// 記錄字元串格式不正确
return Action.CommitMessage;
}
return Action.CommitMessage;
}
}
簡單的使用大概就是這樣的,其他的比如
- 消息隊列如何選型?
- 如何保證消息隊列是高可用的?
- 如何保證消息不被重複消費?
- 如何保證消費的可靠性傳輸?
- 如何保證消息的順序性?
- 生産端和消費端的叢集負載均衡
想了解更多相關知識的,可以去這兒