天天看點

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

1. MQ介紹

##1.1 為什麼要用MQ

消息隊列是一種“先進先出”的資料結構

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

其應用場景主要包含以下3個方面

  • 應用解耦

系統的耦合性越高,容錯性就越低。以電商應用為例,使用者建立訂單後,如果耦合調用庫存系統、物流系統、支付系統,任何一個子系統出了故障或者因為更新等原因暫時不可用,都會造成下單操作異常,影響使用者使用體驗。

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

使用消息隊列解耦合,系統的耦合性就會提高了。比如物流系統發生故障,需要幾分鐘才能來修複,在這段時間内,物流系統要處理的資料被緩存到消息隊列中,使用者的下單操作正常完成。當物流系統回複後,補充處理存在消息隊列中的訂單消息即可,終端系統感覺不到物流系統發生過幾分鐘故障。

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例
  • 流量削峰
RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

應用系統如果遇到系統請求流量的瞬間猛增,有可能會将系統壓垮。有了消息隊列可以将大量請求緩存起來,分散到很長一段時間處理,這樣可以大大提到系統的穩定性和使用者體驗。

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

一般情況,為了保證系統的穩定性,如果系統負載超過門檻值,就會阻止使用者請求,這會影響使用者體驗,而如果使用消息隊列将請求緩存起來,等待系統處理完畢後通知使用者下單完畢,這樣總不能下單體驗要好。

處于經濟考量目的:

業務系統正常時段的QPS如果是1000,流量最高峰是10000,為了應對流量高峰配置高性能的伺服器顯然不劃算,這時可以使用消息隊列對峰值流量削峰

  • 資料分發
RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

通過消息隊列可以讓資料在多個系統更加之間進行流通。資料的産生方不需要關心誰來使用資料,隻需要将資料發送到消息隊列,資料使用方直接在消息隊列中直接擷取資料即可

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

1.2 MQ的優點和缺點

優點:解耦、削峰、資料分發

缺點包含以下幾點:

  • 系統可用性降低

    系統引入的外部依賴越多,系統穩定性越差。一旦MQ當機,就會對業務造成影響。

    如何保證MQ的高可用?

  • 系統複雜度提高

    MQ的加入大大增加了系統的複雜度,以前系統間是同步的遠端調用,現在是通過MQ進行異步調用。

    如何保證消息沒有被重複消費?怎麼處理消息丢失情況?那麼保證消息傳遞的順序性?

  • 一緻性問題

    A系統處理完業務,通過MQ給B、C、D三個系統發消息資料,如果B系統、C系統處理成功,D系統處理失敗。

    如何保證消息資料處理的一緻性?

1.3 各種MQ産品的比較

常見的MQ産品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ。

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

2. RocketMQ快速入門

RocketMQ是阿裡巴巴2016年MQ中間件,使用Java語言開發,在阿裡内部,RocketMQ承接了例如“雙11”等高并發場景的消息流轉,能夠處理萬億級别的消息。

2.1 準備工作

2.1.1 下載下傳RocketMQ

RocketMQ最新版本:4.5.1

下載下傳位址

2.2.2 環境要求

  • Linux64位系統
  • JDK1.8(64位)
  • 源碼安裝需要安裝Maven 3.2.x

2.2 安裝RocketMQ

2.2.1 安裝步驟

本教程以二進制包方式安裝

  1. 解壓安裝包
     unzip 壓縮包
     移動到自己新建立的目錄中(我這裡在/usr/local下建立rocketmq檔案夾)
     将解壓好的檔案,移動到新建立的檔案夾中
     mv rocketmq-all-4.4.0-bin-release /usr/local/rocketmq/
               
  2. 進入安裝目錄

2.2.2 目錄介紹

  • bin:啟動腳本,包括shell腳本和CMD腳本
  • conf:執行個體配置檔案 ,包括broker配置檔案、logback配置檔案等
  • lib:依賴jar包,包括Netty、commons-lang、FastJSON等

2.3 啟動RocketMQ

  1. 啟動NameServer
 # 1.啟動NameServer
 nohup sh bin/mqnamesrv &
 # 2.檢視啟動日志
 tail -f ~/logs/rocketmqlogs/namesrv.log
           
  1. 啟動Broker
 # 1.啟動Broker
 nohup sh bin/mqbroker -n localhost:9876 &
 # 2.檢視啟動日志
 tail -f ~/logs/rocketmqlogs/broker.log 
           
  • 問題描述:

    RocketMQ預設的虛拟機記憶體較大,啟動Broker如果因為記憶體不足失敗,需要編輯如下兩個配置檔案,修改JVM記憶體大小

 # 編輯runbroker.sh和runserver.sh修改預設JVM大小
 vi runbroker.sh
 - 參考設定:
 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
 ​
 vi runserver.sh
 - 參考設定:
 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m  -XX:MaxMetaspaceSize=320m"
           

改完以後,也必須先重新開機一下NameServer

 #先關閉,然後執行上面啟動的步驟
 sh bin/mqshutdown namesrv
           

2.4 測試RocketMQ

2.4.1 發送消息

 # 1.設定環境變量
 export NAMESRV_ADDR=localhost:9876
 # 2.使用安裝包的Demo發送消息
 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
           

2.4.2 接收消息

 # 1.設定環境變量
 export NAMESRV_ADDR=localhost:9876
 # 2.接收消息
 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
           

2.5 關閉RocketMQ

 # 1.關閉NameServer
 sh bin/mqshutdown namesrv
 # 2.關閉Broker
 sh bin/mqshutdown broker
           

3. RocketMQ叢集搭建

3.1 各角色介紹

  • Producer:消息的發送者;舉例:發信者
  • Consumer:消息接收者;舉例:收信者
  • Broker:暫存和傳輸消息;舉例:郵局
  • NameServer:管理Broker;舉例:各個郵局的管理機構
  • Topic:區分消息的種類;一個發送者可以發送消息給一個或者多個Topic;一個消息的接收者可以訂閱一個或者多個Topic消息
  • Message Queue:相當于是Topic的分區;用于并行發送和接收消息
RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

3.2 叢集搭建方式

3.2.1 叢集特點

  • NameServer是一個幾乎無狀态節點,可叢集部署,節點之間無任何資訊同步。
  • Broker部署相對複雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave隻能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer叢集中的所有節點建立長連接配接,定時注冊Topic資訊到所有NameServer。
  • Producer與NameServer叢集中的其中一個節點(随機選擇)建立長連接配接,定期從NameServer取Topic路由資訊,并向提供Topic服務的Master建立長連接配接,且定時向Master發送心跳。Producer完全無狀态,可叢集部署。
  • Consumer與NameServer叢集中的其中一個節點(随機選擇)建立長連接配接,定期從NameServer取Topic路由資訊,并向提供Topic服務的Master、Slave建立長連接配接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。

3.2.3 叢集模式

1)單Master模式

這種方式風險較大,一旦Broker重新開機或者當機時,會導緻整個服務不可用。不建議線上環境使用,可以用于本地測試。

2)多Master模式

一個叢集無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:

  • 優點:配置簡單,單個Master當機或重新開機維護對應用無影響,在磁盤配置為RAID10時,即使機器當機不可恢複情況下,由于RAID10磁盤非常可靠,消息也不會丢(異步刷盤丢失少量消息,同步刷盤一條不丢),性能最高;
  • 缺點:單台機器當機期間,這台機器上未被消費的消息在機器恢複之前不可訂閱,消息實時性會受到影響。

3)多Master多Slave模式(異步)

每個Master配置一個Slave,有多對Master-Slave,HA采用異步複制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:

  • 優點:即使磁盤損壞,消息丢失的非常少,且消息實時性不會受影響,同時Master當機後,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工幹預,性能同多Master模式幾乎一樣;
  • 缺點:Master當機,磁盤損壞情況下會丢失少量消息。

4)多Master多Slave模式(同步)

每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即隻有主備都寫成功,才向應用傳回成功,這種模式的優缺點如下:

  • 優點:資料與服務都無單點故障,Master當機情況下,消息無延遲,服務可用性與資料可用性都非常高;
  • 缺點:性能比異步複制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點當機後,備機不能自動切換為主機。

3.3 雙主雙從叢集搭建

3.3.1 總體架構

消息高可用采用2m-2s(同步雙寫)方式

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

3.3.2 叢集工作流程

  1. 啟動NameServer,NameServer起來後監聽端口,等待Broker、Producer、Consumer連上來,相當于一個路由控制中心。
  2. Broker啟動,跟所有的NameServer保持長連接配接,定時發送心跳包。心跳包中包含目前Broker資訊(IP+端口等)以及存儲所有Topic資訊。注冊成功後,NameServer叢集中就有Topic跟Broker的映射關系。
  3. 收發消息前,先建立Topic,建立Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動建立Topic。
  4. Producer發送消息,啟動時先跟NameServer叢集中的其中一台建立長連接配接,并從NameServer中擷取目前發送的Topic存在哪些Broker上,輪詢從隊列清單中選擇一個隊列,然後與隊列所在的Broker建立長連接配接進而向Broker發消息。
  5. Consumer跟Producer類似,跟其中一台NameServer建立長連接配接,擷取目前訂閱Topic存在哪些Broker上,然後直接跟Broker建立連接配接通道,開始消費消息。

3.3.3 伺服器環境

序号 IP 角色 架構模式
1 192.168.1.7 nameserver、brokerserver Master1、Slave2
2 192.168.1.8 nameserver、brokerserver Master2、Slave1

3.3.4 Host添加資訊

 vim /etc/hosts
           

配置如下:

 # nameserver
 192.168.1.7 rocketmq-nameserver1
 192.168.1.8 rocketmq-nameserver2
 # broker
 192.168.1.7 rocketmq-master1
 192.168.1.7 rocketmq-slave2
 192.168.1.8 rocketmq-master2
 192.168.1.8 rocketmq-slave1
           

配置完成後, 重新開機網卡

 systemctl restart network
           

3.3.5 防火牆配置

主控端需要遠端通路虛拟機的rocketmq服務和web服務,需要開放相關的端口号,簡單粗暴的方式是直接關閉防火牆

 # 關閉防火牆
 systemctl stop firewalld.service 
 # 檢視防火牆的狀态
 firewall-cmd --state 
 # 禁止firewall開機啟動
 systemctl disable firewalld.service
           

或者為了安全,隻開放特定的端口号,RocketMQ預設使用3個端口:9876 、10911 、11011 。如果防火牆沒有關閉的話,那麼防火牆就必須開放這些端口:

  • nameserver

    預設使用 9876 端口
  • master

    預設使用 10911 端口
  • slave

    預設使用11011 端口

執行以下指令:

 # 開放name server預設端口
 firewall-cmd --remove-port=9876/tcp --permanent
 # 開放master預設端口
 firewall-cmd --remove-port=10911/tcp --permanent
 # 開放slave預設端口 (目前叢集模式可不開啟)
 firewall-cmd --remove-port=11011/tcp --permanent 
 # 重新開機防火牆
 firewall-cmd --reload
           

3.3.6 環境變量配置

 vim /etc/profile
           

在profile檔案的末尾加入如下指令

 #set rocketmq
 ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
 PATH=$PATH:$ROCKETMQ_HOME/bin
 export ROCKETMQ_HOME PATH
           

輸入:wq! 儲存并退出, 并使得配置立刻生效:

 source /etc/profile
           

3.3.7 建立消息存儲路徑

 #建立master主節點的消息存儲路徑
 mkdir /usr/local/rocketmq/store
 mkdir /usr/local/rocketmq/store/commitlog
 mkdir /usr/local/rocketmq/store/consumequeue
 mkdir /usr/local/rocketmq/store/index
 #建立salve從節點的消息存儲路徑
 mkdir /usr/local/rocketmq/storesalve
 mkdir /usr/local/rocketmq/storesalve/commitlog
 mkdir /usr/local/rocketmq/storesalve/consumequeue
 mkdir /usr/local/rocketmq/storesalve/index
           

3.3.8 broker配置檔案

1)master1

伺服器:192.168.1.7

 vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties
           

修改配置如下:

#所屬叢集名字
 brokerClusterName=rocketmq-cluster
 #broker名字,注意此處不同的配置檔案填寫的不一樣
 brokerName=broker-a
 #0 表示 Master,>0 表示 Slave
 brokerId=0
 #nameServer位址,分号分割  注意:rocketmq-nameserver1必須跟/etc/hosts配置檔案的名字對應
 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
 #在發送消息時,自動建立伺服器不存在的topic,預設建立的隊列數
 defaultTopicQueueNums=4
 #是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉
 autoCreateTopicEnable=true
 #是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉
 autoCreateSubscriptionGroup=true
 #Broker 對外服務的監聽端口
 listenPort=10911
 #删除檔案時間點,預設淩晨 4點
 deleteWhen=04
 #檔案保留時間,預設 48 小時
 fileReservedTime=120
 #commitLog每個檔案的大小預設1G
 mapedFileSizeCommitLog=1073741824
 #ConsumeQueue每個檔案預設存30W條,根據業務情況調整
 mapedFileSizeConsumeQueue=300000
 #destroyMapedFileIntervalForcibly=120000
 #redeleteHangedFileInterval=120000
 #檢測實體檔案磁盤空間
 diskMaxUsedSpaceRatio=88
 #消息存儲路徑
 storePathRootDir=/usr/local/rocketmq/store
 #commitLog 存儲路徑
 storePathCommitLog=/usr/local/rocketmq/store/commitlog
 #消費隊列存儲路徑存儲路徑
 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
 #消息索引存儲路徑
 storePathIndex=/usr/local/rocketmq/store/index
 #checkpoint 檔案存儲路徑
 storeCheckpoint=/usr/local/rocketmq/store/checkpoint
 #abort 檔案存儲路徑
 abortFile=/usr/local/rocketmq/store/abort
 #限制的消息大小
 maxMessageSize=65536
 #flushCommitLogLeastPages=4
 #flushConsumeQueueLeastPages=2
 #flushCommitLogThoroughInterval=10000
 #flushConsumeQueueThoroughInterval=60000
 #Broker 的角色
 #- ASYNC_MASTER 異步複制Master
 #- SYNC_MASTER 同步雙寫Master
 #- SLAVE
 brokerRole=SYNC_MASTER
 #刷盤方式
 #- ASYNC_FLUSH 異步刷盤
 #- SYNC_FLUSH 同步刷盤
 flushDiskType=SYNC_FLUSH
 #checkTransactionMessageEnable=false
 #發消息線程池數量
 #sendMessageThreadPoolNums=128
 #拉消息線程池數量
 #pullMessageThreadPoolNums=128
 enablePropertyFilter=true
           

2)slave2

伺服器:192.168.1.7

vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties
           

修改配置如下:

#所屬叢集名字
 brokerClusterName=rocketmq-cluster
 #broker名字,注意此處不同的配置檔案填寫的不一樣
 brokerName=broker-b
 #0 表示 Master,>0 表示 Slave
 brokerId=1
 #nameServer位址,分号分割
 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
 #在發送消息時,自動建立伺服器不存在的topic,預設建立的隊列數
 defaultTopicQueueNums=4
 #是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉
 autoCreateTopicEnable=true
 #是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉
 autoCreateSubscriptionGroup=true
 #Broker 對外服務的監聽端口
 listenPort=11011
 #删除檔案時間點,預設淩晨 4點
 deleteWhen=04
 #檔案保留時間,預設 48 小時
 fileReservedTime=120
 #commitLog每個檔案的大小預設1G
 mapedFileSizeCommitLog=1073741824
 #ConsumeQueue每個檔案預設存30W條,根據業務情況調整
 mapedFileSizeConsumeQueue=300000
 #destroyMapedFileIntervalForcibly=120000
 #redeleteHangedFileInterval=120000
 #檢測實體檔案磁盤空間
 diskMaxUsedSpaceRatio=88
 #存儲路徑
 storePathRootDir=/usr/local/rocketmq/storesalve
 #commitLog 存儲路徑
 storePathCommitLog=/usr/local/rocketmq/storesalve/commitlog
 #消費隊列存儲路徑存儲路徑
 storePathConsumeQueue=/usr/local/rocketmq/storesalve/consumequeue
 #消息索引存儲路徑
 storePathIndex=/usr/local/rocketmq/storesalve/index
 #checkpoint 檔案存儲路徑
 storeCheckpoint=/usr/local/rocketmq/storesalve/checkpoint
 #abort 檔案存儲路徑
 abortFile=/usr/local/rocketmq/storesalve/abort
 #限制的消息大小
 maxMessageSize=65536
 #flushCommitLogLeastPages=4
 #flushConsumeQueueLeastPages=2
 #flushCommitLogThoroughInterval=10000
 #flushConsumeQueueThoroughInterval=60000
 #Broker 的角色
 #- ASYNC_MASTER 異步複制Master
 #- SYNC_MASTER 同步雙寫Master
 #- SLAVE
 brokerRole=SLAVE
 #刷盤方式
 #- ASYNC_FLUSH 異步刷盤
 #- SYNC_FLUSH 同步刷盤
 flushDiskType=ASYNC_FLUSH
 #checkTransactionMessageEnable=false
 #發消息線程池數量
 #sendMessageThreadPoolNums=128
 #拉消息線程池數量
 #pullMessageThreadPoolNums=128
 enablePropertyFilter=true
           

3)master2

伺服器:192.168.1.8

vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties
           

修改配置如下:

#所屬叢集名字
 brokerClusterName=rocketmq-cluster
 #broker名字,注意此處不同的配置檔案填寫的不一樣
 brokerName=broker-b
 #0 表示 Master,>0 表示 Slave
 brokerId=0
 #nameServer位址,分号分割
 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
 #在發送消息時,自動建立伺服器不存在的topic,預設建立的隊列數
 defaultTopicQueueNums=4
 #是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉
 autoCreateTopicEnable=true
 #是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉
 autoCreateSubscriptionGroup=true
 #Broker 對外服務的監聽端口
 listenPort=10911
 #删除檔案時間點,預設淩晨 4點
 deleteWhen=04
 #檔案保留時間,預設 48 小時
 fileReservedTime=120
 #commitLog每個檔案的大小預設1G
 mapedFileSizeCommitLog=1073741824
 #ConsumeQueue每個檔案預設存30W條,根據業務情況調整
 mapedFileSizeConsumeQueue=300000
 #destroyMapedFileIntervalForcibly=120000
 #redeleteHangedFileInterval=120000
 #檢測實體檔案磁盤空間
 diskMaxUsedSpaceRatio=88
 #存儲路徑
 storePathRootDir=/usr/local/rocketmq/store
 #commitLog 存儲路徑
 storePathCommitLog=/usr/local/rocketmq/store/commitlog
 #消費隊列存儲路徑存儲路徑
 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
 #消息索引存儲路徑
 storePathIndex=/usr/local/rocketmq/store/index
 #checkpoint 檔案存儲路徑
 storeCheckpoint=/usr/local/rocketmq/store/checkpoint
 #abort 檔案存儲路徑
 abortFile=/usr/local/rocketmq/store/abort
 #限制的消息大小
 maxMessageSize=65536
 #flushCommitLogLeastPages=4
 #flushConsumeQueueLeastPages=2
 #flushCommitLogThoroughInterval=10000
 #flushConsumeQueueThoroughInterval=60000
 #Broker 的角色
 #- ASYNC_MASTER 異步複制Master
 #- SYNC_MASTER 同步雙寫Master
 #- SLAVE
 brokerRole=SYNC_MASTER
 #刷盤方式
 #- ASYNC_FLUSH 異步刷盤
 #- SYNC_FLUSH 同步刷盤
 flushDiskType=SYNC_FLUSH
 #checkTransactionMessageEnable=false
 #發消息線程池數量
 #sendMessageThreadPoolNums=128
 #拉消息線程池數量
 #pullMessageThreadPoolNums=128
 enablePropertyFilter=true
           

4)slave1

伺服器:192.168.1.8

vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties
           

修改配置如下:

 #所屬叢集名字
 brokerClusterName=rocketmq-cluster
 #broker名字,注意此處不同的配置檔案填寫的不一樣
 brokerName=broker-a
 #0 表示 Master,>0 表示 Slave
 brokerId=1
 #nameServer位址,分号分割
 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
 #在發送消息時,自動建立伺服器不存在的topic,預設建立的隊列數
 defaultTopicQueueNums=4
 #是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉
 autoCreateTopicEnable=true
 #是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉
 autoCreateSubscriptionGroup=true
 #Broker 對外服務的監聽端口
 listenPort=11011
 #删除檔案時間點,預設淩晨 4點
 deleteWhen=04
 #檔案保留時間,預設 48 小時
 fileReservedTime=120
 #commitLog每個檔案的大小預設1G
 mapedFileSizeCommitLog=1073741824
 #ConsumeQueue每個檔案預設存30W條,根據業務情況調整
 mapedFileSizeConsumeQueue=300000
 #destroyMapedFileIntervalForcibly=120000
 #redeleteHangedFileInterval=120000
 #檢測實體檔案磁盤空間
 diskMaxUsedSpaceRatio=88
 #存儲路徑
 storePathRootDir=/usr/local/rocketmq/storesalve
 #commitLog 存儲路徑
 storePathCommitLog=/usr/local/rocketmq/storesalve/commitlog
 #消費隊列存儲路徑存儲路徑
 storePathConsumeQueue=/usr/local/rocketmq/storesalve/consumequeue
 #消息索引存儲路徑
 storePathIndex=/usr/local/rocketmq/storesalve/index
 #checkpoint 檔案存儲路徑
 storeCheckpoint=/usr/local/rocketmq/storesalve/checkpoint
 #abort 檔案存儲路徑
 abortFile=/usr/local/rocketmq/storesalve/abort
 #限制的消息大小
 maxMessageSize=65536
 #flushCommitLogLeastPages=4
 #flushConsumeQueueLeastPages=2
 #flushCommitLogThoroughInterval=10000
 #flushConsumeQueueThoroughInterval=60000
 #Broker 的角色
 #- ASYNC_MASTER 異步複制Master
 #- SYNC_MASTER 同步雙寫Master
 #- SLAVE
 brokerRole=SLAVE
 #刷盤方式
 #- ASYNC_FLUSH 異步刷盤
 #- SYNC_FLUSH 同步刷盤
 flushDiskType=ASYNC_FLUSH
 #checkTransactionMessageEnable=false
 #發消息線程池數量
 #sendMessageThreadPoolNums=128
 #拉消息線程池數量
 #pullMessageThreadPoolNums=128
 enablePropertyFilter=true
           

3.3.9 修改啟動腳本檔案

1)runbroker.sh

 vi /usr/local/rocketmq/bin/runbroker.sh
           

需要根據記憶體大小進行适當的對JVM參數進行調整:

 #===================================================
 # 開發環境配置 JVM Configuration
 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
           

####2)runserver.sh

 vim /usr/local/rocketmq/bin/runserver.sh
           
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
           

3.3.10 服務啟動

1)啟動NameServe叢集

分别在192.168.1.7和192.168.1.8啟動NameServer

 cd /usr/local/rocketmq/bin
 nohup sh mqnamesrv &
           

2)啟動Broker叢集

  • 在192.168.1.7上啟動master1和slave2

master1:

 cd /usr/local/rocketmq/bin
 #注意看下面啟動的路徑
nohup sh mqbroker -n 192.168.1.7:9876 -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties &
           

slave2:

 cd /usr/local/rocketmq/bin
 #注意看下面啟動的路徑
 nohup sh mqbroker -n 192.168.1.7:9876 -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
           
  • 在192.168.1.8上啟動master2和slave2

master2

cd /usr/local/rocketmq/bin
 #注意看下面啟動的路徑
nohup sh mqbroker -n 192.168.1.8:9876 -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties &
           

slave1

cd /usr/local/rocketmq/bin
 #注意看下面啟動的路徑
nohup sh mqbroker -n 192.168.1.8:9876 -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
           

3.3.11 檢視程序狀态

啟動後通過JPS檢視啟動程序

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

3.3.12 檢視日志

# 檢視nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 檢視broker日志
tail -500f ~/logs/rocketmqlogs/broker.log
           

3.5 叢集監控平台搭建

3.5.1 概述

RocketMQ

有一個對其擴充的開源項目incubator-rocketmq-externals,這個項目中有一個子子產品叫

rocketmq-console

,這個便是管理控制台項目了,先将incubator-rocketmq-externals拉到本地,因為我們需要自己對

rocketmq-console

進行編譯打包運作。

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

3.5.2 下載下傳并編譯打包

3.5.2.1首先從git上下載下傳

git clone https://github.com/apache/rocketmq-externals
           

3.5.2.2進入rocketmq-console工程中

(我的是在本機下載下傳的)

連結:https://pan.baidu.com/s/1LZoUcLZZQIbLpvVsqBf--g 

提取碼:f7hc

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

3.5.2.3修改配置檔案

進入rocketmq-console\src\main\resources目錄下,修改application.properties檔案

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

3.5.2.4回到主目錄,git打包

注意:maven install時,你的jdk版本必須是1.8以上

mvn clean package -Dmaven.test.skip=true
           
RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

3.5.2.5上傳jar包

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例
RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

上傳後:

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

3.6啟動rocketmq-console:

java -jar rocketmq-console-ng-1.0.1.jar
           

啟動成功後,我們就可以通過浏覽器通路

http://192.168.1.7:8080

進入控制台界面了,如下圖:

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

如果打開控制台,出現以下錯誤,請修改linux系統的時間。

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例
#檢視時間
date
#修改時間
date -s "2020-05-25 10:34:00"
#将時間寫入BIOS,以免重新開機伺服器後,設定的時間丢失。
hwclock -w
           

4. 消息發送樣例

  • 導入MQ用戶端依賴
<dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client</artifactId>
     <version>4.4.0</version>
 </dependency>
           
  • 消息發送者步驟分析r
 1.建立消息生産者producer,并制定生産者組名
 2.指定Nameserver位址
 3.啟動producer
 4.建立消息對象,指定主題Topic、Tag和消息體
 5.發送消息
 6.關閉生産者producer
           
  • 消息消費者步驟分析
 1.建立消費者Consumer,制定消費者組名
 2.指定Nameserver位址
 3.訂閱主題Topic和Tag
 4.設定回調函數,處理消息
 5.啟動消費者consumer
           

4.1 基本樣例

4.1.1 消息發送

1)發送同步消息

這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,短信通知。

 public class SyncProducer {
     public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
         //1.建立消息生産者producer,并制定生産者組名
         DefaultMQProducer producer=new DefaultMQProducer("group1");
         //2.指定Nameserver位址
         producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         //3.啟動producer
         producer.start();
         for (int i = 0; i < 10; i++) {
             //4.建立消息對象,指定主題Topic、Tag和消息體
             /**
              * 參數1:消息主題
              * 參數2:消息Tag
              * 參數3:消息内容
              */
             Message message=new Message("base","Tag1",("Hello world"+i).getBytes());
             //5.發送消息
             SendResult send = producer.send(message,10000);
             //發送狀态
             SendStatus status=send.getSendStatus();
             //消息ID
             String msgid=send.getMsgId();
             //消息接收隊列ID
             int queueId = send.getMessageQueue().getQueueId();
             System.out.println("----->"+send+"----->狀态--》"+status+"-->消息ID-->"+msgid+"--->接收隊列ID---》"+queueId);
             //線程睡眠一秒
             TimeUnit.SECONDS.sleep(1);
         }
             //6.關閉生産者producer
             producer.shutdown();
     }
 }
           

2)發送異步消息

異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。

 public class AyncProducer {
     public static void main(String[] args) throws Exception {
         // 執行個體化消息生産者Producer
         DefaultMQProducer producer = new DefaultMQProducer("group1");
         // 設定NameServer的位址
         producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876;");
         // 啟動Producer執行個體
         producer.start();
         for (int i = 0; i < 100; i++) {
             final int index = i;
             // 建立消息,并指定Topic,Tag和消息體
             Message message=new Message("base","Tag1",("Hello world"+i).getBytes());
             // SendCallback接收異步傳回結果的回調
             producer.send(message,new SendCallback() {
                 @Override
                 public void onSuccess(SendResult sendResult) {
                     System.out.println("success-->"+sendResult);
                 }
                 @Override
                 public void onException(Throwable e) {
                     System.out.println("erroe--->"+ e);
                 }
             } ,10000);
         }
         // 如果不再發送消息,關閉Producer執行個體。
        //producer.shutdown();
     }
 }
           

3)單向發送消息

這種方式主要用在不特别關心發送結果的場景,例如日志發送。

 public class OneWayProduce {
     public static void main(String[] args) throws Exception {
         //1.建立消息生産者producer,并制定生産者組名
         DefaultMQProducer producer=new DefaultMQProducer("group1");
         //2.指定Nameserver位址
         producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         //3.啟動producer
         producer.start();
         for (int i = 0; i < 3; i++) {
             //4.建立消息對象,指定主題Topic、Tag和消息體
             Message message=new Message("base","Tag3",("Hello world 單向消息"+i).getBytes());
             //5.發送消息,和發送同步消息唯一的差別
              producer.sendOneway(message);
             //線程睡眠一秒
             TimeUnit.SECONDS.sleep(10);
         }
         //6.關閉生産者producer
         producer.shutdown();
     }
 }
           

4.1.2 消費消息

1)普通模式

單純的消費消息。

public class Consumer {
     public static void main(String[] args) throws MQClientException {
     //1.建立消費者Consumer,制定消費者組名
         DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
     //2.指定Nameserver位址
         consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
     //3.訂閱主題Topic和Tag
         consumer.subscribe("base","Tag1");
     //4.設定回調函數,處理消息
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                 for (int i = 0; i < msg.size(); i++) {
                     System.out.println(new String(msg.get(i).getBody()));
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
     //5.啟動消費者consumer
         consumer.start();
     }
 }
           

2)負載均衡模式(預設情況下)

消費者采用負載均衡方式消費消息,多個消費者共同消費隊列消息,每個消費者處理的消息不同

 public static void main(String[] args) throws MQClientException {
     //1.建立消費者Consumer,制定消費者組名
         DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
     //2.指定Nameserver位址
         consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
     //3.訂閱主題Topic和Tag
         consumer.subscribe("base","Tag1");
       //******************負載均衡模式消費******************
     consumer.setMessageModel(MessageModel.CLUSTERING);
     //4.設定回調函數,處理消息
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                 for (int i = 0; i < msg.size(); i++) {
                     System.out.println(new String(msg.get(i).getBody()));
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
     //5.啟動消費者consumer
         consumer.start();
     }
           

3)廣播模式

消費者采用廣播的方式消費消息,每個消費者消費的消息都是相同的

public static void main(String[] args) throws MQClientException {
     //1.建立消費者Consumer,制定消費者組名
         DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
     //2.指定Nameserver位址
         consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
     //3.訂閱主題Topic和Tag
         consumer.subscribe("base","Tag1");
       //******************廣播模式消費*************
     consumer.setMessageModel(MessageModel.BROADCASTING);
     //4.設定回調函數,處理消息
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                 for (int i = 0; i < msg.size(); i++) {
                     System.out.println(new String(msg.get(i).getBody()));
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
     //5.啟動消費者consumer
         consumer.start();
     }
           

4.2 順序消息

消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。

順序消費的原了解析,在預設的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息隻依次發送到同一個queue中,消費的時候隻從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue隻有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。

下面用訂單進行分區有序的示例。一個訂單的順序流程是:建立、付款、推送、完成。訂單号相同的消息會被先後發送到同一個隊列中,消費時,同一個OrderId擷取到的肯定是同一個隊列。

4.2.1 順序消息生産

 /**
 * Producer,發送順序消息
 */
 public class Producer {
     public static void main(String[] args) throws Exception {
         //1.建立消息生産者producer,并制定生産者組名
         DefaultMQProducer producer = new DefaultMQProducer("group1");
         //2.指定Nameserver位址
         producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         //3.啟動producer
         producer.start();
         //建構消息集合
         List<OrderStep> orderSteps = OrderStep.buildOrders();
         //發送消息
         for (int i = 0; i < orderSteps.size(); i++) {
             String body = orderSteps.get(i) + "";
             Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());
             /**
              * 參數一:消息對象
              * 參數二:消息隊列的選擇器
              * 參數三:選擇隊列的業務辨別(訂單ID)
              */
             SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                 /**
                  * 用訂單ID取模來選擇路由到哪個隊列裡
                  * @param mqs:隊列集合
                  * @param msg:消息對象
                  * @param arg:業務辨別的參數
                  * @return
                  */
                 @Override
                 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                     long orderId = (long) arg;
                     long index = orderId % mqs.size();
                     return mqs.get((int) index);
                 }
             }, orderSteps.get(i).getOrderId(),10000); //orderSteps.get(i).getOrderId()這個地方的參數如果給上,那麼上面的arg就是用的這個參數
             System.out.println("發送結果:" + sendResult);
         }
         producer.shutdown();
     }
 }
 ​
    /**
     * 訂單的步驟
     */
    private static class OrderStep {
        private long orderId;
        private String desc;
 ​
        public long getOrderId() {
            return orderId;
        }
 ​
        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }
 ​
        public String getDesc() {
            return desc;
        }
 ​
        public void setDesc(String desc) {
            this.desc = desc;
        }
 ​
        @Override
        public String toString() {
            return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
        }
    }
 ​
    /**
     * 生成模拟訂單資料
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();
 ​
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("建立");
        orderList.add(orderDemo);
 ​
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("建立");
        orderList.add(orderDemo);
 ​
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
 ​
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("建立");
        orderList.add(orderDemo);
 ​
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
 ​
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
 ​
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
 ​
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);
 ​
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
 ​
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
 ​
        return orderList;
    }
 }
           

4.2.2 順序消費消息

 /**
 * 順序消息消費,帶事務方式(應用可控制Offset什麼時候送出)
 */
 public class Consumer {
     public static void main(String[] args) throws MQClientException {
         //1.建立消費者Consumer,制定消費者組名
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
         //2.指定Nameserver位址
         consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         //3.訂閱主題Topic和Tag
         consumer.subscribe("OrderTopic", "*");
         //4.注冊消息監聽器  new MessageListenerOrderly這個監聽器就是,一個隊列配置設定一個線程去處理。
         consumer.registerMessageListener(new MessageListenerOrderly() {
             @Override
             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                 for (MessageExt msg : msgs) {
                     System.out.println("線程名稱:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody()));
                 }
                 return ConsumeOrderlyStatus.SUCCESS;
             }
         });
         //5.啟動消費者
         consumer.start();
         System.out.println("消費者啟動");
     }
 }
           

4.3 延時消息

比如電商裡,送出了一個訂單就可以發送一個延時消息,1h後去檢查這個訂單的狀态,如果還是未付款就取消訂單釋放庫存。

4.3.1 啟動消息消費者

public class ConsumerDealy {
     public static void main(String[] args) throws Exception {
         DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
         consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         consumer.subscribe("DelayTopic","Tag1");
         consumer.setMessageModel(MessageModel.BROADCASTING);
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                 for (int i = 0; i < msg.size(); i++) {
                     System.out.println("消息ID:"+msg.get(i).getMsgId()+"--->延遲時間"+(System.currentTimeMillis()-msg.get(i).getStoreTimestamp()));
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         consumer.start();
     }
 }
           

4.3.2 發送延時消息

 public class ProducerDelay {
     public static void main(String[] args) throws Exception {
         DefaultMQProducer producer=new DefaultMQProducer("group1");
         producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         //3.啟動producer
         producer.start();
         for (int i = 0; i < 10; i++) {
             //4.建立消息對象,指定主題Topic、Tag和消息體
             Message message=new Message("DelayTopic","Tag1",("Hello world"+i).getBytes());
             /**
              * 設定消息延遲時間
              * rocketmq的延遲時間隻能從以下時間選擇。
              * private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
              *
              *  下面代碼選擇的是3  那麼就延遲10s
              */
             message.setDelayTimeLevel(3);
             //5.發送消息
             SendResult send = producer.send(message,10000);
         }
         //6.關閉生産者producer
         producer.shutdown();
     }
 }
           

###4.3.3 驗證

您将會看到消息的消費比存儲時間晚10秒

4.3.4 使用限制

// org/apache/rocketmq/store/config/MessageStoreConfig.java
 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
           

現在RocketMq并不支援任意時間的延時,需要設定幾個固定的延時等級,從1s到2h分别對應着等級1到18

4.4 批量消息

批量發送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應超過4MB。

4.4.1 發送批量消息

如果您每次隻發送不超過4MB的消息,則很容易使用批處理,樣例如下:

 public class ProducerBatch {
     public static void main(String[] args) throws Exception {
         DefaultMQProducer producer=new DefaultMQProducer("group1");
         producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         producer.start();
         //4.建立消息對象,指定主題Topic、Tag和消息體,隻需一個集合即可。
         List<Message> msg=new ArrayList<>();
         Message message1=new Message("BatchTopic","Tag1",("Hello world"+1).getBytes());
         Message message2=new Message("BatchTopic","Tag1",("Hello world"+2).getBytes());
         Message message3=new Message("BatchTopic","Tag1",("Hello world"+3).getBytes());
         Message message4=new Message("BatchTopic","Tag1",("Hello world"+4).getBytes());
         msg.add(message1);
         msg.add(message2);
         msg.add(message3);
         msg.add(message4);
         //5.發送消息
         SendResult send = producer.send(msg,10000);
         //發送狀态
         SendStatus status=send.getSendStatus();
         //消息ID
         String msgid=send.getMsgId();
         //消息接收隊列ID
         int queueId = send.getMessageQueue().getQueueId();
         System.out.println("----->"+send+"----->狀态--》"+status+"-->消息ID-->"+msgid+"--->接收隊列ID---》"+queueId);
         producer.shutdown();
     }
 }
 ​
           

如果消息的總長度可能大于4MB時,這時候最好把消息進行分割

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
            this.messages = messages;
    }
     @Override 
     public boolean hasNext() {
        return currIndex < messages.size();
    }
     @Override 
     public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // 增加日志的開銷20位元組
            if (tmpSize > SIZE_LIMIT) {
                //單個消息超過了最大的限制
                //忽略,否則會阻塞分裂的程序
                if (nextIndex - currIndex == 0) {
                   //假如下一個子清單沒有元素,則添加這個子清單然後退出循環,否則隻是退出循環
                   nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
 ​
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
 }
 //把大的消息分裂成若幹個小的消息
 ListSplitter splitter = new ListSplitter(messages);
 while (splitter.hasNext()) {
   try {
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       //處理error
   }
 }
           

4.4.2消費批量消息(和其他消費方式相同)

public class ConsumerBatch {
     public static void main(String[] args) throws Exception {
         DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
         consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         //3.訂閱主題Topic和Tag
         consumer.subscribe("BatchTopic","Tag1");
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                 for (int i = 0; i < msg.size(); i++) {
                     System.out.println("消息:"+new String(msg.get(i).getBody()));
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         consumer.start();
     }
 }
           

4.5 過濾消息

在大多數情況下,TAG是一個簡單而有用的設計,其可以來選擇您想要的消息。例如:

 //普通情況下,隻需在消費端設定消費的Tag就可以
  consumer.subscribe("BatchTopic","Tag1");
 //如果消費目前topic下多個Tag
  consumer.subscribe("BatchTopic","Tag1 || Tag2 || Tag3");
 //如果消費目前topic下所有Tag的消息
 consumer.subscribe("BatchTopic","*");
           

消費者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一個消息隻能有一個标簽,這對于複雜的場景可能不起作用。在這種情況下,可以使用SQL表達式篩選消息。SQL特性可以通過發送消息時的屬性來進行計算。在RocketMQ定義的文法下,可以實作一些簡單的邏輯。下面是一個例子:

 ------------
 | message  |
 |----------|  a > 5 AND b = 'abc'
 | a = 10   |  --------------------> Gotten
 | b = 'abc'|
 | c = true |
 ------------
 ------------
 | message  |
 |----------|   a > 5 AND b = 'abc'
 | a = 1    |  --------------------> Missed
 | b = 'abc'|
 | c = true |
 ------------
           

4.5.1 SQL基本文法

RocketMQ隻定義了一些基本文法來支援這個特性。你也可以很容易地擴充它。

  • 數值比較,比如:>,>=,<,<=,BETWEEN,=;
  • 字元比較,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 邏輯符号 AND,OR,NOT;

常量支援類型為:

  • 數值,比如:123,3.1415;
  • 字元,比如:'abc',必須用單引号包裹起來;
  • NULL,特殊的常量
  • 布爾值,TRUE 或 FALSE

隻有使用push模式的消費者才能用使用SQL92标準的sql語句,接口如下:

 public void subscribe(finalString topic, final MessageSelector messageSelector)
           

4.5.2 消息生産者

發送消息時,你能通過

putUserProperty

來設定消息的屬性

public class ProducerSQL {
     public static void main(String[] args) throws Exception {
         DefaultMQProducer producer=new DefaultMQProducer("group1");
         producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         //3.啟動producer
         producer.start();
         for (int i = 0; i < 10; i++) {
             //4.建立消息對象,指定主題Topic、Tag和消息體
             Message message=new Message("FilterTopic","Tag1",("Hello world"+i).getBytes());
             // ***********設定消息屬性**I在消費者中就能當一個辨別********
             message.putUserProperty("i",String.valueOf(i));
             SendResult send = producer.send(message,10000);
             TimeUnit.SECONDS.sleep(1);
         }
         producer.shutdown();
     }
 }
           

4.5.3 消息消費者

用MessageSelector.bySql來使用sql篩選消息

 public class ConsumerSql {
     public static void main(String[] args) throws Exception {
         DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("grop1");
         consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         //3.訂閱主題Topic和Tag   隻有訂閱的消息有這個屬性i, i >=0 and i <= 3
         consumer.subscribe("FilterTopic", MessageSelector.bySql("i between 0 and 3"));
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                 for (int i = 0; i < msg.size(); i++) {
                     System.out.println("消息:"+new String(msg.get(i).getBody()));
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         //5.啟動消費者consumer
         consumer.start();
     }
 }
           

4.6 事務消息

###4.6.1 流程分析

RocketMQ基礎1. MQ介紹2. RocketMQ快速入門3. RocketMQ叢集搭建4. 消息發送樣例

上圖說明了事務消息的大緻方案,其中分為兩個流程:正常事務消息的發送及送出、事務消息的補償流程。

####1)事務消息發送及送出

(1) 發送消息(half消息)。

(2) 服務端響應消息寫入結果。

(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)。

(4) 根據本地事務狀态執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)

2)事務補償

(1) 對沒有Commit/Rollback的事務消息(pending狀态的消息),從服務端發起一次“回查”

(2) Producer收到回查消息,檢查回查消息對應的本地事務的狀态

(3) 根據本地事務狀态,重新Commit或者Rollback

其中,補償階段用于解決消息Commit或者Rollback發生逾時或者失敗的情況。

3)事務消息狀态

事務消息共有三種狀态,送出狀态、復原狀态、中間狀态:

  • TransactionStatus.CommitTransaction: 送出事務,它允許消費者消費此消息。
  • TransactionStatus.RollbackTransaction: 復原事務,它代表該消息将被删除,不允許被消費。
  • TransactionStatus.Unknown: 中間狀态,它代表需要檢查消息隊列來确定狀态。

###4.6.1 發送事務消息

1) 建立事務性生産者

使用

TransactionMQProducer

類建立生産者,并指定唯一的

ProducerGroup

,就可以設定自定義線程池來處理這些檢查請求。執行本地事務後、需要根據執行結果對消息隊列進行回複。回傳的事務狀态在請參考前一節。

 public class Producer {
     public static void main(String[] args) throws Exception {
         //1.建立消息生産者,不能用DefaultMQProducer,必須用TransactionMQProducer的生産者了。
         TransactionMQProducer producer = new TransactionMQProducer("group5");
         //2.指定Nameserver位址
         producer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         //添加事務監聽器
         producer.setTransactionListener(new TransactionListener() {
             /**
              * 在該方法中執行本地事務
              */
             @Override
             public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                 if (StringUtils.equals("TAGA", msg.getTags())) {
                     //消息發送成功
                     return LocalTransactionState.COMMIT_MESSAGE;
                 } else if (StringUtils.equals("TAGB", msg.getTags())) {
                     //消息復原
                     return LocalTransactionState.ROLLBACK_MESSAGE;
                 } else if (StringUtils.equals("TAGC", msg.getTags())) {
                     //消息不做處理。不做處理後,就會調用下面方法checkLocalTransaction 進行回查
                     return LocalTransactionState.UNKNOW;
                 }
                 return LocalTransactionState.UNKNOW;
             }
             /**
              * 該方法時MQ進行消息事務狀态回查
              */
             @Override
             public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                 System.out.println("消息的Tag:" + msg.getTags());
                 return LocalTransactionState.COMMIT_MESSAGE;
             }
         });
         //3.啟動producer
         producer.start();
         String[] tags = {"TAGA", "TAGB", "TAGC"};
         for (int i = 0; i < 3; i++) {
             //4.建立消息對象,指定主題Topic、Tag和消息體
             Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes());
             //5.發送消息
             SendResult result = producer.sendMessageInTransaction(msg, null);
             //發送狀态
             SendStatus status = result.getSendStatus();
             System.out.println("發送結果:" + result);
             //線程睡1秒
             TimeUnit.SECONDS.sleep(1);
         }
         //不能關閉,否則消息回查就不執行了
         //producer.shutdown();
     }
 }
           

1.2)實作事務的監聽接口

當發送半消息成功時,我們使用

executeLocalTransaction

方法來執行本地事務。它傳回前一節中提到的三個事務狀态之一。

checkLocalTranscation

方法用于檢查本地事務狀态,并回應消息隊列的檢查請求。它也是傳回前一節中提到的三個事務狀态之一。

public class TransactionListenerImpl implements TransactionListener {
 ​
     @Override
     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
         System.out.println("執行本地事務");
         if (StringUtils.equals("TagA", msg.getTags())) {
             return LocalTransactionState.COMMIT_MESSAGE;
         } else if (StringUtils.equals("TagB", msg.getTags())) {
             return LocalTransactionState.ROLLBACK_MESSAGE;
         } else {
             return LocalTransactionState.UNKNOW;
         }
 ​
     }
 ​
     @Override
     public LocalTransactionState checkLocalTransaction(MessageExt msg) {
         System.out.println("MQ檢查消息Tag【"+msg.getTags()+"】的本地事務執行結果");
         return LocalTransactionState.COMMIT_MESSAGE;
     }
 }
           

2)建立事務性消費者

public class Consumer {
 ​
     public static void main(String[] args) throws Exception {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
         consumer.setNamesrvAddr("192.168.1.7:9876;192.168.1.8:9876");
         //3.訂閱主題Topic和Tag
         consumer.subscribe("TransactionTopic", "*");
         //4.設定回調函數,處理消息
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             //接受消息内容
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                 for (MessageExt msg : msgs) {
                     System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         //5.啟動消費者consumer
         consumer.start();
         System.out.println("消費者啟動");
     }
 }
           

4.6.2 使用限制

  1. 事務消息不支援延時消息和批量消息。
  2. 為了避免單個消息被檢查太多次而導緻半隊列消息累積,我們預設将單個消息的檢查次數限制為 15 次,但是使用者可以通過 Broker 配置檔案的

    transactionCheckMax

    參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N =

    transactionCheckMax

    ) 則 Broker 将丢棄此消息,并在預設情況下同時列印錯誤日志。使用者可以通過重寫

    AbstractTransactionCheckListener

    類來修改這個行為。
  3. 事務消息将在 Broker 配置檔案中的參數 transactionMsgTimeout 這樣的特定時間長度之後被檢查。當發送事務消息時,使用者還可以通過設定使用者屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先于

    transactionMsgTimeout

    參數。
  4. 事務性消息可能不止一次被檢查或消費。
  5. 送出給使用者的目标主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確定事務消息不丢失、并且事務完整性得到保證,建議使用同步的雙重寫入機制。
  6. 事務消息的生産者 ID 不能與其他類型消息的生産者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ伺服器能通過它們的生産者 ID 查詢到消費者。

繼續閱讀