SpringCloud-Alibaba之RocketMQ
RocketMQ概述
RocketMQ是一款由阿裡研發的高性能高可靠性的分布式消息隊列,使用Java語言開發,該項目已經貢獻給了Apache基金會,成為Apache的頂級開源項目。
在早期,阿裡内部使用ActiveMQ作為其消息傳遞中間件,随着業務的快速增長,基于ActiveMQ的消息隊列叢集在處理大規模業務吞吐量時會遇到IO等性能瓶頸,阿裡研發團隊曾努力優化ActiveMQ,但無奈效果不理想,緊接着他們将目光投向Kafka,不幸的是Kafka并不能滿足他們的要求, 特别是在低延遲和高可靠性方面。在這種情況下,阿裡研發團隊最終決定自己研發一個消息隊列引擎來處理更廣泛的使用場景,包括從傳統的釋出/訂閱到大批量高實時、消息零丢失的交易系統。并且将該方案向社群開放,希望可以服務更多的企業。
RocketMQ官方網址:http://rocketmq.apache.org/
目前已經有超過很多加家企業其業務中使用RocketMQ,下圖是部分使用到RocketMQ的大廠:
消息隊列的優點
簡單來說,消息隊列就是一種“先進先出”的資料結構,架構在此基礎上實作資料傳輸的高性能、消息的高可靠性和系統高可用性,是以在如今微服務大行其道的背景下,分布式消息隊列可以提供異步、解耦和消峰等功能,已經成為大型網際網路服務架構裡不可或缺的中間件。
異步:在大型電商系統,通常會将各個核心功能拆分成獨立的服務子產品,使用者的下單操作需要多個服務子產品共同完成,如果此時訂單服務要一直等到其他服務的操作完成,那麼整個下單操作耗時将很長,使用者體驗會很差。是以通常的做法就是訂單服務将資料生成後發送到消息隊列之後立刻傳回下單成功給使用者,其他依賴服務從消息隊列裡面擷取資料進行延後處理,這種延後的異步功能能夠提升系統的性能和吞吐量,同時不至于讓使用者長時間等待。
解耦:很多大型企業裡面通常會有很多的子系統,子系統之間當需要進行消息通訊時,可以通過在子系統之間架設消息隊列,這樣每個子系統隻需要關心自己訂閱的資料,子系統之間達到完全解耦,甚至各個子系統實作的語言、技術、架構等等都可以不一樣。
消峰:每年的雙十一,淘寶系統都會面臨瞬間大規模流量的沖擊,如果沒有緩沖機制,是不可能支撐得住的。通過利用消息隊列,把大量的使用者請求暫存起來,接着服務依次從隊列裡面将請求取出處理,雖然使用者的請求耗時變長了,但相比較于系統崩潰導緻的請求失敗來說,這實際上已經是提高了使用者體驗,并且最重要的是保證了系統的穩定性。
RocketMQ基本結構
Client端
- Producer Group 一類Producer的集合名稱,這類Producer通常發送一類消息,且發送邏輯一緻
- Consumer Group 一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一緻
Server端
- Broker 消息中轉角色,負責存儲消息,轉發消息,這裡就是RocketMQ Server
- Topic 消息的主題,用于定義并在服務端配置,消費者可以按照主題進行訂閱,也就是消息分類,通常一個系統一個Topic
- Message 在生産者、消費者、伺服器之間傳遞的消息,一個message必須屬于一個Topic 消息是要傳遞的資訊。郵件中必須包含一個主題,該主題可以解釋為要發送給您的信的位址。消息還可能具有可選标簽和額外的鍵值對。例如,您可以為消息設定業務密鑰,然後在代理伺服器上查找消息以在開發過程中診斷問題。
- Namesrver 一個無狀态的名稱服務,可以叢集部署,每一個broker啟動的時候都會向名稱伺服器注冊,主要是接收broker的注冊,接收用戶端的路由請求并傳回路由資訊
- Offset 偏移量,消費者拉取消息時需要知道上一次消費到了什麼位置, 這一次從哪裡開始
- Partition 分區,Topic實體上的分組,一個Topic可以分為多個分區,每個分區是一一個有序的隊列。 分區中的每條消息都會給配置設定一個有序的ID,也就是偏移量,保證了順序,消費的正确性
-
Tag 用于對消息進行過濾,了解為message的标記,同一業務不同目的的message可以用相同的topic但是 可以用不同的tag來區分
簡單來說就是用來進一步區分某個 Topic 下的消息分類。 Topic 與 Tag 最佳實踐
- key 消息的KEY字段是為了唯- -表示消息的,友善查問題,不是說必須設定,隻是說設定為了友善開發和運維定位問題。 比如:這個KEY可以是訂單ID等
我們實際上隻需要把 Topic 和 xx_Group 和 Tag 弄明白了就行
基于 Docker 安裝 RocketMQ(單機版)
必須安裝有docker環境 包括 dockerfile環境 還必須有git 伺服器 可用空閑硬碟記憶體5G以上 記憶體1.5G以上
如果沒有安裝git
下載下傳安裝配置
mkdir -p /usr/local/src/rocketmq
cd /usr/local/src/rocketmq
git clone git://github.com/foxiswho/docker-rocketmq.git
移動到brokercnf檔案夾下
cd /usr/local/src/rocketmq/docker-rocketmq/rmq/rmq/brokerconf/
修改配置檔案
vi broker.conf
30行左右位置 ,修改
namesrvAddr=192.168.81.128:9876 為主控端IP (不然在項目裡推送消息就永遠找不到服務連接配接失敗)
如果是叢集的話分号分割 namesrvAddr=192.168.81.128:9876;192.168.81.129:9876;192.168.81.130:9876
brokerIP1=192.168.25.142為主控端IP (如果不設定為目前安裝RocketMQ的ip那麼啟動就會失敗 或者通路不到頁面)
注意一定要把 #去掉
移動到
docker-compose.yml
檔案位置
cd /usr/local/src/rocketmq/docker-rocketmq/rmq
修改docker-compose.yml配置
vi docker-compose.yml
每個版本的配置都不同但是預設就行 主要就是改全部的JVM 不然虛拟機記憶體不夠使用會導緻啟動不了(測試使用,生産跳過)
-Xms128M -Xmx128M -Xmn128m
叢集的話第一個節點不用動配置檔案其他子節點隻需要 rmqnamesrv 和 rmqbroker , rmqconsole可以從配置檔案中删除就行了
運作容器
先修改 啟動檔案
vi /usr/local/src/rocketmq/docker-rocketmq/rmq/start.sh
#!/usr/bin/env bash
# 建立目錄
mkdir -p ./rmqs/logs
mkdir -p ./rmqs/store
mkdir -p ./rmq/logs
mkdir -p ./rmq/store
# 設定目錄權限
chmod -R 777 ./rmqs/logs
chmod -R 777 ./rmqs/store
chmod -R 777 ./rmq/logs
chmod -R 777 ./rmq/store
docker-compose -f ./docker-compose.yml up -d
# 顯示 rocketmq 容器
docker ps |grep rocketmq
然後啟動就行了
/usr/local/src/rocketmq/docker-rocketmq/rmq/start.sh
檢視容器狀态
打開運維頁面
http://192.168.81.128:8180
出現以上頁面代表ok了
選擇中文
添加配置
添加Maven
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
application.yml
生産者
rocketmq:
# rocketmq位址
nameServer: 192.168.81.128:9876
producer:
# 生産者 必須填寫 group
group: test-group
消費者
rocketmq:
# rocketmq位址
nameServer: 192.168.81.128:9876
如果生産者和消費者都在一個服務裡的話那麼,使用生産者的配置就行了
端口号自行根據情況添加
server:
port: 13000
其他配置根據自己需求添加…
MQ模闆工具
@Resource
private RocketMQTemplate rocketMQTemplate;
将MQ正常的操作都給封裝好了直接調用就行
消息生産者
發送普通消息(最常用)
将消息直接發送到MQ中 ,不管是否被消費 ,隻負責單純的發送消息
public void sync() {
rocketMQTemplate.convertAndSend("topic-name", "send convertAndSend message !");
}
底層還是使用 send發送的 就是發送普通消息
同步發送消息(sync)
發送消息采用同步模式,這種方式隻有在消息完全發送完成之後才傳回結果,此方式存在需要同步等待發送結果的時間代價。
這種方式具有内部重試機制,即在主動聲明本次消息發送失敗之前,内部實作将重試一定次數,預設為2次
發送的結果存在同一個消息可能被多次發送給給broker,這裡需要應用的開發者自己在消費端處理幂等性問題。
public void sync() {
SendResult sendResult = rocketMQTemplate.syncSend("topic-name", "send sync message !");
//消費者消費完畢後會傳回結果, 否則目前線程一直卡死
System.out.printf("syncSend1 to topic %s sendResult=%s %n", stringTopic, sendResult);
}
異步發送消息(async)
發送消息采用異步發送模式,消息發送後立刻傳回,當消息完全完成發送後,會調用回調函數sendCallback來告知發送者本次發送是成功或者失敗。異步模式通常用于響應時間敏感業務場景,即承受不了同步發送消息時等待傳回的耗時代價。
同同步發送一樣,異步模式也在内部實作了重試機制,預設次數為2次,發送的結果同樣存在同一個消息可能被多次發送給給broker,需要應用的開發者自己在消費端處理幂等性問題。
public void async() {
rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("send successful");
}
@Override
public void onException(Throwable throwable) {
log.info("send fail; {}", throwable.getMessage());
}
});
}
直接發送消息(one-way)
采用
one-way
發送模式發送消息的時候,發送端發送完消息後會立即傳回,不會等待來自broker的ack來告知本次消息發送是否完全完成發送(不關心發送結果)。這種方式吞吐量很大,但是存在消息丢失的風險,是以其适用于不重要的消息發送,比如日志收集。
public void oneWay() {
rocketMQTemplate.sendOneWay("topic-name", "send one-way message");
}
總結(同步,異步,直接)
在實際使用場景中,利用何種發送方式,可以總結如下:
- 當發送的消息不重要時,采用
方式,以提高吞吐量;one-way
- 當發送的消息很重要是,且對響應時間不敏感的時候采用
方式;sync
- 當發送的消息很重要,且對響應時間非常敏感的時候采用
方式;async
發送延遲消息(Delayed)
發送順序消息
消費者必須開啟接受順序消費才行
封裝後的發送消息工具類
package com.rocketmq.service.impl;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
public class MsgSenderTemplateUtils {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 發送普通消息
*
* @param data 消息資訊
* @param topic 主題
*/
public void sendMessage(String topic, Object data) {
rocketMQTemplate.convertAndSend(topic, data);
}
/**
* 發送普通消息
*
* @param data 消息資訊
* @param topic 主題
* @param tags 主題的标簽
*/
public void sendMessage(String topic, String tags, Object data) {
rocketMQTemplate.convertAndSend(String.format("%s:%s", topic, tags), data);
}
/**
* 發送消息(支援分布式事務)
*
* @param data 消息資訊
* @param topic 主題
*/
public void sendMessageInTransaction(String topic, Object data) {
MessageBuilder<Object> messageBuilder = MessageBuilder.withPayload(data);
messageBuilder.setHeader("msg", JSON.toJSONString(data));
rocketMQTemplate.sendMessageInTransaction(topic, messageBuilder.build(), null);
}
/**
* 發送消息(支援分布式事務)
*
* @param data 消息資訊
* @param topic 主題
* @param tags 主題的标簽
*/
public void sendMessageInTransaction(String topic, String tags, Object data) {
try {
MessageBuilder<Object> messageBuilder = MessageBuilder.withPayload(data);
messageBuilder.setHeader("msg", JSON.toJSONString(data));
System.out.println(rocketMQTemplate);
rocketMQTemplate.sendMessageInTransaction(String.format("%s:%s", topic, tags), messageBuilder.build(), null);
} catch (MessagingException e) {
e.printStackTrace();
}
}
}
示範代碼
controller
package com.rocketmq.controller;
import com.rocketmq.service.impl.MsgSenderTemplateUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
public class TestController {
@Resource
private MsgSenderTemplateUtils msgSender;
// http://localhost:13000/test
/**
* 普通消息投遞
*/
@GetMapping("/test")
public String test() {
Map<String, Object> orderInfo = new HashMap<>();
orderInfo.put("orderId", UUID.randomUUID().toString());
orderInfo.put("price", 10000);
orderInfo.put("description", "我是注冊訂單,請盡快處理");
msgSender.sendMessage("topicTest", orderInfo);
// msgSender.sendMessage("topicTest", "order", orderInfo);
return "投遞消息 => " + orderInfo + " => 成功";
}
// http://localhost:13000/test1
/**
* 普通消息投遞 帶tags
*/
@GetMapping("/test1")
public String test1() {
Map<String, Object> orderInfo = new HashMap<>();
orderInfo.put("orderId", UUID.randomUUID().toString());
orderInfo.put("price", 1000011);
orderInfo.put("description", "我是注冊訂單,請盡快處理");
msgSender.sendMessage("topicTest", "order", orderInfo);
return "投遞消息 => " + orderInfo + " => 成功2225412521512``";
}
// http://localhost:13000/test2
/**
* 分布式事務消息投遞
*/
@GetMapping("/test2")
public String test2() {
Map<String, Object> orderInfo = new HashMap<>();
orderInfo.put("orderId", UUID.randomUUID().toString());
orderInfo.put("price", 10000);
orderInfo.put("description", "我是注冊訂單,請盡快處理");
msgSender.sendMessageInTransaction("topicTest", "order", orderInfo);
return "投遞消息 => " + orderInfo + " => 成功";
}
}
注意: RocketMq 分布式事務還不能使用因為還沒有實作事務監聽類… 分布式事務,這個是幹啥的, 保證消息絕對送出到MQ中,下面會講如何使用
消息消費者
配置介紹
消費者隻需要在類上加@RocketMQMessageListener(xxx)就行了
常用的注解參數
- consumerGroup 消費者分組
- topic 主題
-
selectorType 消息選擇器類型
預設值 SelectorType.TAG 根據TAG選擇
僅支援表達式格式如:“tag1 || tag2 || tag3”,如果表達式為null或者“*”辨別訂閱所有消息
-
selectorExpression 選擇器表達式
預設值 ”*“
-
consumeMode 消費者模式
ConsumeMode.CONCURRENTLY (異步多線程消費 沒有順序 速度最快) (預設)
ConsumeMode.ORDERLY (順序消費 )
-
messageModel 廣播消費模式與叢集消費模式
MessageModel.CLUSTERING(叢集消費模式 就相當于是負載均衡) (預設)
MessageModel.BROADCASTING (廣播模式)
叢集模式
當 Consumer 使用叢集模式時,每條消息隻會被 Consumer 叢集内的任意一個 Consumer 執行個體消費一次。
比如: 當一個 Consumer 叢集内有 3 個 Consumer 執行個體時,一條消息到達時,隻會被Consumer 1、Consumer 2、Consumer 3中的一個消費。
@Slf4j
@Component //必須注入spring容器
@RocketMQMessageListener(
messageModel= MessageModel.CLUSTERING, //(叢集消費模式 預設配置)
consumeMode= ConsumeMode.CONCURRENTLY , //(異步多線程消費 預設配置)
topic = "topicTest",
selectorType = SelectorType.TAG,
consumerGroup = "test-consumer-group",
selectorExpression = "order") //tag
public class Consumer1 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String message = new String(messageExt.getBody());
log.info("==============================消費者1");
log.info("收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),
messageExt.getMsgId(), message);
Map<Integer, Object> orderInfo = JSON.parseObject(messageExt.getBody(), Map.class);
log.info("訂單資訊 orderInfo = {} ", orderInfo.toString());
}
}
注意如果想測試那麼就需要在啟動一個服務把端口号變了就行,因為同一個項目隻支援注冊一個topic 和不同consumerGroup)
廣播模式
當 Consumer 使用廣播模式時,每條消息都會被 Consumer 叢集内所有的 Consumer 執行個體消費一次。
比如:當一個 Consumer 叢集内有 3 個 Consumer 執行個體時,一條消息到達時,會被Consumer 1、Consumer 2、Consumer 3 三個都消費一次。
消費者1:
@Slf4j
@Component //必須注入spring容器
@RocketMQMessageListener(
messageModel= MessageModel.CLUSTERING,
consumeMode= ConsumeMode.CONCURRENTLY ,
topic = "topicTest",
selectorType = SelectorType.TAG,
consumerGroup = "test-consumer-group1",
selectorExpression = "order") //tag
消費者2:
@Slf4j
@Component //必須注入spring容器
@RocketMQMessageListener(
messageModel= MessageModel.CLUSTERING,
consumeMode= ConsumeMode.CONCURRENTLY ,
topic = "topicTest", //topic:消息的發送者使用同一個topic
selectorType = SelectorType.TAG,
consumerGroup = "test-consumer-group2", //group:不用和生産者group相同
selectorExpression = "order") //tag
一定要注意保證 consumerGroup 組的名稱要不同否則會導緻消息發送混亂
消息重複消費和幂等性問題
問題的原因
現在稍微大點的項目都會有用到消息隊列中間件,因為消息隊列有異步解耦、流量削峰、資料分發等許多好處。但是在網際網路應用中,尤其在網絡不穩定的情況下,消息隊列 RocketMQ 的消息有可能會出現重複。如果消息重複則會影響到我們正常的業務處理,這時就要對消息做幂等處理。最近,我所在項目中,就出現了消息重複被消費的問題,造成了對使用者的過渡營銷引起了投訴,于是我們項目立即召開會議讨論方案解決此問題。這裡涉及到消息重複和幂等,下面先說下這兩概念,然後再說下我們用到的解決方案。
消息重複的場景
發送時消息重複
當一條消息已被成功發送到服務端并完成持久化,此時出現了網絡閃斷或者用戶端當機,導緻服務端對生産者的确認應答失敗。如果此時生産者意識到消息發送失敗并嘗試再次發送消息,消費者後續會收到兩條内容相同并且 Message ID 也相同的消息。
投遞時消息重複
消息消費的場景下,消息已投遞到消費者并完成業務處理,當消費者給服務端回報應答的時候網絡閃斷。為了保證消息至少被消費一次,消息隊列 RocketMQ 的服務端将在網絡恢複後再次嘗試投遞之前已被處理過的消息,消費者後續會收到兩條内容相同并且 Message ID 也相同的消息。
負載均衡時消息重複
包括但不限于網絡抖動、Broker 重新開機以及消費者應用重新開機。當消息隊列 RocketMQ 的 Broker 或用戶端重新開機、擴容或縮容時,會觸發 Rebalance,此時消費者可能會收到重複消息。
解決方式
當出現消費者對某條消息重複消費的情況時,重複消費的結果與消費一次的結果是相同的,并且多次消費并未對業務系統産生任何負面影響,那麼這整個過程就可實作消息幂等。
例如,在支付場景下,消費者消費扣款消息,對一筆訂單執行扣款操作,扣款金額為 100 元。如果因網絡不穩定等原因導緻扣款消息重複投遞,消費者重複消費了該扣款消息,但最終的業務結果是隻扣款一次,扣費 100 元,且使用者的扣款記錄中對應的訂單隻有一條扣款流水,不會多次扣除費用。那麼這次扣款操作是符合要求的,整個消費過程實作了消費幂等。
具體流程如下:
- 消費者收到任務後先在redis 中查詢此任務的 id 是否存在
- 如果存在那麼就代表有人已經消費過這個任務了,那麼就直接return 就行了
- 如果沒有存在那麼就先将任務的ID(必須保證ID是唯一可以采用雪花id) 存儲在Redis的Set集合中(Set集合可以保證沒有重複)
- 然後在處理業務邏輯…
RocketMq 生産者事務
保證消息絕對送出到MQ中了 不會因為中途斷電情…況導緻生産者送出了消失而MQ沒有收到
原理就是 生産者先發送半個消息到MQ,這個消息消費者是看不到的,然後一段時間(10ms~60ms)左右
消費者向MQ進行回查,判斷半個消息是否被MQ接收到了,如果MQ接收到了,
那麼生産者在向MQ發送消息送出(COMMIT_MESSAGE),表示次消息消費者可以進行消費了
如果MQ沒有收到半個消息,那麼生産者就傳回一個事務狀态(ROLLBACK_MESSAGE)将取消發送半個消息,
注意,這裡還涉及到另外一個問題。如果是傳回未知狀态(UNKNOW),
RocketMQ Broker
伺服器會以1分鐘的間隔時間不斷回查,直至達到事務回查最大檢測數,如果超過這個數字還未查詢到事務狀态,則復原此消息。
@Slf4j
@RocketMQTransactionListener
public class RocketMqTransaction implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
// MessageHeaders headers = message.getHeaders();
// Map<String, Object> msg = JSON.parseObject(headers.get("msg").toString(),Map.class);
log.info("executeLocalTransaction - UNKNOWN ");
return RocketMQLocalTransactionState.UNKNOWN; //開啟消息回查
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
if (Objects.isNull((headers.get("msg")))){
log.info("checkLocalTransaction - UNKNOWN ");
return RocketMQLocalTransactionState.UNKNOWN; //消息沒有被MQ接收到 發送一個未知狀态 繼續回查
}
Map<String, Object> msg = JSON.parseObject(headers.get("msg").toString(),Map.class);
if (msg.isEmpty()) {
log.info("checkLocalTransaction - UNKNOWN ");
return RocketMQLocalTransactionState.UNKNOWN; //消息沒有被MQ接收到 發送一個未知狀态 繼續回查
}
log.info("checkLocalTransaction - COMMIT ");
return RocketMQLocalTransactionState.COMMIT; //确認消息以被MQ接受到了
}
}
雖然我們保證了生産者消息絕對送出成功,那麼生産者怎麼辦??? 我們可以使用消費者 ACK機制,來保證消費者絕對消費成功
RocketMq 消費者ACK
1、什麼是消息确認ACK。
答:如果在處理消息的過程中,消費者的伺服器在處理消息的時候出現異常,那麼可能這條正在處理的消息就沒有完成消息消費,資料就會丢失。為了確定資料不會丢失,RabbitMQ支援消息确定-ACK。
2、ACK的消息确認機制。
答:ACK機制是消費者從RocketMq 收到消息并處理完成後,回報給RocketMq ,RocketMq 收到回報後才将此消息從隊列中删除。
如果一個消費者在處理消息出現了網絡不穩定、伺服器異常等現象,那麼就不會有ACK回報,RocketMq 會認為這個消息沒有正常消費,會将消息重新放入隊列中。
如果在叢集的情況下,RocketMq 會立即将這個消息推送給這個線上的其他消費者。這種機制保證了在消費者服務端故障的時候,不丢失任何消息和任務。
消息永遠不會從RocketMq 中删除,隻有當消費者正确發送ACK回報,RocketMq 确認收到後,消息才會從RabbitMQ伺服器的資料中删除。
消息的ACK确認機制預設是打開的。
3、ACK機制的開發注意事項。
答:如果忘記了ACK,那麼後果很嚴重。當Consumer退出時候,Message會一直重新分發。然後RabbitMQ會占用越來越多的内容,由于RocketMq 會長時間運作,是以這個"記憶體洩漏"是緻命的。
在RocketMq 中ACK分為有序和無序
有序ACK
@Slf4j
@Component //必須注入spring容器
public class ConsumerAck {
// 順序消費的回調
public ConsumerAck() throws MQClientException {
// 執行個體化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-ack-group");
// 設定NameServer的位址
consumer.setNamesrvAddr("192.168.81.128:9876:9876");
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe("topicTest", "*");
// 注冊回調實作類來處理從broker拉取回來的消息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
try {
for (MessageExt messageExt : msgs) {
String message = new String(messageExt.getBody());
log.info("ACk收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),
messageExt.getMsgId(), message);
Map<Integer, Object> orderInfo = JSON.parseObject(messageExt.getBody(), Map.class);
log.info("ACk訂單資訊 orderInfo = {} ", orderInfo.toString());
}
// int i=1/0; 模拟出錯 實作ACK機制
// 标記該消息已經被成功消費
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
e.printStackTrace();
// 标記該消息失敗,從新打回到mq中 //無限循環直到成功 而不是16次後進入死隊列
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
// 啟動消費者執行個體
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
無序ACK
@Slf4j
@Component //必須注入spring容器
public class ConsumerAck {
public ConsumerAck() throws MQClientException {
// 執行個體化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-ack-group");
// 設定NameServer的位址
consumer.setNamesrvAddr("192.168.81.128:9876:9876");
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe("topicTest", "*");
// 注冊回調實作類來處理從broker拉取回來的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
{
try {
for (MessageExt messageExt : msgs) {
String message = new String(messageExt.getBody());
log.info("ACk收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),
messageExt.getMsgId(), message);
Map<Integer, Object> orderInfo = JSON.parseObject(messageExt.getBody(), Map.class);
log.info("ACk訂單資訊 orderInfo = {} ", orderInfo.toString());
}
int i=1/0; 模拟出錯 實作ACK機制
// 标記該消息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
e.printStackTrace();
// 标記該消息失敗,從新打回到mq中
而如果一直這樣重複消費都持續失敗到一定次數(預設16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工幹預。
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
// 啟動消費者執行個體
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
分布式事務解決方案
- 訂單系統向
發送一條預備扣減庫存消息,MQ
儲存預備消息并傳回成功MQ
ACK
- 接收到預備消息執行成功
,訂單系統執行本地下單操作,為防止消息發送成功而本地事務失敗,訂單系統會實作ACK
的回調接口,其内不斷的檢查本地事務是否執行成功,如果失敗則MQ
復原預備消息;成功則對消息進行最終rollback
送出。commit
- 庫存系統消費扣減庫存消息,執行本地事務,如果扣減失敗,消息會重新投,一旦超出重試次數,則本地表持久化失敗消息,并啟動定時任務做補償。
基于消息中間件的兩階段送出方案,通常用在高并發場景下使用,犧牲資料的強一緻性換取性能的大幅提升,不過實作這種方式的成本和複雜度是比較高的,還要看實際業務情況。
出現的問題
建立broker容器出現閃退情況
記憶體不夠(最低2G),硬碟大小不夠(最低留4G空閑硬碟)
生産者推送消息失敗情況
[CL: 0.91 CQ: 0.91 INDEX: 0.91]出現類似這個問題表示目前分區内磁盤占用率以及高達百分之91了 表示記憶體不足了
可用通過 下指令查詢linux磁盤占用率
df -h
沒辦法隻能加硬碟記憶體了
消費者接收不到消息情況
關閉防火牆 或者把 9876 ,10911 ,10909 11011 11009 這5個端口開發
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=11011/tcp --permanent
firewall-cmd --zone=public --add-port=11009/tcp --permanent
其他問題
/usr/local/src/rocketmq/docker-rocketmq/rmq/rmq/logs/rocketmqlogs
找到 broker.log 看日志到底啥情況
如果出現下面這個問題
ERROR: for rmqnamesrv Cannot start service rmqnamesrv: driver failed programming external connectivity on endpoint rmqnamesrv (a3508287c83b6a778d19f97e9720dbf37c60e7f6a55fa2902d5468b530cdd119): (iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 9876 -j DNAT --to-destination 172.18.0.2:9876 ! -i br-c0bbc3d62576: iptables: No chain/target/match by that name.
(exit status 1))
ERROR: Encountered errors while bringing up the project.
解決辦法
重新開機docker
systemctl restart docker
點贊 -收藏加 -關注 便于以後複習和收到最新内容 有其他問題在評論區讨論-或者私信我-收到會在第一時間回複 感謝,配合,希望我的努力對你有幫助^_^