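We'll skip the CAP theorem, the classic trio of distributed-systems properties. For RocketMQ, a distributed messaging system, master-slave replication is the most basic means of guaranteeing availability. The feature may no longer be the most commonly used one, but studying it is still a good way to understand how distributed systems typically work.
Today, let's dig into how RocketMQ implements master-slave data synchronization.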
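1. Master-slave synchronization overview
Most of us have come across master-slave synchronization at work. Its main purposes are backup and read/write splitting. MySQL, the relational database many of us use, offers master-slave replication, for example.
Master-slave synchronization copies the data on the master server to a slave server, which in effect adds one more replica.
Implementations differ from system to system. MySQL replicates through the binlog; Elasticsearch forwards each operation from the primary shard to its replicas, with the translog used for recovery; Redis does a full sync from an RDB snapshot and then streams write commands to the replica. So how does RocketMQ do it?
And what problems does master-slave synchronization have to consider?
1. How timely is the synchronization? (latency and consistency)
2. How much does it affect the master? (availability)
3. Can the slave take over from the master? (availability or partition tolerance)
The first two points must always be considered; the third often isn't. Automatic takeover is hard to get right, so many systems simply ignore it for simplicity's sake: the slave serves only as a backup and accepts no writes, and switching master and slave requires manual intervention, done as a planned, lossy switchover. As technology has advanced, though, many services now switch over automatically, which is the inevitable trend now that distributed systems are everywhere.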
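2. RocketMQ master-slave configuration
In RocketMQ the core component is the broker, which handles almost all storage and retrieval. Any discussion of master-slave synchronization is therefore about the broker. Let's look at RocketMQ's deployment architecture again to get the overall picture: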
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZwpmL4cjM1gDOzEDOtMDOxgDMzUDMycjMyEDMyAjMtEzM3AzM48CXyEDMyAjMvwVMzcDMzgzLcd2bsJ2Lc12bj5ycn9Gbi52YuAjMwIzZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
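The architecture is clear enough and needs little explanation. Since we're discussing master-slave synchronization, we only look at the broker, so the whole architecture simplifies to: BrokerMaster -> BrokerSlave. Simplified once more, master-slave synchronization is the process of getting the master's data onto the slave.
So how do we configure master-slave synchronization?
conf/broker-a.properties (master configuration)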
# Name of the cluster this broker belongs to
brokerClusterName=DefaultCluster
# Broker name. A master and its slaves share the same name, e.g. master A is called broker-a and so is its slave
brokerName=broker-a
# 0 means master, >0 means slave
brokerId=0
# Broker role
# - ASYNC_MASTER: master with asynchronous replication
# - SYNC_MASTER: master with synchronous double write
# - SLAVE
brokerRole=ASYNC_MASTER
# Flush mode
# - ASYNC_FLUSH: asynchronous flush
# - SYNC_FLUSH: synchronous flush
flushDiskType=ASYNC_FLUSH
# NameServer addresses, separated by semicolons
namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
# Default number of queues for a topic that is auto-created when a message is sent to a topic that does not exist on the server
defaultTopicQueueNums=4
# Whether the broker may auto-create topics; recommended on outside production, off in production
autoCreateTopicEnable=true
# Whether the broker may auto-create subscription groups; recommended on outside production, off in production
autoCreateSubscriptionGroup=true
# Port the broker listens on for client requests
listenPort=10911
# Time of day at which expired files are deleted; default is 4 a.m.
deleteWhen=04
# File retention time in hours (set to 120 here)
fileReservedTime=120
# Size of each commitLog file; default 1 GB
mapedFileSizeCommitLog=1073741824
# Each ConsumeQueue file holds 300,000 entries by default; adjust to your workload
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# Disk-usage threshold (percent) checked for the physical files
diskMaxUsedSpaceRatio=88
# Storage root path
storePathRootDir=/usr/local/rocketmq/store/broker-a
# commitLog storage path
storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
# consumeQueue storage path
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a/consumequeue
# Message index storage path
storePathIndex=/usr/local/rocketmq/store/broker-a/index
# checkpoint file path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort file path
abortFile=/usr/local/rocketmq/store/abort
# Maximum message size in bytes
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#checkTransactionMessageEnable=false
# Size of the send-message thread pool
#sendMessageThreadPoolNums=128
# Size of the pull-message thread pool
#pullMessageThreadPoolNums=128
conf/broker-a-s.properties (slave configuration)
# Name of the cluster this broker belongs to
brokerClusterName=DefaultCluster
# Broker name. A master and its slaves share the same name, e.g. master A is called broker-a and so is its slave
brokerName=broker-a
# 0 means master, >0 means slave
brokerId=1
# Broker role
# - ASYNC_MASTER: master with asynchronous replication
# - SYNC_MASTER: master with synchronous double write
# - SLAVE
brokerRole=SLAVE
# Flush mode
# - ASYNC_FLUSH: asynchronous flush
# - SYNC_FLUSH: synchronous flush
flushDiskType=ASYNC_FLUSH
# NameServer addresses, separated by semicolons
namesrvAddr=172.0.1.5:9876;172.0.1.6:9876
# Default number of queues for a topic that is auto-created when a message is sent to a topic that does not exist on the server
defaultTopicQueueNums=4
# Whether the broker may auto-create topics; recommended on outside production, off in production
autoCreateTopicEnable=true
# Whether the broker may auto-create subscription groups; recommended on outside production, off in production
autoCreateSubscriptionGroup=true
# Port the broker listens on for client requests
listenPort=10920
# Time of day at which expired files are deleted; default is 4 a.m.
deleteWhen=04
# File retention time in hours (set to 120 here)
fileReservedTime=120
# Size of each commitLog file; default 1 GB
mapedFileSizeCommitLog=1073741824
# Each ConsumeQueue file holds 300,000 entries by default; adjust to your workload
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# Disk-usage threshold (percent) checked for the physical files
diskMaxUsedSpaceRatio=88
# Storage root path
storePathRootDir=/usr/local/rocketmq/store/broker-a-s
# commitLog storage path
storePathCommitLog=/usr/local/rocketmq/store/broker-a-s/commitlog
# consumeQueue storage path
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a-s/consumequeue
# Message index storage path
storePathIndex=/usr/local/rocketmq/store/broker-a-s/index
# checkpoint file path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort file path
abortFile=/usr/local/rocketmq/store/abort
# Maximum message size in bytes
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#checkTransactionMessageEnable=false
# Size of the send-message thread pool
#sendMessageThreadPoolNums=128
# Size of the pull-message thread pool
#pullMessageThreadPoolNums=128
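The file names themselves don't matter; what matters is pointing each broker at the right configuration file at startup. The commands to start the master and the slave are: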
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties > logs/broker-a.log 2>&1 &
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties > logs/broker-a-s.log 2>&1 &
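If you're worried about mistyping the startup command, you can also give the file the same name on every machine (for example broker.properties) and just vary its contents. Then the same command works on every machine, which also rules out one kind of operator error.
Of course the nameserver nodes must be started before the brokers. With that, a RocketMQ master-slave cluster is configured. There are a lot of settings, but only one really matters: with brokerName kept the same, set brokerRole=ASYNC_MASTER|SLAVE|SYNC_MASTER. That value determines whether a node is a master or a slave, and with it the direction of replication: the slave copies data from the master, or put the other way, the master synchronizes data to the slave.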
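To make the "same brokerName, different brokerRole" rule concrete, here is a minimal sketch that reads two properties files and checks whether they describe a master/slave pair. The class `BrokerRoleSketch` and its methods are hypothetical helpers written for this article, not part of RocketMQ, and the fallback to ASYNC_MASTER when brokerRole is missing is an assumption of the sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper, not a RocketMQ class: classifies brokers from their properties. */
final class BrokerRoleSketch {
    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    /** Parses properties text such as the contents of broker-a.properties. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException("unreadable properties", e);
        }
        return props;
    }

    /** Reads brokerRole; falling back to ASYNC_MASTER is an assumption of this sketch. */
    static Role roleOf(Properties props) {
        return Role.valueOf(props.getProperty("brokerRole", "ASYNC_MASTER").trim());
    }

    /** Two configs form a pair when brokerName matches and exactly one of them is a SLAVE. */
    static boolean isMasterSlavePair(Properties a, Properties b) {
        boolean sameName = a.getProperty("brokerName", "").equals(b.getProperty("brokerName", ""));
        boolean exactlyOneSlave = (roleOf(a) == Role.SLAVE) != (roleOf(b) == Role.SLAVE);
        return sameName && exactlyOneSlave;
    }
}
```

Feeding it the two files shown earlier would report a pair; two files that both say ASYNC_MASTER would not.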
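3. How RocketMQ implements master-slave synchronization
With the configuration understood, we can start on the implementation. As shown above, whether a broker is a master or a slave is fixed in its configuration file, so in this (non-DLedger) mode the role does not change at runtime. The master/slave-specific behavior therefore differs right from broker startup.
Let's first look at the overall framework of how the broker runs synchronization: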
// org.apache.rocketmq.broker.BrokerController#start
public void start() throws Exception {
if (this.messageStore != null) {
this.messageStore.start();
}
if (this.remotingServer != null) {
this.remotingServer.start();
}
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
// set up the slave's periodic synchronization task
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
// force one registration with the nameservers
this.registerBrokerAll(true, false, true);
}
// periodically register this broker's state with the nameservers
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}
private void handleSlaveSynchronize(BrokerRole role) {
// only a slave node runs the sync task
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
// clear the master address so that no sync happens right at startup
// it has to be set somewhere else later:
// registerBrokerAll() is what fills in the master address
this.slaveSynchronize.setMasterAddr(null);
// sync every 10 seconds
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
// register with the nameservers when forced, or when needRegister() reports that a registration is due
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
// register with every configured nameserver
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
// update the master address
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}
// org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
// register with all nameservers in parallel, one task per nameserver
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
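The registration fan-out in the listing above (one task per nameserver, a CountDownLatch, and a bounded wait) can be sketched on its own as follows. This is a simplified illustration under assumptions, not RocketMQ's code: `FanOutRegisterSketch`, `registerAll` and the `registerOne` callback are hypothetical names, and the results go into a CopyOnWriteArrayList because several worker threads add to the list concurrently, whereas the listing collects them in a plain list.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical sketch of "call every name server in parallel, wait at most N ms". */
final class FanOutRegisterSketch {
    static List<String> registerAll(List<String> nameServers,
                                    Function<String, String> registerOne,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>(); // safe for concurrent add()
        if (nameServers.isEmpty()) {
            return results;
        }
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(4, nameServers.size()));
        try {
            for (String addr : nameServers) {
                pool.execute(() -> {
                    try {
                        String result = registerOne.apply(addr);
                        if (result != null) {
                            results.add(result);
                        }
                    } catch (RuntimeException e) {
                        // one failing name server must not prevent the others from registering
                    } finally {
                        latch.countDown(); // always count down, success or failure
                    }
                });
            }
            // bounded wait: slow name servers are simply missing from the result
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

Counting down in `finally` is the important detail: without it a single failed call would make the caller wait for the full timeout.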
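Master and slave differ very little: each has everything a broker needs. Both open their service ports, both clean up expired files, both register themselves with the nameservers, and so on. The one difference is that a slave additionally starts a scheduled task that sends a sync request to the master every 10 seconds (after an initial 3-second delay), namely syncAll(). So what exactly gets synchronized, and how?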
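The scheduling pattern in handleSlaveSynchronize() (cancel any previous task, then schedule a fixed-rate task only when the role is SLAVE) can be reduced to the sketch below. The class `SlaveSyncSchedulerSketch` and its `onRole` method are hypothetical names used for illustration; only the 3-second delay and 10-second period are taken from the listing.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of "only a slave keeps a periodic sync task alive". */
final class SlaveSyncSchedulerSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable syncAll;
    private ScheduledFuture<?> future;

    SlaveSyncSchedulerSketch(Runnable syncAll) {
        this.syncAll = syncAll;
    }

    /** Cancels any existing task, then schedules a new one only for a slave. */
    synchronized void onRole(boolean slave) {
        if (future != null) {
            future.cancel(false); // let a run that is already in progress finish
            future = null;
        }
        if (slave) {
            future = scheduler.scheduleAtFixedRate(() -> {
                try {
                    syncAll.run();
                } catch (Throwable t) {
                    // scheduleAtFixedRate stops rescheduling a task that throws,
                    // so the failure is swallowed here (a real broker would log it)
                }
            }, 3_000, 10_000, TimeUnit.MILLISECONDS);
        }
    }

    synchronized boolean isScheduled() {
        return future != null && !future.isCancelled();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Catching Throwable inside the task mirrors the listing: with scheduleAtFixedRate, an uncaught exception would silently end all later runs.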
All of this scheduled synchronization lives in syncAll():
// org.apache.rocketmq.broker.slave.SlaveSynchronize#syncAll
public void syncAll() {
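// sync topic configuration
this.syncTopicConfig();
// sync consumer offsets
this.syncConsumerOffset();
// sync delay offsets
this.syncDelayOffset();
// sync subscription group configuration (metadata only; message data is replicated elsewhere)
this.syncSubscriptionGroupConfig();
}
// sync topic configuration
private void syncTopicConfig() {
String masterAddrBak = this.masterAddr;
// sync only when a master address exists and it is not this broker's own address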
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
TopicConfigSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
// the version differs, meaning the data changed: replace the local data with the master's
if (!this.brokerController.getTopicConfigManager().getDataVersion()
.equals(topicWrapper.getDataVersion())) {
this.brokerController.getTopicConfigManager().getDataVersion()
.assignNewOne(topicWrapper.getDataVersion());
this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
this.brokerController.getTopicConfigManager().getTopicConfigTable()
.putAll(topicWrapper.getTopicConfigTable());
// persist the topic configuration
this.brokerController.getTopicConfigManager().persist();
log.info("Update slave topic config from master, {}", masterAddrBak);
}
} catch (Exception e) {
log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
}
}
}
// sync consumer offsets
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
this.brokerController.getConsumerOffsetManager().getOffsetTable()
.putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().persist();
log.info("Update slave consumer offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
}
}
}
// sync delay offsets: the progress of the delayed-message queues, stored in config/delayOffset.json
private void syncDelayOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
String delayOffset =
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
if (delayOffset != null) {
String fileName =
StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
.getMessageStoreConfig().getStorePathRootDir());
try {
MixAll.string2File(delayOffset, fileName);
} catch (IOException e) {
log.error("Persist file Exception, {}", fileName, e);
}
}
log.info("Update slave delay offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
}
}
}
// sync subscription group configuration
private void syncSubscriptionGroupConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
SubscriptionGroupWrapper subscriptionWrapper =
this.brokerController.getBrokerOuterAPI()
.getAllSubscriptionGroupConfig(masterAddrBak);
if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
.equals(subscriptionWrapper.getDataVersion())) {
SubscriptionGroupManager subscriptionGroupManager =
this.brokerController.getSubscriptionGroupManager();
subscriptionGroupManager.getDataVersion().assignNewOne(
subscriptionWrapper.getDataVersion());
subscriptionGroupManager.getSubscriptionGroupTable().clear();
subscriptionGroupManager.getSubscriptionGroupTable().putAll(
subscriptionWrapper.getSubscriptionGroupTable());
// persist the subscription group configuration
subscriptionGroupManager.persist();
log.info("Update slave Subscription Group from master, {}", masterAddrBak);
}
} catch (Exception e) {
log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
}
}
}
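That is the main framework code for RocketMQ's master-slave synchronization. To answer the questions above: what is synchronized? Four kinds of data: topic configuration, consumer offsets, delay offsets, and subscription group configuration. How timely is it? A sync request is issued every 10 seconds, so the lag is on the order of 10 seconds.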
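The version check used by syncTopicConfig() and syncSubscriptionGroupConfig() boils down to "replace the whole local table only when the master's version differs". Here is a minimal sketch of that idea, with loud assumptions: `VersionedTableSketch` is a hypothetical class, and a plain long stands in for RocketMQ's DataVersion object.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: a slave-side table that is replaced only when the master's version changes. */
final class VersionedTableSketch {
    private long version = 0L; // stand-in for DataVersion (an assumption of this sketch)
    private final Map<String, Integer> table = new HashMap<>();

    /** Returns true when the local table was replaced with the master's copy. */
    synchronized boolean syncFrom(long masterVersion, Map<String, Integer> masterTable) {
        if (masterVersion == version) {
            return false; // same version: nothing to copy, nothing to persist
        }
        version = masterVersion;
        table.clear();              // drop entries the master no longer has
        table.putAll(masterTable);
        return true;                // the caller would persist to disk at this point
    }

    synchronized Map<String, Integer> snapshot() {
        return new HashMap<>(table);
    }
}
```

Clearing before copying is what removes topics that were deleted on the master. Note that syncConsumerOffset() in the listing above does no such check: it does a putAll and persists on every run.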
But wait: everything synchronized above looks like metadata. Where is the message data itself synchronized? That's what we care about most!
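4. How RocketMQ synchronizes message data
The previous section only scratched the surface, and it turned out not to be what we were after: the scheduled task synchronizes only metadata. So where does the real message data go? It is handled by HAService. On the slave, HAService runs a main loop that keeps receiving data from the master and appends it to its own commitlog files, which is what actually synchronizes the data.
4.1. Starting HAService
The synchronization service is a set of dedicated components: a server side, a client side, and some maintenance threads, which we need to look at separately. It is created when the messageStore is initialized; it reads a separate port setting (haListenPort) and starts the HA synchronization service on it.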
// org.apache.rocketmq.store.DefaultMessageStore#DefaultMessageStore
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
this.allocateMappedFileService = new AllocateMappedFileService(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
this.consumeQueueTable = new ConcurrentHashMap<>(32);
this.flushConsumeQueueService = new FlushConsumeQueueService();
this.cleanCommitLogService = new CleanCommitLogService();
this.cleanConsumeQueueService = new CleanConsumeQueueService();
this.storeStatsService = new StoreStatsService();
this.indexService = new IndexService(this);
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// initialize HAService
this.haService = new HAService(this);
} else {
this.haService = null;
}
...
File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
MappedFile.ensureDirOK(file.getParent());
lockFile = new RandomAccessFile(file, "rw");
}
// org.apache.rocketmq.store.ha.HAService#HAService
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
// create the server-side service
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService();
// create the client
this.haClient = new HAClient();
}
// each of these runs as a background thread; they are all started together in start()
public void start() throws Exception {
// start the server side, which is what a master node uses
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
// transfer-wait service: holds pending write requests until a slave has acknowledged their offset
this.groupTransferService.start();
// start the client side, which is what a slave node uses to connect to the master
this.haClient.start();
}
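HAService is a small service inside RocketMQ that runs on background threads. For simplicity, or for resource isolation, it uses its own port and its own communication code. Small as it is, it has everything it needs. Below I'll explain how data synchronization is implemented in three separate parts.
4.2. Synchronization on the slave
The slave is responsible for actively connecting to the master and receiving its data, which is an important step. This is implemented in HAClient: once started, the client keeps asking the master for new data and writes it into its own commitlog.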
// org.apache.rocketmq.store.ha.HAService.HAClient#run
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// try to connect to the master using plain Java NIO
if (this.connectMaster()) {
if (this.isTimeToReportOffset()) {
// every so often, report this slave's sync offset to the master
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
// if the connection is broken, close it; the next loop iteration reconnects
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
// core logic: process the message data that was received
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
// not connected: retry in 5 seconds (a node with no master address never connects)
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
private boolean connectMaster() throws ClosedChannelException {
// a single long-lived connection
if (null == socketChannel) {
String addr = this.masterAddress.get();
// with no master address, no connection is made
// a master node runs this same loop; it just never connects to anything
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
// plain NIO
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
// org.apache.rocketmq.remoting.common.RemotingUtil#connect
public static SocketChannel connect(SocketAddress remote) {
return connect(remote, 1000 * 5);
}
public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
SocketChannel sc = null;
try {
sc = SocketChannel.open();
sc.configureBlocking(true);
sc.socket().setSoLinger(false, -1);
sc.socket().setTcpNoDelay(true);
sc.socket().setReceiveBufferSize(1024 * 64);
sc.socket().setSendBufferSize(1024 * 64);
sc.socket().connect(remote, timeoutMillis);
sc.configureBlocking(false);
return sc;
} catch (Exception e) {
if (sc != null) {
try {
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
return null;
}
processReadEvent() is where new data received from the master gets written into this broker's commitlog. It relies mainly on the commitLog.
// org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
private boolean dispatchReadRequest() {
// read data according to the protocol
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
if (diff >= msgHeaderSize) {
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
// once a full frame has been read, append it to the store right away
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
this.byteBufferRead.get(bodyData);
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
// org.apache.rocketmq.store.DefaultMessageStore#appendToCommitLog
@Override
public boolean appendToCommitLog(long startOffset, byte[] data) {
if (this.shutdown) {
log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
return false;
}
// append to the commitlog; the consumeQueue, index and so on are then built from it
boolean result = this.commitLog.appendData(startOffset, data);
if (result) {
this.reputMessageService.wakeup();
} else {
log.error("appendToPhyQueue failed " + startOffset + " " + data.length);
}
return result;
}
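From the slave's processing flow, we now have a fairly complete picture of how RocketMQ synchronizes data. A dedicated port is used for replication; the slave keeps a connection open to the master and repeatedly reports its offset; when new data arrives, it appends it to its own commitlog, building up its own copy of the data and so staying in sync with the master. (Keep data consistency in mind here.)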
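The wire format that dispatchReadRequest() parses is an 8-byte physical offset, a 4-byte body size, then the body. As a rough sketch of that framing, the decoder below pulls every complete frame out of a buffer and leaves a trailing partial frame for the next read. `HaFrameDecoderSketch`, `Frame`, `decode` and `encode` are hypothetical names; unlike HAClient, which tracks a dispatchPosition inside a buffer that is still being written, this sketch works on a buffer that has already been flipped for reading.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the [8-byte offset][4-byte size][body] framing. */
final class HaFrameDecoderSketch {
    static final int HEADER_SIZE = 8 + 4; // physical offset + body size

    static final class Frame {
        final long offset;
        final byte[] body;

        Frame(long offset, byte[] body) {
            this.offset = offset;
            this.body = body;
        }
    }

    /** Decodes all complete frames; a trailing partial frame stays unread in buf. */
    static List<Frame> decode(ByteBuffer buf) {
        List<Frame> frames = new ArrayList<>();
        while (buf.remaining() >= HEADER_SIZE) {
            buf.mark();
            long offset = buf.getLong();
            int size = buf.getInt();
            if (size < 0) {
                throw new IllegalStateException("negative body size " + size);
            }
            if (buf.remaining() < size) {
                buf.reset(); // body not fully received yet: rewind to the frame start
                break;
            }
            byte[] body = new byte[size];
            buf.get(body);
            frames.add(new Frame(offset, body));
        }
        return frames;
    }

    /** Builds one frame, ready to be read (flipped). */
    static ByteBuffer encode(long offset, byte[] body) {
        ByteBuffer out = ByteBuffer.allocate(HEADER_SIZE + body.length);
        out.putLong(offset).putInt(body.length).put(body);
        out.flip();
        return out;
    }
}
```

Each decoded frame corresponds to one appendToCommitLog(offset, body) call on the slave.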
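4.3. The master's data synchronization service
The slave keeps asking the master for data, so the master only has to hand the data over. At the very least, though, the master needs a network service to accept the slave's requests.
This also lives in HAService, which opens a server port directly with NIO to receive requests: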
// org.apache.rocketmq.store.ha.HAService.AcceptSocketService
/**
* Listens to slave connections to create {@link HAConnection}.
*/
class AcceptSocketService extends ServiceThread {
private final SocketAddress socketAddressListen;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
// listen on the given port
public AcceptSocketService(final int port) {
this.socketAddressListen = new InetSocketAddress(port);
}
/**
* Starts listening to slave connections.
*
* @throws Exception If fails.
*/
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
/**
* {@inheritDoc}
*/
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
HAConnection conn = new HAConnection(HAService.this, sc);
// once accepted, start separate read and write threads to serve this connection
conn.start();
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
...
}
// org.apache.rocketmq.store.ha.HAConnection#start
public void start() {
this.readSocketService.start();
this.writeSocketService.start();
}
// org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
this.makeStop();
writeSocketService.makeStop();
haService.removeConnection(HAConnection.this);
HAConnection.this.haService.getConnectionCount().decrementAndGet();
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPosition = pos;
// read the only parameter: the offset reported by the slave
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
// ...
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
// org.apache.rocketmq.store.ha.HAService#notifyTransferSome
public void notifyTransferSome(final long offset) {
for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
if (ok) {
this.groupTransferService.notifyTransferSome();
break;
} else {
value = this.push2SlaveMaxOffset.get();
}
}
}
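Opening the port and accepting requests is easy; how the master responds to the client, through the connection's WriteSocketService, is more involved. Readers can dig into that on their own!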
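Two small techniques in the master-side code above are worth isolating: picking the most recent complete 8-byte acknowledgment out of the read buffer, and raising push2SlaveMaxOffset with a compare-and-set loop so that it only ever moves forward. The sketch below shows both. `AckTrackerSketch`, `advance` and `latestAck` are hypothetical names, and returning -1 when no complete value has arrived is a convention of this sketch.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the master-side ack handling. */
final class AckTrackerSketch {
    private final AtomicLong maxAckedOffset = new AtomicLong(0L);

    /** Raises the stored offset if the new one is larger; returns true when it was raised. */
    boolean advance(long offset) {
        long current = maxAckedOffset.get();
        while (offset > current) {
            if (maxAckedOffset.compareAndSet(current, offset)) {
                return true; // this is where waiting writers would be woken up
            }
            current = maxAckedOffset.get(); // lost a race: re-read and retry
        }
        return false; // a stale or duplicate ack never moves the value backwards
    }

    long get() {
        return maxAckedOffset.get();
    }

    /**
     * Returns the last complete 8-byte value in a buffer that is still in write mode
     * (position = number of bytes received so far), or -1 if none has fully arrived.
     */
    static long latestAck(ByteBuffer received) {
        int pos = received.position() - (received.position() % 8);
        return pos >= 8 ? received.getLong(pos - 8) : -1L;
    }
}
```

If no wake-up is needed, `AtomicLong.accumulateAndGet(offset, Math::max)` achieves the same monotonic update in one call.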
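GroupTransferService uses a write queue and a read queue. A request is put in whenever a message is written, and the service is woken up when a slave acknowledges progress, which gives near-real-time notification.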
// org.apache.rocketmq.store.ha.HAService.GroupTransferService#putRequest
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
this.wakeup();
}
public void notifyTransferSome() {
this.notifyTransferObject.wakeup();
}
private void swapRequests() {
// swap the two buffers
List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead.clear();
}
}
}
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
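The write-queue/read-queue swap in the listing above is a classic double buffer: producers only ever touch the write list, and the consumer swaps the two lists and then works through a batch without blocking producers for long. A simplified sketch, assuming a single lock where the real code synchronizes on the two lists separately (`DoubleBufferSketch` is a hypothetical class):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the requestsWrite / requestsRead double buffer. */
final class DoubleBufferSketch<T> {
    private List<T> writeList = new ArrayList<>();
    private List<T> readList = new ArrayList<>();

    /** Called by producers, e.g. the thread that has just appended a message. */
    synchronized void put(T request) {
        writeList.add(request);
    }

    /** Called by the single consumer: swaps the buffers and returns the pending batch. */
    synchronized List<T> swapAndDrain() {
        List<T> tmp = writeList;
        writeList = readList; // the (empty) old read list now receives new requests
        readList = tmp;
        List<T> batch = new ArrayList<>(readList);
        readList.clear();     // leave it empty for the next swap
        return batch;
    }
}
```

In the listing, each drained request then waits until push2SlaveMaxOffset reaches its offset or the timeout expires, and the producer is answered with PUT_OK or FLUSH_SLAVE_TIMEOUT.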
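That completes the walkthrough of RocketMQ master-slave synchronization. The core message replication is based on the commitlog; several kinds of metadata are synchronized by a simple scheduled task; and a pair of swapped buffers tracks pending writes so that they complete as soon as the slave has caught up. Together these keep master and slave as consistent as practical.
Finally, one question deserves attention: how is consistency between master and slave actually guaranteed? The master writes data directly, so how do we make sure the slave ends up with the same data, or put simply, how is the write order preserved? If two records were appended to the commitlog in different orders, the logs would diverge and the results would be completely different: after a master-slave switch, for instance, reading at the same offset would return different data.
In fact, the slave uses a single thread for replication: the data arrives in the master's order and is appended to the commitlog by that same thread, so it cannot get out of order. That may also be why replication doesn't use a communication framework like netty: it simply isn't needed. Replication requires strict ordering and has little need for concurrency. Just as single-threaded redis still delivers very high performance, RocketMQ replication, built on plain NIO plus the page cache and mmap, performs very well too, so there's no need to worry that single-threaded replication causes large delays.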
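The ordering guarantee can be illustrated with a toy replica log that accepts an append only when the incoming start offset equals its current end, which is the spirit of the slavePhyOffset/masterPhyOffset check in dispatchReadRequest(). This is a sketch under assumptions: `ReplicaLogSketch` is a hypothetical class, the log lives in memory, and, unlike the real check, an empty log here must start at offset 0.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical sketch: an append-only log that rejects out-of-order data. */
final class ReplicaLogSketch {
    private final ByteArrayOutputStream log = new ByteArrayOutputStream();

    /** Appends data only if it starts exactly where the local log ends. */
    synchronized boolean append(long startOffset, byte[] data) {
        if (startOffset != log.size()) {
            return false; // a gap or overlap: the replica would diverge, so refuse
        }
        log.write(data, 0, data.length);
        return true;
    }

    /** The next offset this replica expects, i.e. what it would report to the master. */
    synchronized long maxOffset() {
        return log.size();
    }
}
```

Because a single thread both receives and appends, every accepted frame lands at the same offset it had on the master, so equal offsets refer to equal bytes on both sides.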
Don't be afraid of today's hardship. Trust that tomorrow will be even harder!