簡介
本文記錄一個從 MySQL 同步資料到 ClickHouse 的方案. 在 GitHub 上以 MySQL 和 ClickHouse 為關鍵字搜尋到幾個相關的開源項目, 首先試了下 synch , 結果在安裝依賴包 mysqlclient 時一直失敗, 然後改為使用 Bifrost 這個項目.
Bifrost 目前有接近 700 個 star, 從這兩天的使用來看, Bifrost 的功能比較齊全, 基本能滿足我們的使用場景, 比如全量同步/增量同步/表的批量操作等, 而且經過測試發現速度還是挺快的, 要說不足的話, 主要在于界面的操作不夠簡潔自然, 好在官方文檔比較詳細, 第一次使用不會遇到太多困難.
安裝
具體的安裝和使用可以參考官方文檔, 安裝時推薦使用二進制安裝方式, 第一次安裝後若出現無法打開管理界面的情況, 可能是因為沒有生成 HTTPS 證書, 這一點需要注意, 生成證書的指令如下
啟動
安裝完成後, 進入
bin
目錄, 執行
./Bifrost-server start
啟動項目, 下面是安裝好之後的管理界面
問題彙總
1. 配置完資料同步資訊後, 若資料沒有開始同步, 問題可能是:
- MySQL 的 binlog 沒有打開
- MySQL 的
, 開啟指令為GTID_MODE=OFF
SET GLOBAL ENFORCE_GTID_CONSISTENCY = 'WARN';
SET GLOBAL ENFORCE_GTID_CONSISTENCY = 'ON';
SET GLOBAL GTID_MODE = 'OFF_PERMISSIVE';
SET GLOBAL GTID_MODE = 'ON_PERMISSIVE';
SET GLOBAL GTID_MODE = 'ON';
2. 在重新開機服務的時候, 有可能會遇到配置檔案被覆寫的情況, 具體原因不太清楚, 應對方法是備份配置檔案, 配置檔案的路徑為 ./data/db.Bifrost
./data/db.Bifrost
3. 有時同步位點會落後于 MySQL 的 binlog 位點, 為了保證資料的一緻性, 需要定期檢查同步過程是否正常, 檢查方式為比對目前同步位點和MySQL的最新位點, 檢查代碼如下
注意填寫正确的IP/端口/使用者名/密碼/資料源名稱(代碼裡的MySQL_SOURCE)
import requests
import json
import pandas as pd
import time
def get_header():
url = 'http://ip:21036/dologin'
data = {'UserName': 'xxx', 'Password': 'xxx'}
header = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36'}
ret = requests.post(url, data=json.dumps(data), headers=header)
cookie = 'xgo_cookie=' + ret.cookies.get_dict()['xgo_cookie']
header['Cookie'] = cookie
return header
def get_curr_pos():
header = get_header()
ret = requests.get(
'http://ip:21036/db/index',
headers=header).text
curr_pos = pd.read_html(ret)[0].Position.iloc[0]
curr_pos = ''.join(curr_pos.split(' ')[:3])
return curr_pos
def get_last_pos():
header = get_header()
ret = requests.post(
'http://ip:21036/db/get_last_position',
json={'DbName': 'MySQL_SOURCE'},
headers=header).text
ret = json.loads(ret)
last_pos = f"{ret['data']['BinlogFile']}{ret['data']['BinlogPosition']}"
delay = ret['data']['DelayedTime']
return last_pos, delay
def is_pos_right():
'''檢查 Bifrost 目前同步位點是否等于 MySQL 最新位點'''
curr_pos = get_curr_pos()
last_pos, delay = get_last_pos()
if curr_pos == last_pos:
return True
if delay >= 600:
return False
else:
time.sleep(10)
is_pos_right()
if __name__ == '__main__':
if is_pos_right():
print('Bifrost is working correctly!')
else:
raise Exception('Bifrost is not working correctly!')