1. MQ介紹
##1.1 為什麼要用MQ
消息隊列是一種“先進先出”的資料結構

其應用場景主要包含以下3個方面
- 應用解耦
系統的耦合性越高,容錯性就越低。以電商應用為例,使用者建立訂單後,如果耦合調用庫存系統、物流系統、支付系統,任何一個子系統出了故障或者因為更新等原因暫時不可用,都會造成下單操作異常,影響使用者使用體驗。
使用消息隊列解耦合,系統的耦合性就會提高了。比如物流系統發生故障,需要幾分鐘才能來修複,在這段時間内,物流系統要處理的資料被緩存到消息隊列中,使用者的下單操作正常完成。當物流系統回複後,補充處理存在消息隊列中的訂單消息即可,終端系統感覺不到物流系統發生過幾分鐘故障。
- 流量削峰
應用系統如果遇到系統請求流量的瞬間猛增,有可能會将系統壓垮。有了消息隊列可以将大量請求緩存起來,分散到很長一段時間處理,這樣可以大大提到系統的穩定性和使用者體驗。
一般情況,為了保證系統的穩定性,如果系統負載超過門檻值,就會阻止使用者請求,這會影響使用者體驗,而如果使用消息隊列将請求緩存起來,等待系統處理完畢後通知使用者下單完畢,這樣總不能下單體驗要好。
處于經濟考量目的:
業務系統正常時段的QPS如果是1000,流量最高峰是10000,為了應對流量高峰配置高性能的伺服器顯然不劃算,這時可以使用消息隊列對峰值流量削峰
- 資料分發
通過消息隊列可以讓資料在多個系統更加之間進行流通。資料的産生方不需要關心誰來使用資料,隻需要将資料發送到消息隊列,資料使用方直接在消息隊列中直接擷取資料即可
1.2 MQ的優點和缺點
優點:解耦、削峰、資料分發
缺點包含以下幾點:
-
系統可用性降低
系統引入的外部依賴越多,系統穩定性越差。一旦MQ當機,就會對業務造成影響。
如何保證MQ的高可用?
-
系統複雜度提高
MQ的加入大大增加了系統的複雜度,以前系統間是同步的遠端調用,現在是通過MQ進行異步調用。
如何保證消息沒有被重複消費?怎麼處理消息丢失情況?那麼保證消息傳遞的順序性?
-
一緻性問題
A系統處理完業務,通過MQ給B、C、D三個系統發消息資料,如果B系統、C系統處理成功,D系統處理失敗。
如何保證消息資料處理的一緻性?
1.3 各種MQ産品的比較
常見的MQ産品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ。
2. RocketMQ快速入門
RocketMQ是阿裡巴巴2016年MQ中間件,使用Java語言開發,在阿裡内部,RocketMQ承接了例如“雙11”等高并發場景的消息流轉,能夠處理萬億級别的消息。
2.1 準備工作
2.1.1 下載下傳RocketMQ
RocketMQ最新版本:4.5.1
下載下傳位址
2.2.2 環境要求
- Linux64位系統
- JDK1.8(64位)
- 源碼安裝需要安裝Maven 3.2.x
2.2 安裝RocketMQ
2.2.1 安裝步驟
本教程以二進制包方式安裝
- 解壓安裝包
unzip 壓縮包 移動到自己新建立的目錄中(我這裡在/usr/local下建立rocketmq檔案夾) 将解壓好的檔案,移動到新建立的檔案夾中 mv rocketmq-all-4.4.0-bin-release /usr/local/rocketmq/
- 進入安裝目錄
2.2.2 目錄介紹
- bin:啟動腳本,包括shell腳本和CMD腳本
- conf:執行個體配置檔案 ,包括broker配置檔案、logback配置檔案等
- lib:依賴jar包,包括Netty、commons-lang、FastJSON等
2.3 啟動RocketMQ
- 啟動NameServer
# 1.啟動NameServer
nohup sh bin/mqnamesrv &
# 2.檢視啟動日志
tail -f ~/logs/rocketmqlogs/namesrv.log
- 啟動Broker
# 1.啟動Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 2.檢視啟動日志
tail -f ~/logs/rocketmqlogs/broker.log
-
問題描述:
RocketMQ預設的虛拟機記憶體較大,啟動Broker如果因為記憶體不足失敗,需要編輯如下兩個配置檔案,修改JVM記憶體大小
# 編輯runbroker.sh和runserver.sh修改預設JVM大小
vi runbroker.sh
- 參考設定:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
vi runserver.sh
- 參考設定:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
改完以後,也必須先重新開機一下NameServer
#先關閉,然後執行上面啟動的步驟
sh bin/mqshutdown namesrv
2.4 測試RocketMQ
2.4.1 發送消息
# 1.設定環境變量
export NAMESRV_ADDR=localhost:9876
# 2.使用安裝包的Demo發送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
2.4.2 接收消息
# 1.設定環境變量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
2.5 關閉RocketMQ
# 1.關閉NameServer
sh bin/mqshutdown namesrv
# 2.關閉Broker
sh bin/mqshutdown broker
3. RocketMQ叢集搭建
3.1 各角色介紹
- Producer:消息的發送者;舉例:發信者
- Consumer:消息接收者;舉例:收信者
- Broker:暫存和傳輸消息;舉例:郵局
- NameServer:管理Broker;舉例:各個郵局的管理機構
- Topic:區分消息的種類;一個發送者可以發送消息給一個或者多個Topic;一個消息的接收者可以訂閱一個或者多個Topic消息
- Message Queue:相當于是Topic的分區;用于并行發送和接收消息
3.2 叢集搭建方式
3.2.1 叢集特點
- NameServer是一個幾乎無狀态節點,可叢集部署,節點之間無任何資訊同步。
- Broker部署相對複雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave隻能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer叢集中的所有節點建立長連接配接,定時注冊Topic資訊到所有NameServer。
- Producer與NameServer叢集中的其中一個節點(随機選擇)建立長連接配接,定期從NameServer取Topic路由資訊,并向提供Topic服務的Master建立長連接配接,且定時向Master發送心跳。Producer完全無狀态,可叢集部署。
- Consumer與NameServer叢集中的其中一個節點(随機選擇)建立長連接配接,定期從NameServer取Topic路由資訊,并向提供Topic服務的Master、Slave建立長連接配接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
3.2.3 叢集模式
1)單Master模式
這種方式風險較大,一旦Broker重新開機或者當機時,會導緻整個服務不可用。不建議線上環境使用,可以用于本地測試。
2)多Master模式
一個叢集無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:
- 優點:配置簡單,單個Master當機或重新開機維護對應用無影響,在磁盤配置為RAID10時,即使機器當機不可恢複情況下,由于RAID10磁盤非常可靠,消息也不會丢(異步刷盤丢失少量消息,同步刷盤一條不丢),性能最高;
- 缺點:單台機器當機期間,這台機器上未被消費的消息在機器恢複之前不可訂閱,消息實時性會受到影響。
3)多Master多Slave模式(異步)
每個Master配置一個Slave,有多對Master-Slave,HA采用異步複制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:
- 優點:即使磁盤損壞,消息丢失的非常少,且消息實時性不會受影響,同時Master當機後,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工幹預,性能同多Master模式幾乎一樣;
- 缺點:Master當機,磁盤損壞情況下會丢失少量消息。
4)多Master多Slave模式(同步)
每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即隻有主備都寫成功,才向應用傳回成功,這種模式的優缺點如下:
- 優點:資料與服務都無單點故障,Master當機情況下,消息無延遲,服務可用性與資料可用性都非常高;
- 缺點:性能比異步複制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點當機後,備機不能自動切換為主機。
3.3 雙主雙從叢集搭建
3.3.1 總體架構
消息高可用采用2m-2s(同步雙寫)方式
3.3.2 叢集工作流程
- 啟動NameServer,NameServer起來後監聽端口,等待Broker、Producer、Consumer連上來,相當于一個路由控制中心。
- Broker啟動,跟所有的NameServer保持長連接配接,定時發送心跳包。心跳包中包含目前Broker資訊(IP+端口等)以及存儲所有Topic資訊。注冊成功後,NameServer叢集中就有Topic跟Broker的映射關系。
- 收發消息前,先建立Topic,建立Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動建立Topic。
- Producer發送消息,啟動時先跟NameServer叢集中的其中一台建立長連接配接,并從NameServer中擷取目前發送的Topic存在哪些Broker上,輪詢從隊列清單中選擇一個隊列,然後與隊列所在的Broker建立長連接配接進而向Broker發消息。
- Consumer跟Producer類似,跟其中一台NameServer建立長連接配接,擷取目前訂閱Topic存在哪些Broker上,然後直接跟Broker建立連接配接通道,開始消費消息。
3.3.3 伺服器環境
序号 | IP | 角色 | 架構模式 |
---|---|---|---|
1 | 192.168.1.7 | nameserver、brokerserver | Master1、Slave2 |
2 | 192.168.1.8 | nameserver、brokerserver | Master2、Slave1 |
3.3.4 Host添加資訊
vim /etc/hosts
配置如下:
# nameserver
192.168.1.7 rocketmq-nameserver1
192.168.1.8 rocketmq-nameserver2
# broker
192.168.1.7 rocketmq-master1
192.168.1.7 rocketmq-slave2
192.168.1.8 rocketmq-master2
192.168.1.8 rocketmq-slave1
配置完成後, 重新開機網卡
systemctl restart network
3.3.5 防火牆配置
主控端需要遠端通路虛拟機的rocketmq服務和web服務,需要開放相關的端口号,簡單粗暴的方式是直接關閉防火牆
# 關閉防火牆
systemctl stop firewalld.service
# 檢視防火牆的狀态
firewall-cmd --state
# 禁止firewall開機啟動
systemctl disable firewalld.service
或者為了安全,隻開放特定的端口号,RocketMQ預設使用3個端口:9876 、10911 、11011 。如果防火牆沒有關閉的話,那麼防火牆就必須開放這些端口:
-
預設使用 9876 端口nameserver
-
預設使用 10911 端口master
-
預設使用11011 端口slave
執行以下指令:
# 開放name server預設端口
firewall-cmd --remove-port=9876/tcp --permanent
# 開放master預設端口
firewall-cmd --remove-port=10911/tcp --permanent
# 開放slave預設端口 (目前叢集模式可不開啟)
firewall-cmd --remove-port=11011/tcp --permanent
# 重新開機防火牆
firewall-cmd --reload
3.3.6 環境變量配置
vim /etc/profile
在profile檔案的末尾加入如下指令
#set rocketmq
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
輸入:wq! 儲存并退出, 并使得配置立刻生效:
source /etc/profile
3.3.7 建立消息存儲路徑
#建立master主節點的消息存儲路徑
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
#建立salve從節點的消息存儲路徑
mkdir /usr/local/rocketmq/storesalve
mkdir /usr/local/rocketmq/storesalve/commitlog
mkdir /usr/local/rocketmq/storesalve/consumequeue
mkdir /usr/local/rocketmq/storesalve/index
3.3.8 broker配置檔案
1)master1
伺服器:192.168.1.7
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties
修改配置如下:
#所屬叢集名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置檔案填寫的不一樣
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer位址,分号分割 注意:rocketmq-nameserver1必須跟/etc/hosts配置檔案的名字對應
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在發送消息時,自動建立伺服器不存在的topic,預設建立的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#删除檔案時間點,預設淩晨 4點
deleteWhen=04
#檔案保留時間,預設 48 小時
fileReservedTime=120
#commitLog每個檔案的大小預設1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個檔案預設存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測實體檔案磁盤空間
diskMaxUsedSpaceRatio=88
#消息存儲路徑
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存儲路徑
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 檔案存儲路徑
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 檔案存儲路徑
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步複制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
enablePropertyFilter=true
2)slave2
伺服器:192.168.1.7
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties
修改配置如下:
#所屬叢集名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置檔案填寫的不一樣
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer位址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在發送消息時,自動建立伺服器不存在的topic,預設建立的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=11011
#删除檔案時間點,預設淩晨 4點
deleteWhen=04
#檔案保留時間,預設 48 小時
fileReservedTime=120
#commitLog每個檔案的大小預設1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個檔案預設存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測實體檔案磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/rocketmq/storesalve
#commitLog 存儲路徑
storePathCommitLog=/usr/local/rocketmq/storesalve/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/rocketmq/storesalve/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/rocketmq/storesalve/index
#checkpoint 檔案存儲路徑
storeCheckpoint=/usr/local/rocketmq/storesalve/checkpoint
#abort 檔案存儲路徑
abortFile=/usr/local/rocketmq/storesalve/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步複制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SLAVE
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
enablePropertyFilter=true
3)master2
伺服器:192.168.1.8
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties
修改配置如下:
#所屬叢集名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置檔案填寫的不一樣
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer位址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在發送消息時,自動建立伺服器不存在的topic,預設建立的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#删除檔案時間點,預設淩晨 4點
deleteWhen=04
#檔案保留時間,預設 48 小時
fileReservedTime=120
#commitLog每個檔案的大小預設1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個檔案預設存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測實體檔案磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存儲路徑
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 檔案存儲路徑
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 檔案存儲路徑
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步複制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
enablePropertyFilter=true
4)slave1
伺服器:192.168.1.8
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties
修改配置如下:
#所屬叢集名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置檔案填寫的不一樣
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer位址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在發送消息時,自動建立伺服器不存在的topic,預設建立的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=11011
#删除檔案時間點,預設淩晨 4點
deleteWhen=04
#檔案保留時間,預設 48 小時
fileReservedTime=120
#commitLog每個檔案的大小預設1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個檔案預設存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測實體檔案磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/rocketmq/storesalve
#commitLog 存儲路徑
storePathCommitLog=/usr/local/rocketmq/storesalve/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/rocketmq/storesalve/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/rocketmq/storesalve/index
#checkpoint 檔案存儲路徑
storeCheckpoint=/usr/local/rocketmq/storesalve/checkpoint
#abort 檔案存儲路徑
abortFile=/usr/local/rocketmq/storesalve/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步複制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SLAVE
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
enablePropertyFilter=true
3.3.9 修改啟動腳本檔案
1)runbroker.sh
vi /usr/local/rocketmq/bin/runbroker.sh
需要根據記憶體大小進行适當的對JVM參數進行調整:
#===================================================
# 開發環境配置 JVM Configuration
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
####2)runserver.sh
vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
3.3.10 服務啟動
1)啟動NameServe叢集
分别在192.168.1.7和192.168.1.8啟動NameServer
cd /usr/local/rocketmq/bin
nohup sh mqnamesrv &
2)啟動Broker叢集
- 在192.168.1.7上啟動master1和slave2
master1:
cd /usr/local/rocketmq/bin
#注意看下面啟動的路徑
nohup sh mqbroker -n 192.168.1.7:9876 -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties &
slave2:
cd /usr/local/rocketmq/bin
#注意看下面啟動的路徑
nohup sh mqbroker -n 192.168.1.7:9876 -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
- 在192.168.1.8上啟動master2和slave2
master2
cd /usr/local/rocketmq/bin
#注意看下面啟動的路徑
nohup sh mqbroker -n 192.168.1.8:9876 -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties &
slave1
cd /usr/local/rocketmq/bin
#注意看下面啟動的路徑
nohup sh mqbroker -n 192.168.1.8:9876 -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
3.3.11 檢視程序狀态
啟動後通過JPS檢視啟動程序
3.3.12 檢視日志
# 檢視nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 檢視broker日志
tail -500f ~/logs/rocketmqlogs/broker.log
3.5 叢集監控平台搭建
3.5.1 概述
RocketMQ
有一個對其擴充的開源項目incubator-rocketmq-externals,這個項目中有一個子子產品叫
rocketmq-console
,這個便是管理控制台項目了,先将incubator-rocketmq-externals拉到本地,因為我們需要自己對
rocketmq-console
進行編譯打包運作。
3.5.2 下載下傳并編譯打包
3.5.2.1首先從git上下載下傳
git clone https://github.com/apache/rocketmq-externals
3.5.2.2進入rocketmq-console工程中
(我的是在本機下載下傳的)
連結:https://pan.baidu.com/s/1LZoUcLZZQIbLpvVsqBf--g
提取碼:f7hc
3.5.2.3修改配置檔案
進入rocketmq-console\src\main\resources目錄下,修改application.properties檔案
3.5.2.4回到主目錄,git打包
注意:maven install時,你的jdk版本必須是1.8以上
mvn clean package -Dmaven.test.skip=true
3.5.2.5上傳jar包
上傳後:
3.6啟動rocketmq-console:
java -jar rocketmq-console-ng-1.0.1.jar
啟動成功後,我們就可以通過浏覽器通路
http://192.168.1.7:8080
進入控制台界面了,如下圖:
如果打開控制台,出現以下錯誤,請修改linux系統的時間。
#檢視時間
date
#修改時間
date -s "2020-05-25 10:34:00"
#将時間寫入BIOS,以免重新開機伺服器後,設定的時間丢失。
hwclock -w
4. 消息發送樣例
- 導入MQ用戶端依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
- 消息發送者步驟分析r
1.建立消息生産者producer,并制定生産者組名
2.指定Nameserver位址
3.啟動producer
4.建立消息對象,指定主題Topic、Tag和消息體
5.發送消息
6.關閉生産者producer
- 消息消費者步驟分析
1.建立消費者Consumer,制定消費者組名
2.指定Nameserver位址
3.訂閱主題Topic和Tag
4.設定回調函數,處理消息
5.啟動消費者consumer
4.1 基本樣例
4.1.1 消息發送
1)發送同步消息
這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,短信通知。
public class SyncProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//1.建立消息生産者producer,并制定生産者組名
DefaultMQProducer producer=new DefaultMQProducer("group1");
//2.指定Nameserver位址
producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.啟動producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.建立消息對象,指定主題Topic、Tag和消息體
/**
* 參數1:消息主題
* 參數2:消息Tag
* 參數3:消息内容
*/
Message message=new Message("base","Tag1",("Hello world"+i).getBytes());
//5.發送消息
SendResult send = producer.send(message,10000);
//發送狀态
SendStatus status=send.getSendStatus();
//消息ID
String msgid=send.getMsgId();
//消息接收隊列ID
int queueId = send.getMessageQueue().getQueueId();
System.out.println("----->"+send+"----->狀态--》"+status+"-->消息ID-->"+msgid+"--->接收隊列ID---》"+queueId);
//線程睡眠一秒
TimeUnit.SECONDS.sleep(1);
}
//6.關閉生産者producer
producer.shutdown();
}
}
2)發送異步消息
異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。
public class AyncProducer {
public static void main(String[] args) throws Exception {
// 執行個體化消息生産者Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 設定NameServer的位址
producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876;");
// 啟動Producer執行個體
producer.start();
for (int i = 0; i < 100; i++) {
final int index = i;
// 建立消息,并指定Topic,Tag和消息體
Message message=new Message("base","Tag1",("Hello world"+i).getBytes());
// SendCallback接收異步傳回結果的回調
producer.send(message,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("success-->"+sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("erroe--->"+ e);
}
} ,10000);
}
// 如果不再發送消息,關閉Producer執行個體。
//producer.shutdown();
}
}
3)單向發送消息
這種方式主要用在不特别關心發送結果的場景,例如日志發送。
public class OneWayProduce {
public static void main(String[] args) throws Exception {
//1.建立消息生産者producer,并制定生産者組名
DefaultMQProducer producer=new DefaultMQProducer("group1");
//2.指定Nameserver位址
producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.啟動producer
producer.start();
for (int i = 0; i < 3; i++) {
//4.建立消息對象,指定主題Topic、Tag和消息體
Message message=new Message("base","Tag3",("Hello world 單向消息"+i).getBytes());
//5.發送消息,和發送同步消息唯一的差別
producer.sendOneway(message);
//線程睡眠一秒
TimeUnit.SECONDS.sleep(10);
}
//6.關閉生産者producer
producer.shutdown();
}
}
4.1.2 消費消息
1)普通模式
單純的消費消息。
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.建立消費者Consumer,制定消費者組名
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
//2.指定Nameserver位址
consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.訂閱主題Topic和Tag
consumer.subscribe("base","Tag1");
//4.設定回調函數,處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < msg.size(); i++) {
System.out.println(new String(msg.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.啟動消費者consumer
consumer.start();
}
}
2)負載均衡模式(預設情況下)
消費者采用負載均衡方式消費消息,多個消費者共同消費隊列消息,每個消費者處理的消息不同
public static void main(String[] args) throws MQClientException {
//1.建立消費者Consumer,制定消費者組名
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
//2.指定Nameserver位址
consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.訂閱主題Topic和Tag
consumer.subscribe("base","Tag1");
//******************負載均衡模式消費******************
consumer.setMessageModel(MessageModel.CLUSTERING);
//4.設定回調函數,處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < msg.size(); i++) {
System.out.println(new String(msg.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.啟動消費者consumer
consumer.start();
}
3)廣播模式
消費者采用廣播的方式消費消息,每個消費者消費的消息都是相同的
public static void main(String[] args) throws MQClientException {
//1.建立消費者Consumer,制定消費者組名
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
//2.指定Nameserver位址
consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.訂閱主題Topic和Tag
consumer.subscribe("base","Tag1");
//******************廣播模式消費*************
consumer.setMessageModel(MessageModel.BROADCASTING);
//4.設定回調函數,處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < msg.size(); i++) {
System.out.println(new String(msg.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.啟動消費者consumer
consumer.start();
}
4.2 順序消息
消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。
順序消費的原了解析,在預設的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息隻依次發送到同一個queue中,消費的時候隻從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue隻有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。
下面用訂單進行分區有序的示例。一個訂單的順序流程是:建立、付款、推送、完成。訂單号相同的消息會被先後發送到同一個隊列中,消費時,同一個OrderId擷取到的肯定是同一個隊列。
4.2.1 順序消息生産
/**
* Producer,發送順序消息
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1.建立消息生産者producer,并制定生産者組名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver位址
producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.啟動producer
producer.start();
//建構消息集合
List<OrderStep> orderSteps = OrderStep.buildOrders();
//發送消息
for (int i = 0; i < orderSteps.size(); i++) {
String body = orderSteps.get(i) + "";
Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());
/**
* 參數一:消息對象
* 參數二:消息隊列的選擇器
* 參數三:選擇隊列的業務辨別(訂單ID)
*/
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
/**
* 用訂單ID取模來選擇路由到哪個隊列裡
* @param mqs:隊列集合
* @param msg:消息對象
* @param arg:業務辨別的參數
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long orderId = (long) arg;
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, orderSteps.get(i).getOrderId(),10000); //orderSteps.get(i).getOrderId()這個地方的參數如果給上,那麼上面的arg就是用的這個參數
System.out.println("發送結果:" + sendResult);
}
producer.shutdown();
}
}
/**
* 訂單的步驟
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模拟訂單資料
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("建立");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("建立");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("建立");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
4.2.2 順序消費消息
/**
* 順序消息消費,帶事務方式(應用可控制Offset什麼時候送出)
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1.建立消費者Consumer,制定消費者組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver位址
consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.訂閱主題Topic和Tag
consumer.subscribe("OrderTopic", "*");
//4.注冊消息監聽器 new MessageListenerOrderly這個監聽器就是,一個隊列配置設定一個線程去處理。
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("線程名稱:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//5.啟動消費者
consumer.start();
System.out.println("消費者啟動");
}
}
4.3 延時消息
比如電商裡,送出了一個訂單就可以發送一個延時消息,1h後去檢查這個訂單的狀态,如果還是未付款就取消訂單釋放庫存。
4.3.1 啟動消息消費者
public class ConsumerDealy {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
consumer.subscribe("DelayTopic","Tag1");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < msg.size(); i++) {
System.out.println("消息ID:"+msg.get(i).getMsgId()+"--->延遲時間"+(System.currentTimeMillis()-msg.get(i).getStoreTimestamp()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
4.3.2 發送延時消息
public class ProducerDelay {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.啟動producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.建立消息對象,指定主題Topic、Tag和消息體
Message message=new Message("DelayTopic","Tag1",("Hello world"+i).getBytes());
/**
* 設定消息延遲時間
* rocketmq的延遲時間隻能從以下時間選擇。
* private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
*
* 下面代碼選擇的是3 那麼就延遲10s
*/
message.setDelayTimeLevel(3);
//5.發送消息
SendResult send = producer.send(message,10000);
}
//6.關閉生産者producer
producer.shutdown();
}
}
###4.3.3 驗證
您将會看到消息的消費比存儲時間晚10秒
4.3.4 使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
現在RocketMq并不支援任意時間的延時,需要設定幾個固定的延時等級,從1s到2h分别對應着等級1到18
4.4 批量消息
批量發送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應超過4MB。
4.4.1 發送批量消息
如果您每次隻發送不超過4MB的消息,則很容易使用批處理,樣例如下:
public class ProducerBatch {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
producer.start();
//4.建立消息對象,指定主題Topic、Tag和消息體,隻需一個集合即可。
List<Message> msg=new ArrayList<>();
Message message1=new Message("BatchTopic","Tag1",("Hello world"+1).getBytes());
Message message2=new Message("BatchTopic","Tag1",("Hello world"+2).getBytes());
Message message3=new Message("BatchTopic","Tag1",("Hello world"+3).getBytes());
Message message4=new Message("BatchTopic","Tag1",("Hello world"+4).getBytes());
msg.add(message1);
msg.add(message2);
msg.add(message3);
msg.add(message4);
//5.發送消息
SendResult send = producer.send(msg,10000);
//發送狀态
SendStatus status=send.getSendStatus();
//消息ID
String msgid=send.getMsgId();
//消息接收隊列ID
int queueId = send.getMessageQueue().getQueueId();
System.out.println("----->"+send+"----->狀态--》"+status+"-->消息ID-->"+msgid+"--->接收隊列ID---》"+queueId);
producer.shutdown();
}
}
如果消息的總長度可能大于4MB時,這時候最好把消息進行分割
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的開銷20位元組
if (tmpSize > SIZE_LIMIT) {
//單個消息超過了最大的限制
//忽略,否則會阻塞分裂的程序
if (nextIndex - currIndex == 0) {
//假如下一個子清單沒有元素,則添加這個子清單然後退出循環,否則隻是退出循環
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//把大的消息分裂成若幹個小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//處理error
}
}
4.4.2消費批量消息(和其他消費方式相同)
public class ConsumerBatch {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.訂閱主題Topic和Tag
consumer.subscribe("BatchTopic","Tag1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < msg.size(); i++) {
System.out.println("消息:"+new String(msg.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
4.5 過濾消息
在大多數情況下,TAG是一個簡單而有用的設計,其可以來選擇您想要的消息。例如:
//普通情況下,隻需在消費端設定消費的Tag就可以
consumer.subscribe("BatchTopic","Tag1");
//如果消費目前topic下多個Tag
consumer.subscribe("BatchTopic","Tag1 || Tag2 || Tag3");
//如果消費目前topic下所有Tag的消息
consumer.subscribe("BatchTopic","*");
消費者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一個消息隻能有一個标簽,這對于複雜的場景可能不起作用。在這種情況下,可以使用SQL表達式篩選消息。SQL特性可以通過發送消息時的屬性來進行計算。在RocketMQ定義的文法下,可以實作一些簡單的邏輯。下面是一個例子:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
4.5.1 SQL基本文法
RocketMQ隻定義了一些基本文法來支援這個特性。你也可以很容易地擴充它。
- 數值比較,比如:>,>=,<,<=,BETWEEN,=;
- 字元比較,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 邏輯符号 AND,OR,NOT;
常量支援類型為:
- 數值,比如:123,3.1415;
- 字元,比如:'abc',必須用單引号包裹起來;
- NULL,特殊的常量
- 布爾值,TRUE 或 FALSE
隻有使用push模式的消費者才能用使用SQL92标準的sql語句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
4.5.2 消息生産者
發送消息時,你能通過
putUserProperty
來設定消息的屬性
public class ProducerSQL {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.啟動producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.建立消息對象,指定主題Topic、Tag和消息體
Message message=new Message("FilterTopic","Tag1",("Hello world"+i).getBytes());
// ***********設定消息屬性**I在消費者中就能當一個辨別********
message.putUserProperty("i",String.valueOf(i));
SendResult send = producer.send(message,10000);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}
4.5.3 消息消費者
用MessageSelector.bySql來使用sql篩選消息
public class ConsumerSql {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.訂閱主題Topic和Tag 隻有訂閱的消息有這個屬性i, i >=0 and i <= 3
consumer.subscribe("FilterTopic", MessageSelector.bySql("i between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < msg.size(); i++) {
System.out.println("消息:"+new String(msg.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.啟動消費者consumer
consumer.start();
}
}
4.6 事務消息
###4.6.1 流程分析
上圖說明了事務消息的大緻方案,其中分為兩個流程:正常事務消息的發送及送出、事務消息的補償流程。
####1)事務消息發送及送出
(1) 發送消息(half消息)。
(2) 服務端響應消息寫入結果。
(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)。
(4) 根據本地事務狀态執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)
2)事務補償
(1) 對沒有Commit/Rollback的事務消息(pending狀态的消息),從服務端發起一次“回查”
(2) Producer收到回查消息,檢查回查消息對應的本地事務的狀态
(3) 根據本地事務狀态,重新Commit或者Rollback
其中,補償階段用于解決消息Commit或者Rollback發生逾時或者失敗的情況。
3)事務消息狀态
事務消息共有三種狀态,送出狀态、復原狀态、中間狀态:
- TransactionStatus.CommitTransaction: 送出事務,它允許消費者消費此消息。
- TransactionStatus.RollbackTransaction: 復原事務,它代表該消息将被删除,不允許被消費。
- TransactionStatus.Unknown: 中間狀态,它代表需要檢查消息隊列來确定狀态。
###4.6.1 發送事務消息
1) 建立事務性生産者
使用
TransactionMQProducer
類建立生産者,并指定唯一的
ProducerGroup
,就可以設定自定義線程池來處理這些檢查請求。執行本地事務後、需要根據執行結果對消息隊列進行回複。回傳的事務狀态在請參考前一節。
public class Producer {
public static void main(String[] args) throws Exception {
//1.建立消息生産者,不能用DefaultMQProducer,必須用TransactionMQProducer的生産者了。
TransactionMQProducer producer = new TransactionMQProducer("group5");
//2.指定Nameserver位址
producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//添加事務監聽器
producer.setTransactionListener(new TransactionListener() {
/**
* 在該方法中執行本地事務
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
if (StringUtils.equals("TAGA", msg.getTags())) {
//消息發送成功
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TAGB", msg.getTags())) {
//消息復原
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TAGC", msg.getTags())) {
//消息不做處理。不做處理後,就會調用下面方法checkLocalTransaction 進行回查
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
/**
* 該方法時MQ進行消息事務狀态回查
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("消息的Tag:" + msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
//3.啟動producer
producer.start();
String[] tags = {"TAGA", "TAGB", "TAGC"};
for (int i = 0; i < 3; i++) {
//4.建立消息對象,指定主題Topic、Tag和消息體
Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes());
//5.發送消息
SendResult result = producer.sendMessageInTransaction(msg, null);
//發送狀态
SendStatus status = result.getSendStatus();
System.out.println("發送結果:" + result);
//線程睡1秒
TimeUnit.SECONDS.sleep(1);
}
//不能關閉,否則消息回查就不執行了
//producer.shutdown();
}
}
1.2)實作事務的監聽接口
當發送半消息成功時,我們使用
executeLocalTransaction
方法來執行本地事務。它傳回前一節中提到的三個事務狀态之一。
checkLocalTranscation
方法用于檢查本地事務狀态,并回應消息隊列的檢查請求。它也是傳回前一節中提到的三個事務狀态之一。
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("執行本地事務");
if (StringUtils.equals("TagA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("MQ檢查消息Tag【"+msg.getTags()+"】的本地事務執行結果");
return LocalTransactionState.COMMIT_MESSAGE;
}
}
2)建立事務性消費者
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
//3.訂閱主題Topic和Tag
consumer.subscribe("TransactionTopic", "*");
//4.設定回調函數,處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.啟動消費者consumer
consumer.start();
System.out.println("消費者啟動");
}
}
4.6.2 使用限制
- 事務消息不支援延時消息和批量消息。
- 為了避免單個消息被檢查太多次而導緻半隊列消息累積,我們預設将單個消息的檢查次數限制為 15 次,但是使用者可以通過 Broker 配置檔案的
參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N =transactionCheckMax
) 則 Broker 将丢棄此消息,并在預設情況下同時列印錯誤日志。使用者可以通過重寫transactionCheckMax
類來修改這個行為。AbstractTransactionCheckListener
- 事務消息将在 Broker 配置檔案中的參數 transactionMsgTimeout 這樣的特定時間長度之後被檢查。當發送事務消息時,使用者還可以通過設定使用者屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先于
參數。transactionMsgTimeout
- 事務性消息可能不止一次被檢查或消費。
- 送出給使用者的目标主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確定事務消息不丢失、并且事務完整性得到保證,建議使用同步的雙重寫入機制。
- 事務消息的生産者 ID 不能與其他類型消息的生産者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ伺服器能通過它們的生産者 ID 查詢到消費者。