Python: Using the mysqlsmom module to sync MySQL data to Elasticsearch in real time (full sync and incremental sync)

mysqlsmom documentation: https://mysqlsmom.readthedocs.io/en/latest/hello.html
GitHub: https://github.com/m358807551/mysqlsmom

Requirements:

1. Python 2.7
2. Redis
3. MySQL configured with binlog-format=row
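
Before going further, it can help to confirm that the server really reports row-based binlogs. The following is a minimal sketch, not part of mysqlsmom: the helper name `binlog_format_is_row` is hypothetical, and it assumes a standard DB-API 2.0 cursor that returns rows as tuples (for example from a MySQL driver you already have installed).

```python
def binlog_format_is_row(cursor):
    """Return True if the MySQL server reports binlog_format = ROW.

    `cursor` is assumed to be a DB-API 2.0 cursor connected to the
    MySQL server and returning rows as tuples (not dict cursors).
    """
    cursor.execute("SHOW VARIABLES LIKE 'binlog_format'")
    row = cursor.fetchone()  # a (variable_name, value) tuple, or None
    return row is not None and str(row[1]).upper() == "ROW"
```

If the function returns False, set binlog-format=row in the MySQL configuration and restart the server before starting an incremental sync.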

Installation

pip install mysqlsmom      

Full sync

# Create the full-sync config file

$ mom new test_mom/init_config.py -t init --force

# Edit the config file
$ vim ./test_mom/init_config.py  # adjust the settings as the comments in the file suggest

# Start the sync
$ mom run -c ./test_mom/init_config.py

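Incremental sync

Incremental sync uses three files:

test_mom
├── binlog_config.py   # config file
├── my_filters.py      # filters, referenced under "watched"
└── my_handlers.py     # handlers, referenced under "pipeline"

Create the config file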

mom new test_mom/binlog_config.py -t binlog --force      

1. binlog_config.py

# coding=utf-8

STREAM = "BINLOG"  # "BINLOG" or "INIT"
SERVER_ID = 99
SLAVE_UUID = __name__

# Sync BULK_SIZE rows to Elasticsearch at a time; defaults to 1 if this option is not set
BULK_SIZE = 1

BINLOG_CONNECTION = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '123456'
}

# Redis stores the last synced position and related state
REDIS = {
    "host": "127.0.0.1",
    "port": 6379,
    "db": 0,
    # "password": "password",  # comment out or delete this line if no password is needed
}

NODES = [{"host": "127.0.0.1", "port": 9200}]

TASKS = [
    {
        "stream": {
            "database": "demo",
            "table": "student"
        },
        "jobs": [{
            "actions": ["insert", "update"],
            "watched": {
                "filter_display": {}
            },
            "pipeline": [
                {"only_fields": {"fields": ["id", "name", "age"]}},
                {"change_name": {"key": "name", "prefix": "hot-"}},
                {"set_id": {"field": "id"}}
            ],
            "dest": {
                "es": {
                    "action": "upsert",
                    "index": "demo",
                    "type": "student",
                    "nodes": NODES
                }
            }
        },
        {
            "actions": ["delete"],
            "pipeline": [
                # {"only_fields": {"fields": ["id", "name", "age"]}},
                {"set_id": {"field": "id"}}
            ],
            "dest": {
                "es": {
                    "action": "delete",
                    "index": "demo",
                    "type": "student",
                    "nodes": NODES
                }
            }
        }
        ]
    }
]

CUSTOM_ROW_HANDLERS = "./my_handlers.py"
CUSTOM_ROW_FILTERS = "./my_filters.py"
      

Custom handlers: my_handlers.py

# -*- coding: utf-8 -*-


import copy

def change_name(row, key, prefix):
    new_row = copy.deepcopy(row)
    new_row[key] = "{}{}".format(prefix, row[key])
    # Return the row dict; the next step in the pipeline continues processing it
    return new_row
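
To make the "pipeline" setting in binlog_config.py more concrete, here is a rough sketch of how a row could flow through the three configured steps. It is an illustration only, not mysqlsmom's actual implementation: `run_pipeline` and `HANDLERS` are hypothetical names, and `only_fields` and `set_id` are simplified stand-ins for the built-in handlers (the assumption being that `set_id` copies the chosen column into an `_id` key).

```python
import copy


def only_fields(row, fields):
    # Stand-in for the built-in handler: keep only the listed columns.
    return {k: v for k, v in row.items() if k in fields}


def change_name(row, key, prefix):
    # Same custom handler as in my_handlers.py.
    new_row = copy.deepcopy(row)
    new_row[key] = "{}{}".format(prefix, row[key])
    return new_row


def set_id(row, field):
    # Stand-in for the built-in handler: assumed to copy a column into "_id".
    new_row = copy.deepcopy(row)
    new_row["_id"] = row[field]
    return new_row


# Hypothetical registry mapping handler names to functions.
HANDLERS = {
    "only_fields": only_fields,
    "change_name": change_name,
    "set_id": set_id,
}

# Same pipeline as the insert/update job in binlog_config.py.
PIPELINE = [
    {"only_fields": {"fields": ["id", "name", "age"]}},
    {"change_name": {"key": "name", "prefix": "hot-"}},
    {"set_id": {"field": "id"}},
]


def run_pipeline(row, pipeline):
    # Each step is a one-entry dict: {handler_name: keyword_arguments}.
    for step in pipeline:
        for name, kwargs in step.items():
            row = HANDLERS[name](row, **kwargs)
    return row


source_row = {"id": 1, "name": "Tom", "age": 18, "display": 1}
result = run_pipeline(source_row, PIPELINE)
print(result)
```

The keyword arguments in each pipeline entry line up with the handler's parameters after `row`, which is why `change_name(row, key, prefix)` is configured as `{"key": "name", "prefix": "hot-"}`.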
      

Custom filters: my_filters.py

# -*- coding: utf-8 -*-

def filter_display(event):
    # Return True to keep the event, False to discard it
    return event["values"]["display"] == 1
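
Note that this filter raises a KeyError when a row has no `display` column. The sketch below shows the filter on a few sample events together with a more defensive variant. The event dicts are made-up examples that only assume the `"values"` key used by the filter; the `"action"` key and the name `filter_display_safe` are hypothetical.

```python
def filter_display(event):
    # Same filter as in my_filters.py: keep rows whose display column equals 1.
    return event["values"]["display"] == 1


def filter_display_safe(event):
    # Hypothetical variant: a missing "display" column means "discard".
    return event.get("values", {}).get("display") == 1


# Made-up sample events for illustration.
shown = {"action": "insert", "values": {"id": 1, "name": "Tom", "display": 1}}
hidden = {"action": "insert", "values": {"id": 2, "name": "Ann", "display": 0}}
no_column = {"action": "insert", "values": {"id": 3, "name": "Bob"}}

# Collect the ids of the events that would be passed on to the pipeline.
kept = [
    e["values"]["id"]
    for e in (shown, hidden, no_column)
    if filter_display_safe(e)
]
print(kept)
```

Whether a missing column should discard the row or stop the sync with an error depends on the table; the safe variant is only one possible choice.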
      

Start the sync

mom run -c test_mom/binlog_config.py