天天看點

[ClickHouse] 使用 Bifrost 全量(實時)同步 MySQL 資料

簡介

本文記錄一個從 MySQL 同步資料到 ClickHouse 的方案. 在 GitHub 上以 MySQL 和 ClickHouse 為關鍵字搜尋到幾個相關的開源項目, 首先試了下 synch , 結果在安裝依賴包 mysqlclient 時一直失敗, 然後改為使用 Bifrost 這個項目.

Bifrost 目前有接近 700 個 star, 從這兩天的使用來看, Bifrost 的功能比較齊全, 基本能滿足我們的使用場景, 比如全量同步/增量同步/表的批量操作等, 而且經過測試發現速度還是挺快的, 要說不足的話, 主要在于界面的操作不夠簡潔自然, 好在官方文檔比較詳細, 第一次使用不會遇到太多困難.

安裝

具體的安裝和使用可以參考官方文檔, 安裝時推薦使用二進制安裝方式, 第一次安裝後若出現無法打開管理界面的情況, 可能是因為沒有生成 HTTPS 證書, 這一點需要注意, 生成證書的指令如下

[ClickHouse] 使用 Bifrost 全量(實時)同步 MySQL 資料

啟動

安裝完成後, 進入

bin

目錄, 執行

./Bifrost-server start

啟動項目, 下面是安裝好之後的管理界面

[ClickHouse] 使用 Bifrost 全量(實時)同步 MySQL 資料

問題彙總

1. 配置完資料同步資訊後, 若資料沒有開始同步, 問題可能是:
  • MySQL 的 binlog 沒有打開
  • MySQL 的

    GTID_MODE=OFF

    , 開啟指令為
SET GLOBAL ENFORCE_GTID_CONSISTENCY = 'WARN';
SET GLOBAL ENFORCE_GTID_CONSISTENCY = 'ON';
SET GLOBAL GTID_MODE = 'OFF_PERMISSIVE';
SET GLOBAL GTID_MODE = 'ON_PERMISSIVE';
SET GLOBAL GTID_MODE = 'ON';
           
2. 在重新開機服務的時候, 有可能會遇到配置檔案被覆寫的情況, 具體原因不太清楚, 應對方法是備份配置檔案, 配置檔案的路徑為

./data/db.Bifrost

3. 有時同步位點會落後于 MySQL 的 binlog 位點, 為了保證資料的一緻性, 需要定期檢查同步過程是否正常, 檢查方式為比對目前同步位點和MySQL的最新位點, 檢查代碼如下

注意填寫正确的IP/端口/使用者名/密碼/資料源名稱(代碼裡的MySQL_SOURCE)

import requests
import json
import pandas as pd
import time

def get_header():
    url = 'http://ip:21036/dologin'
    data = {'UserName': 'xxx', 'Password': 'xxx'}
    header = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36'}
    ret = requests.post(url, data=json.dumps(data), headers=header)
    cookie = 'xgo_cookie=' + ret.cookies.get_dict()['xgo_cookie']
    header['Cookie'] = cookie

    return header

def get_curr_pos():
    header = get_header()
    ret = requests.get(
        'http://ip:21036/db/index',
        headers=header).text
    curr_pos = pd.read_html(ret)[0].Position.iloc[0]
    curr_pos = ''.join(curr_pos.split(' ')[:3])

    return curr_pos

def get_last_pos():
    header = get_header()
    ret = requests.post(
        'http://ip:21036/db/get_last_position',
        json={'DbName': 'MySQL_SOURCE'}, 
        headers=header).text
    ret = json.loads(ret)
    last_pos = f"{ret['data']['BinlogFile']}{ret['data']['BinlogPosition']}"
    delay = ret['data']['DelayedTime']

    return last_pos, delay


def is_pos_right():
    '''檢查 Bifrost 目前同步位點是否等于 MySQL 最新位點'''
    curr_pos = get_curr_pos()
    last_pos, delay = get_last_pos()

    if curr_pos == last_pos:
        return True

    if delay >= 600:
        return False
    else:
        time.sleep(10)
        is_pos_right()


if __name__ == '__main__':
    if is_pos_right():
        print('Bifrost is working correctly!')
    else:
        raise Exception('Bifrost is not working correctly!')