背景
線上問題:業務系統查詢,涉及多表關聯查詢,條件次元較大且有模糊比對需求,索引無法覆寫,導緻查詢性能較低。
解決方向:引入搜尋引擎,将資料實時同步到ES,提升查詢性能。
具體分析:如果是單表同步到ES,然後在ES進行聯合查詢,這樣不但性能有所損耗,而且增加了查詢的複雜度。直接多表關聯,将資料拉平後同步到ES,這樣在ES查詢的性能最高,同時對現有系統改造成本較低。
落地方案:
全量離線同步使用DataX,增量同步使用Canal。
方案揭秘
DataX
DataX的工作原理
DataX本身作為離線資料同步架構,采用Framework + plugin架構建構。将資料源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步架構中。
- Reader:Reader為資料采集子產品,負責采集資料源的資料,将資料發送給Framework。
- Writer:Writer為資料寫入子產品,負責不斷向Framework取資料,并将資料寫入到目的端。
- Framework:Framework用于連接配接reader和writer,作為兩者的資料傳輸通道,并處理緩沖、流控、并發和資料轉換等核心技術問題。
目前已經支援的插件
在我們的實際案例中,我們使用mysql的Reader插件和ES的Writer插件,進行一系列的配置,通過Datax的FrameWork進行資料傳輸,轉換,實作資料同步。
DataX具體的細節,官方講解較詳細,以下網址可以檢視官方介紹。
https://github.com/alibaba/DataX/blob/master/introduction.md
Canal
Canal的工作原理
描述Canal工作原理前,先回顧下Mysql的主備複制原理:
MySQL主備複制原理
- MySQL master 将資料變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行檢視)。
- MySQL slave 将 master 的 binary log events 拷貝到它的中繼日志(relay log)。
- MySQL slave 重放 relay log 中事件,将資料變更反映它自己的資料。
Canal工作原理
- Canal 模拟 MySQL slave 的互動協定,僞裝自己為 MySQL slave ,向 MySQL master 發送dump協定。
- MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即canal)。
- Canal 解析 binary log 對象(原始為 byte 流)。
方案落地
以下描述的實踐過程,主要介紹mysql->es多表關聯同步的核心過程 (以使用者表,權限表,使用者權限表的場景模拟)。
測試環境:
jdk1.8、python 2.7.1、ES6.3.2
DataX
全量同步
DataX的下載下傳
官方提供了兩種方式:
- 直接下載下傳打好的包,下載下傳後直接解壓到自己本地的某個目錄。
- 這種方式目前存在問題,裡面沒有es的插件,需要自己将es的插件進行打包。然後将es的插件安裝到DataX中。
- a. 打開源碼,将elasticsearchwriter子產品進行編譯,編譯後的目錄:
b. 打開插件目錄
{data_home}/plugin/writer
c. 将elasticsearchwriter複制到 datax中
備注:data_home:DataX本地安裝目錄
- 下載下傳源碼,本地編譯,打包。
- 下載下傳位址:https://github.com/alibaba/DataX
準備job檔案
DataX準備好之後,開始準備需要執行的job檔案,配置将mysql中的資料同步到es的規則。
打開job目錄,編輯job檔案,格式為json檔案。編輯檔案中reader和writer屬性部分。
cd {datax_home}/job/
如案例所示:
reader部分:配置插件為mysqlreader,MysqlReader通過JDBC連接配接器連接配接到遠端的Mysql資料庫,并根據使用者配置的資訊生成查詢SELECT SQL語句,然後發送到遠端Mysql資料庫,并将該SQL執行傳回結果使用DataX自定義的資料類型拼裝為抽象的資料集,并傳遞給下遊Writer處理。
writer部分:使用elasticsearch的rest api接口, 批量把從reader讀入的資料寫入elasticsearch。
配置中需要注意:reader中querySql中查詢的字段和writer中column中的字段必須一一對應,順序不能錯。
datax-user-job.json
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "xxx",
"password": "xxx",
"connection": [
{
"querySql": [
"select u.user_id as _id,ur.role_id as role_id,r.id as r_id,u.username as username,u.real_name as real_name,r.name as role_name from sys_user u left join sys_user_role ur on u.user_id = ur.user_id left join sys_role r on ur.role_id = r.id"
],
"jdbcUrl": [
"jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8"
]
}
]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://es-v.elasticsearch.aliyuncs.com:9200",
"accessId": "elastic",
"accessKey": "xxxx",
"index": "kefu_user",
"type": "user_role",
"cleanup": true,
"settings": {"index" :{"number_of_shards": 3, "number_of_replicas": 1}},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{"name": "_id", "type": "id"},
{"name": "r_id", "type": "long"},
{"name": "role_id", "type": "long"},
{ "name": "username","type": "keyword" },
{ "name": "real_name","type": "keyword" },
{ "name": "role_name","type": "keyword" }
]
}
}
}
]
}
}
mysqlreader配置詳細介紹
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
elasticsearchriter配置詳細介紹
https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md
執行job
執行job,進行全量同步。
- 打開bin目錄
- cd {datax_home}/bin
- 執行
- python datax.py /tools/datax/job/datax-user-job.json
結果
配置較簡單,通過這種方式,可以将離線資料從mysql全量同步到es中。
Canal
增量同步
Canal server
安裝
直接下載下傳對應的壓縮包,deployer和adapter,然後解壓即可。案例中使用的是V1.1.4,可以點選檢視Canal各個版本。
https://github.com/alibaba/canal/releases
提示:
V1.1.2版本官方才支援的ES 擴充卡
目前官方隻支援ES6和ES7的同步,如果需要支援ES5,需要修改源碼自己打包。
配置
備注 {canal_deployer_home}指的是canal delpoyer安裝目錄。
修改配置檔案:
vi {canal_deployer_home}/conf/example/instance.properties
主要修改下列參數,配置mysql的連接配接資訊。
# position info
canal.instance.master.address=localhost:3306
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
canal.instance.connectionCharset = UTF-8
啟動
打開canal 安裝目錄。
cd {canal_deployer_home}/bin
sh startup.sh
檢視日志
tail -f {canal_deployer_home}/logs/canal/canal.log
tail -f {canal_deployer_home}/logs/example/example.log
通過日志可以看到,Canal服務端啟動成功,而且要保持啟動狀态,不然後續的adaper啟動會報錯。
CanalAdapter
配置
擴充卡配置分兩部分,一部分是總的基礎配置,另一部分是同步ES的配置。
備注:{canal_adapter_home} adapter安裝目錄
Adapter基礎配置
打開配置檔案目錄,編輯配置檔案。
cd {canal_adapter_home}/conf/
主要配置源資料庫和擴充卡執行個體資訊,具體如案例所示:
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
canalServerHost: 127.0.0.1:11111 # 對應單機模式下的canal server的ip:port
batchSize: 500 # 每次擷取資料的批大小, 機關為K
syncBatchSize: 1000 # 每次同步的批數量
retries: 0
timeout:
mode: tcp # kafka rocketMQ # canal client的模式: tcp kafka rocketMQ
srcDataSources: # 源資料庫
defaultDS: # 自定義名稱
url: jdbc:mysql://localhost:3306/test?useUnicode=true # jdbc url
username: xxx
password: xxx
canalAdapters:
- instance: example
groups:
- groupId: g1
outerAdapters:
-
key: exampleKey # canal 執行個體名或者 MQ topic 名
name: es # or es7
hosts: es-cn-v.elasticsearch.aliyuncs.com:9200 # es 叢集位址, 逗号分隔
properties:
mode: rest # or rest # 可指定transport模式或者rest模式
security.auth: xxx:aaaaaa # only used for rest mode
cluster.name: elasticsearch # es cluster name
說明:
- 一份資料可以被多個group同時消費, 多個group之間會是一個并行執行, 一個group内部是一個串行執行多個outerAdapters,案例中隻有一個。
- 目前client adapter資料訂閱的方式支援兩種,直連canal server或者訂閱kafka/RocketMQ的消息。案例中是直連canal server。
ES同步sql配置
擴充卡将會自動加載conf/es下的所有.yml結尾的配置檔案,在目錄下建立mytest_user.yml檔案。
cd {canal_adapter_home}/conf/es/
編輯mytest_user.yml檔案。
dataSourceKey: defaultDS # 源資料源的key, 對應上面配置的srcDataSources中的值
outerAdapterKey: exampleKey # 對應application.yml中es配置的key
destination: example # cannal的instance或者MQ的topic
groupId: # 對應MQ模式下的groupId, 隻會同步對應groupId的資料
esMapping:
_index: xx_user # es 的索引名稱
_type: user_role # es 的type名稱, es7下無需配置此項
_id: _id
upsert: true # es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動配置設定
# pk: _id # 如果不需要_id, 則需要指定一個屬性為主鍵屬性
# sql映射
sql: "select u.user_id as _id,ur.role_id as role_id,r.id as r_id,u.username as username,u.real_name as real_name,r.name as role_name from sys_user u left join sys_user_role ur on u.user_id = ur.user_id left join sys_role r on ur.role_id = r.id"
# objFields:
# _labels: array:; # 數組或者對象屬性, array:; 代表以;字段裡面是以;分隔的
# _obj: object # json對象
# etlCondition: "where a.c_time>='{0}'" # etl 的條件參數
commitBatch: 3000
sql映射說明
sql支援多表關聯自由組合, 但是有一定的限制:
- 主表不能為子查詢語句。
- 隻能使用left outer join即最左表一定要是主表。
- 關聯從表如果是子查詢不能有多張表。
- 主sql中不能有where查詢條件(從表子查詢中可以有where條件但是不推薦, 可能會造成資料同步的不一緻, 比如修改了where條件中的字段内容)。
- 關聯條件隻允許主外鍵的'='操作不能出現其他常量判斷比如: on a.role_id=b.id and b.statues=1。
- 關聯條件必須要有一個字段出現在主查詢語句中比如: on a.role_id=b.id其中的a.role_id 或者b.id必須出現在主select語句中。
- Elastic Search的mapping 屬性與sql的查詢值要一一對應(不支援 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 這裡以别名(如果有别名)作為最終的映射字段. 這裡的_id可以填寫到配置檔案的 _id: _id映射。
常見問題
在實際測試階段,還是遇到了一些問題,如果要應用到生産上,需要對源碼做一定優化。
1. 多表關聯部分情況不同步
舉例:
- 使用者表
- 使用者角色表
- 角色表
三表關聯查詢同步到ES
- 修改使用者表,關聯資料修改(使用者表)
- 修改角色表,關聯資料會修改(角色表)。
- 修改使用者角色表,關聯的資料資訊不會更新。
2. 多表關聯同步性能問題
看源碼,直接将多表關聯的sql拆分,将條件前的部分直接包裹,進行全表掃描:
3. DataX全量
- date類型,mysql中的字段值為null,同步到es,會指派為目前時間。
- 其他類型字段為null時,同步到es會不存在這個字段。
4. Canal Deployer資料源配置
很多文章中的anal.instance.master.address資料庫配置都是這種格式:jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true,這樣配置啟動會報錯。
通過檢視源碼,可以發現具體原因,代碼中針對連接配接配置是根據“:”分割,擷取的位址和端口。
package com.alibaba.otter.canal.instance.spring.support;
import java.beans.PropertyEditorSupport;
import java.net.InetSocketAddress;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.PropertyEditorRegistrar;
import org.springframework.beans.PropertyEditorRegistry;
public class SocketAddressEditor extends PropertyEditorSupport implements PropertyEditorRegistrar {
public void registerCustomEditors(PropertyEditorRegistry registry) {
registry.registerCustomEditor(InetSocketAddress.class, this);
}
public void setAsText(String text) throws IllegalArgumentException {
String[] addresses = StringUtils.split(text, ":");
if (addresses.length > 0) {
if (addresses.length != 2) {
throw new RuntimeException("address[" + text + "] is illegal, eg.127.0.0.1:3306");
} else {
setValue(new InetSocketAddress(addresses[0], Integer.valueOf(addresses[1])));
}
} else {
setValue(null);
}
}
}
附Canal的全量同步功能ETL
檢視源碼中發現,Canal實際也是支援ES 的全量同步,進行測試了下,性能要比Datax差一些。
如果有興趣使用,還是需要注意一些問題。
- adapter伺服器請求該位址,參數多個,用";"隔開。
- curl http://127.0.0.1:8081/etl/es/exampleKey/ticket.yml\?params\="2019-06-01;2019-07-15" -X POST
- 注意請求位址中,參數key的指派。
- 查詢條件
如果按時間段分批同步,時間格式需配置這種格式{},也可以通過 where b.created_at BETWEEN {} AND {}。
github中案例描述有瑕疵:
具體原因見源碼部分,對條件的解析是替換{},然後順序指派。
如果按照官網描述傳值,會提示異常:
{"succeeded":false,"errorMessage":"ES 資料導入異常 =>java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0)."}
相關開源産品
Canal:
https://github.com/alibaba/canal
CanalAdapter:
https://github.com/alibaba/canal/wiki/ClientAdapter
Sync-ES:
https://github.com/alibaba/canal/wiki/Sync-ES
DataX文檔:https://github.com/alibaba/DataX/blob/master/introduction.md
作者:小強
來源:微信公衆号:得物技術
出處:https://mp.weixin.qq.com/s/7Zzrr4J9w9ZYLomUwE0kwA