天天看點

MySQL多表關聯同步到ES的實踐

作者:閃念基因

背景

線上問題:業務系統查詢,涉及多表關聯查詢,條件次元較大且有模糊比對需求,索引無法覆寫,導緻查詢性能較低。

解決方向:引入搜尋引擎,将資料實時同步到ES,提升查詢性能。

具體分析:如果是單表同步到ES,然後在ES進行聯合查詢,這樣不但性能有所損耗,而且增加了查詢的複雜度。直接多表關聯,将資料拉平後同步到ES,這樣在ES查詢的性能最高,同時對現有系統改造成本較低。

落地方案:

全量離線同步使用DataX,增量同步使用Canal。

MySQL多表關聯同步到ES的實踐

方案揭秘

DataX

DataX的工作原理

MySQL多表關聯同步到ES的實踐

DataX本身作為離線資料同步架構,采用Framework + plugin架構建構。将資料源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步架構中。

  • Reader:Reader為資料采集子產品,負責采集資料源的資料,将資料發送給Framework。
  • Writer:Writer為資料寫入子產品,負責不斷向Framework取資料,并将資料寫入到目的端。
  • Framework:Framework用于連接配接reader和writer,作為兩者的資料傳輸通道,并處理緩沖、流控、并發和資料轉換等核心技術問題。

目前已經支援的插件

在我們的實際案例中,我們使用mysql的Reader插件和ES的Writer插件,進行一系列的配置,通過Datax的FrameWork進行資料傳輸,轉換,實作資料同步。

DataX具體的細節,官方講解較詳細,以下網址可以檢視官方介紹。

https://github.com/alibaba/DataX/blob/master/introduction.md

Canal

Canal的工作原理

描述Canal工作原理前,先回顧下Mysql的主備複制原理:

MySQL主備複制原理

MySQL多表關聯同步到ES的實踐
  • MySQL master 将資料變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行檢視)。
  • MySQL slave 将 master 的 binary log events 拷貝到它的中繼日志(relay log)。
  • MySQL slave 重放 relay log 中事件,将資料變更反映它自己的資料。

Canal工作原理

  • Canal 模拟 MySQL slave 的互動協定,僞裝自己為 MySQL slave ,向 MySQL master 發送dump協定。
  • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即canal)。
  • Canal 解析 binary log 對象(原始為 byte 流)。
MySQL多表關聯同步到ES的實踐

方案落地

以下描述的實踐過程,主要介紹mysql->es多表關聯同步的核心過程 (以使用者表,權限表,使用者權限表的場景模拟)。

測試環境:

jdk1.8、python 2.7.1、ES6.3.2

DataX

全量同步

DataX的下載下傳

官方提供了兩種方式:

  • 直接下載下傳打好的包,下載下傳後直接解壓到自己本地的某個目錄。
  • 這種方式目前存在問題,裡面沒有es的插件,需要自己将es的插件進行打包。然後将es的插件安裝到DataX中。
  • a. 打開源碼,将elasticsearchwriter子產品進行編譯,編譯後的目錄:
MySQL多表關聯同步到ES的實踐

b. 打開插件目錄

{data_home}/plugin/writer

c. 将elasticsearchwriter複制到 datax中

MySQL多表關聯同步到ES的實踐

備注:data_home:DataX本地安裝目錄

  • 下載下傳源碼,本地編譯,打包。
  • 下載下傳位址:https://github.com/alibaba/DataX

準備job檔案

DataX準備好之後,開始準備需要執行的job檔案,配置将mysql中的資料同步到es的規則。

打開job目錄,編輯job檔案,格式為json檔案。編輯檔案中reader和writer屬性部分。

cd {datax_home}/job/

如案例所示:

reader部分:配置插件為mysqlreader,MysqlReader通過JDBC連接配接器連接配接到遠端的Mysql資料庫,并根據使用者配置的資訊生成查詢SELECT SQL語句,然後發送到遠端Mysql資料庫,并将該SQL執行傳回結果使用DataX自定義的資料類型拼裝為抽象的資料集,并傳遞給下遊Writer處理。

writer部分:使用elasticsearch的rest api接口, 批量把從reader讀入的資料寫入elasticsearch。

配置中需要注意:reader中querySql中查詢的字段和writer中column中的字段必須一一對應,順序不能錯。

datax-user-job.json

{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxx",
                        "password": "xxx",
                        "connection": [
                            {
                                "querySql": [
                                    "select u.user_id as _id,ur.role_id as role_id,r.id as r_id,u.username as username,u.real_name as real_name,r.name as role_name from sys_user u left join sys_user_role ur on u.user_id = ur.user_id left join sys_role r on ur.role_id = r.id"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                     "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://es-v.elasticsearch.aliyuncs.com:9200",
            "accessId": "elastic",
            "accessKey": "xxxx",
            "index": "kefu_user",
            "type": "user_role",
            "cleanup": true,
            "settings": {"index" :{"number_of_shards": 3, "number_of_replicas": 1}},
            "discovery": false,
            "batchSize": 1000,
            "splitter": ",",
            "column": [
              {"name": "_id", "type": "id"},
              {"name": "r_id", "type": "long"},
              {"name": "role_id", "type": "long"},
              { "name": "username","type": "keyword" },
              { "name": "real_name","type": "keyword" },
              { "name": "role_name","type": "keyword" }
            ]
                    }
                }
            }
        ]
    }
}           

mysqlreader配置詳細介紹

https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

elasticsearchriter配置詳細介紹

https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md

執行job

執行job,進行全量同步。

  1. 打開bin目錄
  2. cd {datax_home}/bin
  3. 執行
  4. python datax.py /tools/datax/job/datax-user-job.json

結果

MySQL多表關聯同步到ES的實踐

配置較簡單,通過這種方式,可以将離線資料從mysql全量同步到es中。

Canal

增量同步

Canal server

安裝

直接下載下傳對應的壓縮包,deployer和adapter,然後解壓即可。案例中使用的是V1.1.4,可以點選檢視Canal各個版本。

https://github.com/alibaba/canal/releases

MySQL多表關聯同步到ES的實踐

提示:

V1.1.2版本官方才支援的ES 擴充卡

目前官方隻支援ES6和ES7的同步,如果需要支援ES5,需要修改源碼自己打包。

配置

備注 {canal_deployer_home}指的是canal delpoyer安裝目錄。

修改配置檔案:

vi {canal_deployer_home}/conf/example/instance.properties

主要修改下列參數,配置mysql的連接配接資訊。

# position info
canal.instance.master.address=localhost:3306
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
canal.instance.connectionCharset = UTF-8           

啟動

打開canal 安裝目錄。

cd {canal_deployer_home}/bin

sh startup.sh

檢視日志

tail -f {canal_deployer_home}/logs/canal/canal.log

MySQL多表關聯同步到ES的實踐

tail -f {canal_deployer_home}/logs/example/example.log

MySQL多表關聯同步到ES的實踐

通過日志可以看到,Canal服務端啟動成功,而且要保持啟動狀态,不然後續的adaper啟動會報錯。

CanalAdapter

配置

擴充卡配置分兩部分,一部分是總的基礎配置,另一部分是同步ES的配置。

備注:{canal_adapter_home} adapter安裝目錄

Adapter基礎配置

打開配置檔案目錄,編輯配置檔案。

cd {canal_adapter_home}/conf/

MySQL多表關聯同步到ES的實踐

主要配置源資料庫和擴充卡執行個體資訊,具體如案例所示:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null


canal.conf:
  canalServerHost: 127.0.0.1:11111  # 對應單機模式下的canal server的ip:port
  batchSize: 500                     # 每次擷取資料的批大小, 機關為K
  syncBatchSize: 1000                # 每次同步的批數量
  retries: 0
  timeout:
  mode: tcp # kafka rocketMQ        # canal client的模式: tcp kafka rocketMQ
  srcDataSources:                   # 源資料庫
    defaultDS:                      # 自定義名稱
      url: jdbc:mysql://localhost:3306/test?useUnicode=true  # jdbc url 
      username: xxx
      password: xxx
  canalAdapters:
  - instance: example 
    groups:
    - groupId: g1
      outerAdapters:
      - 
        key: exampleKey                    # canal 執行個體名或者 MQ topic 名
        name: es                           # or es7
        hosts: es-cn-v.elasticsearch.aliyuncs.com:9200      # es 叢集位址, 逗号分隔
        properties:
          mode: rest  # or rest         # 可指定transport模式或者rest模式
          security.auth: xxx:aaaaaa      # only used for rest mode
          cluster.name: elasticsearch       # es cluster name           

說明:

  1. 一份資料可以被多個group同時消費, 多個group之間會是一個并行執行, 一個group内部是一個串行執行多個outerAdapters,案例中隻有一個。
  2. 目前client adapter資料訂閱的方式支援兩種,直連canal server或者訂閱kafka/RocketMQ的消息。案例中是直連canal server。

ES同步sql配置

擴充卡将會自動加載conf/es下的所有.yml結尾的配置檔案,在目錄下建立mytest_user.yml檔案。

cd {canal_adapter_home}/conf/es/

MySQL多表關聯同步到ES的實踐

編輯mytest_user.yml檔案。

dataSourceKey: defaultDS        # 源資料源的key, 對應上面配置的srcDataSources中的值
outerAdapterKey: exampleKey     # 對應application.yml中es配置的key 
destination: example            # cannal的instance或者MQ的topic
groupId:                        # 對應MQ模式下的groupId, 隻會同步對應groupId的資料
esMapping:
  _index: xx_user           # es 的索引名稱
  _type: user_role                   # es 的type名稱, es7下無需配置此項
  _id: _id
  upsert: true                      # es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動配置設定
  # pk: _id                       # 如果不需要_id, 則需要指定一個屬性為主鍵屬性
  # sql映射
  sql: "select u.user_id as _id,ur.role_id as role_id,r.id as r_id,u.username as username,u.real_name as real_name,r.name as role_name from sys_user u left join sys_user_role ur on u.user_id = ur.user_id left join sys_role r on ur.role_id = r.id"
#  objFields:
#    _labels: array:;           # 數組或者對象屬性, array:; 代表以;字段裡面是以;分隔的
#    _obj: object               # json對象
  # etlCondition: "where a.c_time>='{0}'"     # etl 的條件參數
  commitBatch: 3000           

sql映射說明

sql支援多表關聯自由組合, 但是有一定的限制:

  1. 主表不能為子查詢語句。
  2. 隻能使用left outer join即最左表一定要是主表。
  3. 關聯從表如果是子查詢不能有多張表。
  4. 主sql中不能有where查詢條件(從表子查詢中可以有where條件但是不推薦, 可能會造成資料同步的不一緻, 比如修改了where條件中的字段内容)。
  5. 關聯條件隻允許主外鍵的'='操作不能出現其他常量判斷比如: on a.role_id=b.id and b.statues=1。
  6. 關聯條件必須要有一個字段出現在主查詢語句中比如: on a.role_id=b.id其中的a.role_id 或者b.id必須出現在主select語句中。
  7. Elastic Search的mapping 屬性與sql的查詢值要一一對應(不支援 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 這裡以别名(如果有别名)作為最終的映射字段. 這裡的_id可以填寫到配置檔案的 _id: _id映射。

常見問題

在實際測試階段,還是遇到了一些問題,如果要應用到生産上,需要對源碼做一定優化。

1. 多表關聯部分情況不同步

舉例:

  • 使用者表
  • 使用者角色表
  • 角色表

三表關聯查詢同步到ES

  • 修改使用者表,關聯資料修改(使用者表)
  • 修改角色表,關聯資料會修改(角色表)。
  • 修改使用者角色表,關聯的資料資訊不會更新。

2. 多表關聯同步性能問題

看源碼,直接将多表關聯的sql拆分,将條件前的部分直接包裹,進行全表掃描:

MySQL多表關聯同步到ES的實踐

3. DataX全量

  1. date類型,mysql中的字段值為null,同步到es,會指派為目前時間。
  2. 其他類型字段為null時,同步到es會不存在這個字段。

4. Canal Deployer資料源配置

很多文章中的anal.instance.master.address資料庫配置都是這種格式:jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true,這樣配置啟動會報錯。

通過檢視源碼,可以發現具體原因,代碼中針對連接配接配置是根據“:”分割,擷取的位址和端口。

package com.alibaba.otter.canal.instance.spring.support;


import java.beans.PropertyEditorSupport;
import java.net.InetSocketAddress;


import org.apache.commons.lang.StringUtils;
import org.springframework.beans.PropertyEditorRegistrar;
import org.springframework.beans.PropertyEditorRegistry;


public class SocketAddressEditor extends PropertyEditorSupport implements PropertyEditorRegistrar {


    public void registerCustomEditors(PropertyEditorRegistry registry) {
        registry.registerCustomEditor(InetSocketAddress.class, this);
    }


    public void setAsText(String text) throws IllegalArgumentException {
        String[] addresses = StringUtils.split(text, ":");
        if (addresses.length > 0) {
            if (addresses.length != 2) {
                throw new RuntimeException("address[" + text + "] is illegal, eg.127.0.0.1:3306");
            } else {
                setValue(new InetSocketAddress(addresses[0], Integer.valueOf(addresses[1])));
            }
        } else {
            setValue(null);
        }
    }
}           

附Canal的全量同步功能ETL

檢視源碼中發現,Canal實際也是支援ES 的全量同步,進行測試了下,性能要比Datax差一些。

如果有興趣使用,還是需要注意一些問題。

  • adapter伺服器請求該位址,參數多個,用";"隔開。
  • curl http://127.0.0.1:8081/etl/es/exampleKey/ticket.yml\?params\="2019-06-01;2019-07-15" -X POST
MySQL多表關聯同步到ES的實踐
  • 注意請求位址中,參數key的指派。
MySQL多表關聯同步到ES的實踐
  • 查詢條件

如果按時間段分批同步,時間格式需配置這種格式{},也可以通過 where b.created_at BETWEEN {} AND {}。

github中案例描述有瑕疵:

MySQL多表關聯同步到ES的實踐

具體原因見源碼部分,對條件的解析是替換{},然後順序指派。

MySQL多表關聯同步到ES的實踐

如果按照官網描述傳值,會提示異常:

{"succeeded":false,"errorMessage":"ES 資料導入異常 =>java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0)."}           

相關開源産品

Canal:

https://github.com/alibaba/canal

CanalAdapter:

https://github.com/alibaba/canal/wiki/ClientAdapter

Sync-ES:

https://github.com/alibaba/canal/wiki/Sync-ES

DataX文檔:https://github.com/alibaba/DataX/blob/master/introduction.md

作者:小強

來源:微信公衆号:得物技術

出處:https://mp.weixin.qq.com/s/7Zzrr4J9w9ZYLomUwE0kwA

繼續閱讀