MySQL與Elasticsearch同步
當業務需要對海量資料進行多元度、實時的搜尋時,關系型資料庫顯然力不從心。一個非常典型的例子就是對産品或者商品進行多元度搜尋。此時,業務經常需要借助搜尋引擎Elasticsearch滿足多樣化的實時搜尋訴求。搜尋開始前,首先需要解決的問題是如何将待搜尋資料從關系型資料庫實時同步到Elaticsearch
方案1:阿裡雲資料傳輸DTS
DTS支援本地IDC自建MySQL/其他雲廠商MySQL/阿裡雲ECS自建MySQL/RDS MySQL->Elasticsearch之間的資料實時同步。通過DTS提供的資料實時同步功能,使用者隻要3步就可搭建起MySQL同Elasticsearch的實時同步執行個體,實作基于MySQL Binlog的毫秒級同步延遲。
方案2:Logstash将MySQL資料同步到ElasticSearch
開源工具成本高:待同步表必須更新時間字段、業務寫入資料必須更新時間
性能影響大:通過SQL讀取資料、對線上業務影響大
同步延遲高:定期資料同步、同步時延高達數小時
穩定性差:無法解決RDS執行個體遷移及、日志回收情況下的同步穩定性
除mysql外,還支援oracle、SQL server、PostgreSQL等資料庫類型
使用logstash-input-jdbc插件讀取mysql的資料,這個插件的工作原理比較簡單,就是定時執行一個sql,然後将sql執行的結果寫入到流中,增量擷取的方式沒有通過 binlog 方式同步,而是用一個遞增字段作為條件去查詢,每次都記錄目前查詢的位置,由于遞增的特性,隻需要查詢比目前大的記錄即可擷取這段時間内的全部增量,一般的遞增字段有兩種,AUTO_INCREMENT 的主鍵 id 和 ON UPDATE CURRENT_TIMESTAMP的update_time字段,id字段隻适用于那種隻有插入沒有更新的表,update_time 更加通用一些,建議在mysql表設計的時候都增加一個update_time字段。
- 安裝MySQL的jdbc驅動
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
unzip mysql-connector-java-5.1.46.zip
- 在mysql庫中建立一個測試表(其中updatetime用于記錄資料更新時間戳):
create table jm_es_employee (
id varchar(10),
first_name varchar(20),
last_name varchar(20),
age int(10),
about varchar(100),
interests varchar(100),
updatetime timestamp null default current_timestamp on update current_timestamp );
建立Logstash配置檔案,檔案名為logstash-mysql-es.conf
input{
jdbc {
jdbc_driver_library => "/opt/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://39.107.84.120:3306/lidehang"
jdbc_user => "root"
jdbc_password => "Ldh_0571@"
jdbc_paging_enabled => "true"
jdbc_page_size => "1000"
jdbc_default_timezone =>"Asia/Shanghai"
#設定監聽間隔 各字段含義(從左至右)分、時、天、月、年,全為*預設含義為每分鐘都更新
schedule => "* * * * *"
statement => "select * from jm_es_employee where updatetime >= :sql_last_value"
#statement_filepath => "mysql.sql"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "updatetime"
last_run_metadata_path => "./logstash_jdbc_last_run"
}
}
output{
elasticsearch {
hosts => "172.17.134.34:9200"
index => "employee"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
jdbc_driver_library: jdbc驅動的路徑,在上一步中已經下載下傳
jdbc_driver_class: 驅動類的名字,mysql填com.mysql.jdbc.Driver
jdbc_connection_string: mysql 位址
jdbc_user: mysql 使用者
jdbc_password: mysql密碼
schedule: 執行sql時機,類似 crontab 的排程,上面配置表示每分鐘重新整理一次。
statement: 要執行的sql,以 “:” 開頭是定義的變量,可以通過parameters 來設定變量,這裡的sql_last_value是内置的變量,表示上一次sql執行中update_time的值
statement_filepath:和上面statement參數二選一,存放需要執行的SQL語句的檔案位置,适用于多個sql語句的場景。
use_column_value: 使用遞增列的值
tracking_column_type: 遞增字段的類型,numeric表示數值類型, timestamp 表示時間戳類型
tracking_column: 遞增字段的名稱,這裡使用updatetime這一列,這列的類型是timestamp
last_run_metadata_path: 同步點檔案,這個檔案記錄了上次的同步點,重新開機時會讀取這個檔案,這個檔案可以手動修改
index: 導入到es中的index名,這裡我直接設定成了mysql表的名字
document_id: 導入到es中的文檔id,這個需要設定成主鍵,否則同一條記錄更新後在es中會出現兩條記錄,%{id} 表示引用mysql表中id字段的值
- 啟動logstash服務,開始同步mysql資料到es:
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-mysql-es.conf
- 寫入幾條測試資料:
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]');
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]');
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]');
執行指令檢查ES服務中的索引是否被建立:
curl -XGET 'http://172.17.134.34:9200/_cat/indices?v'
通過關鍵字檢索ES服務,驗證寫入Mysql的資料是否被成功索引到ES并被檢索到,執行指令通過關鍵字“Smith “來檢索資料:
curl -XGET 'http://172.17.134.34:9200/employee/_search?q=last_name:Smith&pretty'
方案3:使用阿裡雲開源工具Canal
canal是阿裡巴巴旗下的一款開源項目,純Java開發。基于資料庫增量日志解析,提供增量資料訂閱&消費,目前主要支援了MySQL(也支援mariaDB)。
目前的 canal 支援源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
- canal工作原理
canal模拟mysql slave的互動協定,僞裝自己為mysql slave,向mysql master發送dump協定
mysql master收到dump請求,開始推送binary log給slave(也就是canal)
canal解析binary log對象(原始為byte流)

- Canal 的組成部分
簡單來說,Canal 會将自己僞裝成 MySQL 從節點(Slave),并從主節點(Master)擷取 Binlog,解析和貯存後供下遊消費端使用。Canal 包含兩個組成部分:服務端和用戶端。服務端負責連接配接至不同的 MySQL 執行個體,并為每個執行個體維護一個事件消息隊列;用戶端則可以訂閱這些隊列中的資料變更事件,處理并存儲到資料倉庫中。下面我們來看如何快速搭建起一個 Canal 服務。
- 配置 MySQL 主節點
MySQL 預設沒有開啟 Binlog,是以我們需要對 my.cnf 檔案做以下修改:
server-id = 1
log_bin = /path/to/mysql-bin.log
binlog_format = ROW
注意 binlog_format 必須設定為 ROW, 因為在 STATEMENT 或 MIXED 模式下, Binlog 隻會記錄和傳輸 SQL 語句(以減少日志大小),而不包含具體資料,我們也就無法儲存了。
從節點通過一個專門的賬号連接配接主節點,這個賬号需要擁有全局的 REPLICATION 權限。我們可以使用 GRANT 指令建立這樣的賬号:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
- 啟動 Canal 服務端
從 GitHub 項目釋出頁中下載下傳 Canal 服務端代碼(
https://github.com/alibaba/canal/releases)
配置檔案在 conf 檔案夾下,有以下目錄結構:
canal.deployer/conf/canal.properties
canal.deployer/conf/instanceA/instance.properties
canal.deployer/conf/instanceB/instance.properties
conf/canal.properties 是主配置檔案,如其中的 canal.port 用以指定服務端監聽的端口。instanceA/instance.properties 則是各個執行個體的配置檔案,主要的配置項有:
# slaveId 不能與 my.cnf 中的 server-id 項重複
canal.instance.mysql.slaveId = 1234
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8
# 訂閱執行個體中所有的資料庫和表
canal.instance.filter.regex = .*\\..*
- 編寫 Canal 用戶端
從服務端消費變更消息時,我們需要建立一個 Canal 用戶端,指定需要訂閱的資料庫和表,并開啟輪詢。