概述
本文主要介紹 DBLE 心跳檢測子產品,内容包括心跳檢測作用及心跳檢測子產品源碼解析兩部分。
心跳檢測作用
DBLE 中心跳檢測的作用有以下三點:
1. 控制多個寫節點高可用切換;
2. 控制讀操作的負載均衡,會根據最近一次的心跳狀态,及主從延遲(如果配置了
slaveThreshold
主從延遲門檻值的話)來控制讀負載均衡;
3. 控制空閑連接配接數大小,關閉多餘空閑連接配接。這裡發送的是 PING 包,需與 dataNodeIdleCheckPeriod 參數配合,超過此參數的空閑連接配接會通過發送 PING 包來檢查。
總的來講,就是判斷 MySQL 執行個體的狀态。
本文中主要講解前兩點涉及到的心跳檢測内容,第 3 點更适合在連接配接管理中講,本文暫不涉及。
心跳子產品源碼解析
心跳檢測定時任務開始入口在
Scheduler#init
方法中,以
dataNodeHeartbeatPeriod
間隔定期進行心跳檢測,預設值為 10 秒:
scheduler.scheduleAtFixedRate(dataSourceHeartbeat(), 0L, system.getDataNodeHeartbeatPeriod(), TimeUnit.MILLISECONDS);
複制
Scheduler#dataSourceHeartbeat
方法傳回 Runnable 任務:
private Runnable dataSourceHeartbeat() {
return new Runnable() {
@Override
public void run() {
timerExecutor.execute(new Runnable() {
@Override
public void run() {
//這裡有個判斷,如果讀寫節點都沒有,自然不需要心跳檢測了
if (!DbleServer.getInstance().getConfig().isDataHostWithoutWR()) {
Map<String, AbstractPhysicalDBPool> hosts = DbleServer.getInstance().getConfig().getDataHosts();
for (AbstractPhysicalDBPool host : hosts.values()) {
//調用了AbstractPhysicalDBPool的doHeartbeat()方法
host.doHeartbeat();
}
}
}
});
}
};
}
複制
AbstractPhysicalDBPool#doHeartbeat
為抽象方法,有兩個實作分别在類
PhysicalDNPoolSingleWH
和
PhysicalDBPool
中,這兩個類的差別從名字就可以看出來,一個是隻有一個 WriteHost,另一個則有多個 WriteHost,會根據你們的 schema.xml 中的具體配置決定初始化哪一個。
對于心跳檢測來說,基本實作都一樣,是以看哪一個類并不影響。
我們就來看下
PhysicalDNPoolSingleWH#doHeartbeat
方法吧:
public void doHeartbeat() {
for (PhysicalDatasource source : allSourceMap.values()) {
if (source != null) {
source.doHeartbeat();
} else {
LOGGER.warn(hostName + " current dataSource is null!");
}
}
}
複制
上述方法其實就是循環周遊所有資料源,然後對每個資料源進行心跳檢測了。
繼續來看
PhysicalDatasource#doHeartbeat
方法,補充說明一下,
PhysicalDatasource
也是抽象類,但在 DBLE 中隻有
MySQLDatasource
一個實作,因為 DBLE 後端隻支援 MySQL 嘛,
MySQLDatasource#doHeartbeat
方法也是直接繼承了抽象類的實作:
public void doHeartbeat() {
if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) {
return;
}
if (!heartbeat.isStop()) {
//這裡直接調用了MySQLHeartbeat#heartbeat方法
heartbeat.heartbeat();
}
}
複制
再繼續看
MySQLHeartbeat#heartbeat
方法前,先來看下
MySQLDatasource
和
MySQLHeartbeat
類之間的關系:

它們之間的關系很簡單,就是
MySQLDatasource
會建立
MySQLHeartbeat
,并且它們之間有一對一的關聯關系。
簡單來說就是一個
MySQLDatasource
對象就有一個
MySQLHeartbeat
對象來負責它的心跳檢測。
進一步來看
MySQLHeartbeat#heartbeat
方法:
public void heartbeat() {
final ReentrantLock reentrantLock = this.lock;
reentrantLock.lock();
try {
if (isChecking.compareAndSet(false, true)) {
if (detector == null || detector.isQuit()) {
try {
detector = new MySQLDetector(this);
detector.heartbeat();
} catch (Exception e) {
LOGGER.info(source.getConfig().toString(), e);
setResult(ERROR_STATUS);
}
} else {
detector.heartbeat();
}
} else {
if (detector != null) {
if (detector.isQuit()) {
isChecking.compareAndSet(true, false);
} else if (detector.isHeartbeatTimeout()) {
setResult(TIMEOUT_STATUS);
}
}
}
} finally {
reentrantLock.unlock();
}
}
複制
上述方法主要是調用了
MySQLDetector#heartbeat
方法,調用鍊真的挺深的……:
public void heartbeat() {
if (con == null || con.isClosed()) {
heartbeat.setResult(MySQLHeartbeat.ERROR_STATUS);
return;
}
//設定了發送心跳檢測的時間
lastSendQryTime = System.currentTimeMillis();
String[] fetchCols = {};
if (heartbeat.getSource().getHostConfig().isShowSlaveSql()) {
fetchCols = MYSQL_SLAVE_STATUS_COLS;
} else if (heartbeat.getSource().getHostConfig().isShowClusterSql()) {
fetchCols = MYSQL_CLUSTER_STATUS_COLS;
} else if (heartbeat.getSource().getHostConfig().isSelectReadOnlySql()) {
fetchCols = MYSQL_READ_ONLY_COLS;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("do heartbeat,conn is " + con);
}
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(fetchCols, this);
sqlJob = new HeartbeatSQLJob(heartbeat.getHeartbeatSQL(), con, resultHandler);
//執行心跳檢測任務
sqlJob.execute();
}
複制
簡單說下該方法,該方法會根據你配置的心跳語句,實際執行檢測後端 MySQL 狀态,并存儲相應的資料,這裡涉及到了異步調用,檢測完成後将會回調
MySQLDetector#onResult
方法:
public void onResult(SQLQueryResult<Map<String, String>> result) {
//設定心跳檢測完成後的時間
lastReceivedQryTime = System.currentTimeMillis();
heartbeat.getRecorder().set((lastReceivedQryTime - lastSendQryTime));
if (result.isSuccess()) {
PhysicalDatasource source = heartbeat.getSource();
Map<String, String> resultResult = result.getResult();
if (source.getHostConfig().isShowSlaveSql()) {
setStatusBySlave(source, resultResult);
} else if (source.getHostConfig().isShowClusterSql()) {
setStatusByCluster(resultResult);
} else if (source.getHostConfig().isSelectReadOnlySql()) {
setStatusByReadOnly(source, resultResult);
} else {
setStatusForNormalHeartbeat(source);
}
} else {
heartbeat.setResult(MySQLHeartbeat.ERROR_STATUS);
}
}
複制
上述方法就是根據心跳檢測結果,來設定
MySQLHeartbeat
類中表示心跳狀态的各個變量了,比如
status
變量,
slaveBehindMaster
主從延遲時間變量。
上述整個過程就完成了資料源的心跳檢測,關于檢測結果的使用主要通過
MySQLHeartbeat
類中的
getStatus
和
getSlaveBehindMaster
方法,通過這兩個方法來判斷心跳是否成功,以及主從延遲多少,進而影響資料源切換及讀寫分離邏輯,分别對應心跳檢測作用的第 1、2 點。
總結
本文主要講解了 DBLE 心跳檢測子產品,包括心跳檢測作用以及相應源碼解析,希望本文能幫助大家進一步了解心跳檢測子產品。