mysqlsmom 文檔: https://mysqlsmom.readthedocs.io/en/latest/hello.html github: https://github.com/m358807551/mysqlsmom 環境要求:
1、python2.7
2、redis
3、Mysql 配置 binlog-format=row
安裝
pip install mysqlsmom
全量同步
# 建立全量同步配置檔案
$ mom new test_mom/init_config.py -t init --force
# 編輯配置檔案
$ vim ./test_mom/init_config.py # 按注釋提示修改配置
# 開始同步
$ mom run -c ./test_mom/init_config.py
增量同步
配置三個檔案
test_mom
├── binlog_config.py # 配置檔案
├── my_filters.py # 過濾器 配置于 watched
└── my_handlers.py # 處理器 配置于 pipeline
建立配置
mom new test_mom/binlog_config.py -t binlog --force
1、binlog_config.py
# coding=utf-8
STREAM = "BINLOG" # "BINLOG" or "INIT"
SERVER_ID = 99
SLAVE_UUID = __name__
# 一次同步 BULK_SIZE 條資料到elasticsearch,不設定該配置項預設為1
BULK_SIZE = 1
BINLOG_CONNECTION = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '123456'
}
# redis存儲上次同步位置等資訊
REDIS = {
"host": "127.0.0.1",
"port": 6379,
"db": 0,
# "password": "password", # 不需要密碼則注釋或删掉該行
}
NODES = [{"host": "127.0.0.1", "port": 9200}]
TASKS = [
{
"stream": {
"database": "demo",
"table": "student"
},
"jobs": [{
"actions": ["insert", "update"],
"watched": {
"filter_display": {}
},
"pipeline": [
{"only_fields": {"fields": ["id", "name", "age"]}},
{"change_name": {"key": "name", "prefix": "hot-"}},
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "upsert",
"index": "demo",
"type": "student",
"nodes": NODES
}
}
},
{
"actions": ["delete"],
"pipeline": [
# {"only_fields": {"fields": ["id", "name", "age"]}},
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "delete",
"index": "demo",
"type": "student",
"nodes": NODES
}
}
}
]
}
]
CUSTOM_ROW_HANDLERS = "./my_handlers.py"
CUSTOM_ROW_FILTERS = "./my_filters.py"
自定義處理器 my_handlers.py
# -*- coding: utf-8 -*-
import copy
def change_name(row, key, prefix):
new_row = copy.deepcopy(row)
new_row[key] = "{}{}".format(prefix, row[key])
# 傳回資料字典,下一工序繼續處理
return new_row
自定義過濾器 my_filters.py
# -*- coding: utf-8 -*-
def filter_display(event):
# 傳回True 或 False,使用或丢棄
return event["values"]["display"] == 1
啟動
mom run -c test_mom/binlog_config.py