天天看點

分布式 | DBLE 心跳檢測子產品解析

概述

本文主要介紹 DBLE 心跳檢測子產品,内容包括心跳檢測作用及心跳檢測子產品源碼解析兩部分。

心跳檢測作用

DBLE 中心跳檢測的作用有以下三點:

1. 控制多個寫節點高可用切換;

2. 控制讀操作的負載均衡,會根據最近一次的心跳狀态,及主從延遲(如果配置了

slaveThreshold

主從延遲門檻值的話)來控制讀負載均衡;

3. 控制空閑連接配接數大小,關閉多餘空閑連接配接。這裡發送的是 PING 包,需與 dataNodeIdleCheckPeriod 參數配合,超過此參數的空閑連接配接會通過發送 PING 包來檢查。

總的來講,就是判斷 MySQL 執行個體的狀态。

本文中主要講解前兩點涉及到的心跳檢測内容,第 3 點更适合在連接配接管理中講,本文暫不涉及。

心跳子產品源碼解析

心跳檢測定時任務開始入口在

Scheduler#init

方法中,以

dataNodeHeartbeatPeriod

間隔定期進行心跳檢測,預設值為 10 秒:

scheduler.scheduleAtFixedRate(dataSourceHeartbeat(), 0L, system.getDataNodeHeartbeatPeriod(), TimeUnit.MILLISECONDS);           

複制

Scheduler#dataSourceHeartbeat

方法傳回 Runnable 任務:

private Runnable dataSourceHeartbeat() {
 
        return new Runnable() {
 
            @Override
 
            public void run() {
 
                timerExecutor.execute(new Runnable() {
 
                    @Override
 
                    public void run() {
 
                    //這裡有個判斷,如果讀寫節點都沒有,自然不需要心跳檢測了
 
                        if (!DbleServer.getInstance().getConfig().isDataHostWithoutWR()) {
 
                            Map<String, AbstractPhysicalDBPool> hosts = DbleServer.getInstance().getConfig().getDataHosts();
 
                            for (AbstractPhysicalDBPool host : hosts.values()) {
 
                            //調用了AbstractPhysicalDBPool的doHeartbeat()方法
 
                                host.doHeartbeat();
 
                            }
 
                        }
 
                    }
 
                });
 
            }
 
        };
 
    }
            

複制

AbstractPhysicalDBPool#doHeartbeat

為抽象方法,有兩個實作分别在類

PhysicalDNPoolSingleWH

PhysicalDBPool

中,這兩個類的差別從名字就可以看出來,一個是隻有一個 WriteHost,另一個則有多個 WriteHost,會根據你們的 schema.xml 中的具體配置決定初始化哪一個。

對于心跳檢測來說,基本實作都一樣,是以看哪一個類并不影響。

我們就來看下

PhysicalDNPoolSingleWH#doHeartbeat

方法吧:

public void doHeartbeat() {
 
        for (PhysicalDatasource source : allSourceMap.values()) {
 
            if (source != null) {
 
                source.doHeartbeat();
 
            } else {
 
                LOGGER.warn(hostName + " current dataSource is null!");
 
            }
 
        }
 
    }           

複制

上述方法其實就是循環周遊所有資料源,然後對每個資料源進行心跳檢測了。

繼續來看

PhysicalDatasource#doHeartbeat

方法,補充說明一下,

PhysicalDatasource

也是抽象類,但在 DBLE 中隻有

MySQLDatasource

一個實作,因為 DBLE 後端隻支援 MySQL 嘛,

MySQLDatasource#doHeartbeat

方法也是直接繼承了抽象類的實作:

public void doHeartbeat() {
 
        if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) {
 
            return;
 
        }
 
        if (!heartbeat.isStop()) {
 
            //這裡直接調用了MySQLHeartbeat#heartbeat方法
 
            heartbeat.heartbeat();
 
        }
 
    }
            

複制

再繼續看

MySQLHeartbeat#heartbeat

方法前,先來看下

MySQLDatasource

MySQLHeartbeat

類之間的關系:

分布式 | DBLE 心跳檢測子產品解析

它們之間的關系很簡單,就是

MySQLDatasource

會建立

MySQLHeartbeat

,并且它們之間有一對一的關聯關系。

簡單來說就是一個

MySQLDatasource

對象就有一個

MySQLHeartbeat

對象來負責它的心跳檢測。

進一步來看

MySQLHeartbeat#heartbeat

方法:

public void heartbeat() {
 
        final ReentrantLock reentrantLock = this.lock;
 
        reentrantLock.lock();
 
        try {
 
            if (isChecking.compareAndSet(false, true)) {
 
                if (detector == null || detector.isQuit()) {
 
                    try {
 
                        detector = new MySQLDetector(this);
 
                        detector.heartbeat();
 
                    } catch (Exception e) {
 
                        LOGGER.info(source.getConfig().toString(), e);
 
                        setResult(ERROR_STATUS);
 
                    }
 
                } else {
 
                    detector.heartbeat();
 
                }
 
            } else {
 
                if (detector != null) {
 
                    if (detector.isQuit()) {
 
                        isChecking.compareAndSet(true, false);
 
                    } else if (detector.isHeartbeatTimeout()) {
 
                        setResult(TIMEOUT_STATUS);
 
                    }
 
                }
 
            }
 
        } finally {
 
            reentrantLock.unlock();
 
        }
 
    }
            

複制

上述方法主要是調用了

MySQLDetector#heartbeat

方法,調用鍊真的挺深的……:

public void heartbeat() {
 
        if (con == null || con.isClosed()) {
 
            heartbeat.setResult(MySQLHeartbeat.ERROR_STATUS);
 
            return;
 
        }
 
        //設定了發送心跳檢測的時間
 
        lastSendQryTime = System.currentTimeMillis();
 


 


 
        String[] fetchCols = {};
 
        if (heartbeat.getSource().getHostConfig().isShowSlaveSql()) {
 
            fetchCols = MYSQL_SLAVE_STATUS_COLS;
 
        } else if (heartbeat.getSource().getHostConfig().isShowClusterSql()) {
 
            fetchCols = MYSQL_CLUSTER_STATUS_COLS;
 
        } else if (heartbeat.getSource().getHostConfig().isSelectReadOnlySql()) {
 
            fetchCols = MYSQL_READ_ONLY_COLS;
 
        }
 


 
        if (LOGGER.isDebugEnabled()) {
 
            LOGGER.debug("do heartbeat,conn is " + con);
 
        }
 
        OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(fetchCols, this);
 
        sqlJob = new HeartbeatSQLJob(heartbeat.getHeartbeatSQL(), con, resultHandler);
 
        //執行心跳檢測任務
 
        sqlJob.execute();
 
    }
            

複制

簡單說下該方法,該方法會根據你配置的心跳語句,實際執行檢測後端 MySQL 狀态,并存儲相應的資料,這裡涉及到了異步調用,檢測完成後将會回調

MySQLDetector#onResult

方法:

public void onResult(SQLQueryResult<Map<String, String>> result) {
 
        //設定心跳檢測完成後的時間
 
        lastReceivedQryTime = System.currentTimeMillis();
 
        heartbeat.getRecorder().set((lastReceivedQryTime - lastSendQryTime));
 
        if (result.isSuccess()) {
 
            PhysicalDatasource source = heartbeat.getSource();
 
            Map<String, String> resultResult = result.getResult();
 
            if (source.getHostConfig().isShowSlaveSql()) {
 
                setStatusBySlave(source, resultResult);
 
            } else if (source.getHostConfig().isShowClusterSql()) {
 
                setStatusByCluster(resultResult);
 
            } else if (source.getHostConfig().isSelectReadOnlySql()) {
 
                setStatusByReadOnly(source, resultResult);
 
            } else {
 
                setStatusForNormalHeartbeat(source);
 
            }
 
        } else {
 
            heartbeat.setResult(MySQLHeartbeat.ERROR_STATUS);
 
        }
 
    }
            

複制

上述方法就是根據心跳檢測結果,來設定

MySQLHeartbeat

類中表示心跳狀态的各個變量了,比如

status

變量,

slaveBehindMaster

主從延遲時間變量。

上述整個過程就完成了資料源的心跳檢測,關于檢測結果的使用主要通過

MySQLHeartbeat

類中的

getStatus

getSlaveBehindMaster

方法,通過這兩個方法來判斷心跳是否成功,以及主從延遲多少,進而影響資料源切換及讀寫分離邏輯,分别對應心跳檢測作用的第 1、2 點。

總結

本文主要講解了 DBLE 心跳檢測子產品,包括心跳檢測作用以及相應源碼解析,希望本文能幫助大家進一步了解心跳檢測子產品。