mysql 作為成熟穩定的資料持久化解決方案,廣泛地應用在各種領域,但是有些時候,我們在做查詢時,由于查詢條件的多樣、變化多端(比如根據時間查、根據名稱模糊查、根據id查等等),或者查詢的資料來自很多不同的庫表或者系統,這時就很難以一個較快的速度(幾百毫秒)直接擷取我們想要的資料,而 elasticsearch 作為資料分析領域的佼佼者,剛好可以彌補這項不足,而我們要做的隻需要将 mysql 中的資料同步到 elasticsearch 中即可,而 logstash 剛好就是一個同步神器,能夠很好的滿足我們的需求。
一:版本說明
logstash:7.6.1
elasticsearch:7.5.2
jdk:1.8
下載下傳對應平台的壓縮包,解壓即可使用。
二:原理
使用logstash自帶的logstash-inpu-jdbc插件,實作從mysql向es同步資料
官方插件使用說明:
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html三:配置
使用 logstash-input-jdbc 插件讀取 mysql 的資料,原理就是定時執行一個 sql,然後将 sql 執行的結果寫入到流中;增量擷取的方式沒有通過 binlog 方式同步,而是用一個遞增字段作為條件去查詢,每次都記錄目前查詢的位置,由于遞增的特性,隻需要查詢比目前大的記錄即可擷取這段時間内的全部增量,一般的遞增字段有兩種,AUTO_INCREMENT 的主鍵 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段隻适用于那種隻有插入沒有更新的表,update_time 更加通用一些,建議在 mysql 表設計的時候都增加一個 update_time 字段.
input {
# 使用logstash-input-jdbc插件
jdbc {
# mysql連接配接驅動包
jdbc_driver_library => "/data/www/logstash-7.6.1/mysql-connector-java-5.1.44.jar"
# mysql連接配接驅動類
jdbc_driver_class => "com.mysql.jdbc.Driver"
# mysql jdbc連接配接串
jdbc_connection_string => "jdbc:mysql://xxxxxxxxxxx"
# 設定時區
jdbc_default_timezone => "Asia/Shanghai"
# 設定連接配接資料庫的使用者名和密碼
jdbc_user => "xxx"
jdbc_password => "xxxxx"
# 開啟分頁
jdbc_paging_enabled => "true"
# 開啟分頁後,每頁資料條數大小
jdbc_page_size => "1000"
# 使用本地時區
plugin_timezone => "local"
# 開啟大小寫敏感
lowercase_column_names => "false"
# 同時時間間隔,分别對應”分 時 月 周 年“ 後面可以設定時區,也可不設,如果注釋schedule,則隻運作一次同步,程序及停止
schedule => "*/5 * * * * Asia/Shanghai"
# 讀取資料庫使用的SQL檔案,其中有一個需要特别說明的關鍵字:sql_last_value,此值是讀取last_run_metadata_path檔案内記錄的值
statement_filepath => "/data/www/logstash-7.6.1/conf/xxxxxxx.sql"
# 記錄每次同步tracking_column_type追蹤的值
last_run_metadata_path => "/data/www/logstash-7.6.1/conf/xxxx_modify_time"
# 開啟記錄最後同步的值
record_last_run => "true"
# 設定false為記錄時間戳,true為其他字段的值
use_column_value => "false"
# 追蹤字段的類型,可選timestamp、numeric
tracking_column_type => timestamp
# 具體跟蹤的同步表字段
tracking_column => "modify_time"
# 是否清除 last_run_metadata_path 的記錄,如果為真那麼每次都相當于從頭開始查詢所有的資料庫記錄
clean_run => "false"
}
}
#filter部分對資料進行處理,此處隻對@timestamp字段處理,設定為本地時間
filter {
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
}
output {
#向es同步資料的相關配置
elasticsearch {
hosts => ["127.0.0.1:9200","127.0.0.1:9300"]
#将資料中id字段設定為es中索引的_id,不使用es自動生成的随機_id,提供寫入速度
document_id => "%{id}"
#定義es中索引名稱,如es沒有建立或不存在,會自動建立,自動建立的索引可能會不滿足要求,建議使用自行建立的索引
index => "xxxx_index"
document_type => "_doc"
}
# 以下配置為debug,可在測試中開啟,友善定位問題,生産或正式使用務必關閉,以免産生大量日志
stdout {
codec => rubydebug
}
}
- jdbc_driver_library
mysql-connector-java-5.1.44.jar的存放目錄,這個一定要配置正确,支援全路徑和相對路徑。
- jdbc_driver_class
驅動類的名字,mysql 填 com.mysql.jdbc.Driver 就好了
- sql_last_value
标志目前logstash同步的位置資訊(類似offset)。比如id、updatetime。logstash通過這個标志,可以判斷目前同步到哪一條資料。
- statement
執行同步的sql語句,可以同步部分資料;
- statement_filepath
存儲執行同步的sql語句,不和statement同時使用;
- schedule
定時器,表示每隔多長時間同步一次資料,格式類似crontab;
- tracking_column
表示表中哪一列用于判斷logstash同步的位置資訊,與sql_last_value比較判斷是否需要同步這條資料;
- tracking_column_type
racking_column指定列的類型,支援兩種類型:numeric(預設)、timestamp;
- last_run_metadata_path
存儲sql_last_value值的檔案名稱及位置。
- document_id
生成elasticsearch的文檔值,盡量使用同步的資料中已有的唯一辨別。
注意⚠️
1、同步單張表可以使用:logstash -f /path/to/xxxx.conf
2、同步多張表時,可通過pipeline管道的方式
3、當表資料超過百萬級别時,建議分批同步,可按照每100w條資料同步一次來進行,以加快同步速度,具體可使用表id的區間來确定同步數量,則在SQL中增加id區間值。
多表同步的pipeline.yml配置如下:
- pipeline.id: xxxx1
pipeline.workers: 1
path.config: /data/www/logstash-7.6.1/member_conf/xxxx1.conf
- pipeline.id: xxxx2
pipeline.workers: 1
path.config: /data/www/logstash-7.6.1/member_conf/xxxx2.conf
- pipeline.id: xxxx3
pipeline.workers: 1
path.config: /data/www/logstash-7.6.1/member_conf/xxxx3.conf
- pipeline.id: xxxx4
pipeline.workers: 1
path.config: /data/www/logstash-7.6.1/member_conf/xxxx4.conf
四:啟動
單表啟動,指定conf配置檔案的位置
bin/logstash -f config/logstash-mysql-es.conf > llogstash.log 2>&1 &
當使用多表同步時,啟動時,不需要指定具體的conf配置檔案,即:
bin/logstash > logstash.log 2>&1 &