本文主要介紹如何将 RocketMQ 叢集從原先的主從同步更新到主從切換。
首先介紹與 DLedger 多副本即 RocketMQ 主從切換相關的核心配置屬性,然後嘗試搭建一個主從同步叢集,再從原先的 RocketMQ 叢集平滑更新到 DLedger 叢集的示例,并簡單測試一下主從切換功能。
1、RocketMQ DLedger 多副本即主從切換核心配置參數詳解
其主要的配置參數如下所示:
-
enableDLegerCommitLog
是否啟用 DLedger,即是否啟用 RocketMQ 主從切換,預設值為 false。如果需要開啟主從切換,則該值需要設定為 true 。
-
dLegerGroup
節點所屬的 raft 組,建議與 brokerName 保持一緻,例如 broker-a。
-
dLegerPeers
叢集節點資訊,示例配置如下:n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913,多個節點用英文冒号隔開,單個條目遵循 legerSlefId-ip:端口,這裡的端口用作 dledger 内部通信。
-
dLegerSelfId
目前節點id。取自 legerPeers 中條目的開頭,即上述示例中的 n0,并且特别需要強調,隻能第一個字元為英文,其他字元需要配置成數字。
-
storePathRootDir
DLedger 日志檔案的存儲根目錄,為了能夠支援平滑更新,該值與 storePathCommitLog 設定為不同的目錄。
2、搭建主從同步環境
首先先搭建一個傳統意義上的主從同步架構,往叢集中灌一定量的資料,然後更新到 DLedger 叢集。
在 Linux 伺服器上搭建一個 rocketmq 主從同步叢集我想不是一件很難的事情,故本文就不會詳細介紹按照過程,隻貼出相關配置。
實驗環境的部署結構采取 一主一次,其部署圖如下:

下面我就重點貼一下 broker 的配置檔案。
220 上的 broker 配置檔案如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.0.220
brokerIP2=192.168.0.220
namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
221 上 broker 的配置檔案如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.0.221
brokerIP2=192.168.0.221
namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
相關的啟動指令如下:
nohup bin/mqnamesrv /dev/null 2>&1 &
nohup bin/mqbroker -c conf/broker.conf /dev/null 2>&1 &
安裝後的叢集資訊如圖所示:
3、主從同步叢集更新到DLedger
3.1 部署架構
DLedger 叢集至少需要3台機器,故搭建 DLedger 還需要再引入一台機器,其部署結構圖如下:
從主從同步叢集更新到 DLedger 叢集,使用者最關心的還是更新後的叢集是否能夠相容原先的資料,即原先存儲在消息能否能被消息消費者消費端,甚至于能否查詢到。
為了友善後續驗證,首先我使用下述程式向 mq 叢集中添加了一篇友善查詢的消息(設定消息的key)。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test");
producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876");
producer.start();
for(int i =600000; i < 600100; i ++) {
try {
Message msg = new Message("topic_dw_test_by_order_01",null , "m" + i,("Hello RocketMQ" + i ).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
//System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
System.out.println("end");
}
}
消息的查詢結果示例如下:
3.2 更新步驟
Step1:将 192.168.0.220 的 rocketmq 拷貝到 192.168.0.222,可以使用如下指令進行操作。在 192.168.0.220 上敲如下指令:
scp -r rocketmq-all-4.5.2-bin-release/ [email protected]:/opt/application/rocketmq-all-4.5.2-bin-release
溫馨提示:示例中由于版本是一樣,實際過程中,版本需要更新,故需先下載下傳最新的版本,然後将老叢集中的 store 目錄完整的拷貝到新叢集的 store 目錄。
Step2:依次在三台伺服器的 broker.conf 配置檔案中添加與 dledger 相關的配置屬性。
192.168.0.220 broker配置檔案如下:
brokerClusterName = DefaultCluster
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.0.220
brokerIP2=192.168.0.220
namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
# 與 dledger 相關的屬性
enableDLegerCommitLog=true
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
dLegerGroup=broker-a
dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
dLegerSelfId=n0
192.168.0.221 broker配置檔案如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.0.221
brokerIP2=192.168.0.221
namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
# 與dledger 相關的配置屬性
enableDLegerCommitLog=true
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
dLegerGroup=broker-a
dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
dLegerSelfId=n1
192.168.0.222 broker配置檔案如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.0.222
brokerIP2=192.168.0.222
namesrvAddr=192.168.0.221:9876;192.168.0.220:9876
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store
storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=false
# 與 dledger 相關的配置
enableDLegerCommitLog=true
storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store
dLegerGroup=broker-a
dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911
dLegerSelfId=n2
溫馨提示:legerSelfId 分别為 n0、n1、n2。在真實的生産環境中,broker配置檔案中的 storePathRootDir、storePathCommitLog 盡量使用單獨的根目錄,這樣判斷其磁盤使用率時才不會互相影響。
Step3:将 store/config 下的 所有檔案拷貝到 dledger store 的 congfig 目錄下。
cd /opt/application/rocketmq-all-4.5.2-bin-release/store/
cp config/* dledger_store/config/
溫馨提示:該步驟按照各自按照時配置的目錄進行複制即可。
Step4:依次啟動三台 broker。
nohup bin/mqbroker -c conf/broker.conf /dev/null 2>&1 &
如果啟動成功,則在 rocketmq-console 中看到的叢集資訊如下:
3.3 驗證消息發送與消息查找
首先我們先驗證更新之前的消息是否能查詢到,那我們還是查找key 為 m600000 的消息,查找結果如圖所示:
然後我們來測試一下消息發送。測試代碼如下:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test");
producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876");
producer.start();
for(int i =600200; i < 600300; i ++) {
try {
Message msg = new Message("topic_dw_test_by_order_01",null , "m" + i,("Hello RocketMQ" + i ).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
System.out.println("end");
}
}
執行結果如下:
再去控制台查詢一下消息,其結果也表明新的消息也能查詢到。
最後我們再來驗證一下主節點當機,消息發送是否會受影響。
在消息發送的過程中,去關閉主節點,其截圖如下:
再來看一下叢集的狀态:
等待該複制組重新完成主伺服器選舉後,即可繼續處理消息發送。
溫馨提示:由于本示例是一主一從,故在選舉期間,消息不可用,但在真實的生産環境上,其部署架構是多主主從,即一個複制組在 leader 選舉期間,其他複制組可以接替該複制組完成消息的發送,實作消息服務的高可用。
與 DLedger 相關的日志,預設存儲在 broker_default.log 檔案中。
4、源碼分析 RocketMQ 系列文章
1、
RocketMQ 多副本前置篇:初探raft協定2、
源碼分析 RocketMQ DLedger 多副本之 Leader 選主3、
源碼分析 RocketMQ DLedger 多副本存儲實作4、
源碼分析 RocketMQ DLedger(多副本) 之日志追加流程5、
源碼分析 RocketMQ DLedger(多副本) 之日志複制(傳播)6、
基于 raft 協定的 RocketMQ DLedger 多副本日志複制設計原理7、
RocketMQ 整合 DLedger(多副本)即主從切換實作平滑更新的設計技巧8、
源碼分析 RocketMQ DLedger 多副本即主從切換實作原理原文釋出時間為:2019-10-13
本文作者:丁威,《RocketMQ技術内幕》作者。
本文來自
中間件興趣圈,了解相關資訊可以關注
。