mysql 作為成熟穩定的資料持久化解決方案,廣泛地應用在各種領域,但是在資料分析方面稍有不足,而 elasticsearch 作為資料分析領域的佼佼者,剛好可以彌補這項不足,而我們要做的隻需要将 mysql 中的資料同步到 elasticsearch 中即可,而 logstash 剛好就可以支援,所有你需要做的隻是寫一個配置檔案而已
logstash 擷取
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.3.zip
unzip logstash-6.2.3.zip && cd logstash-6.2.3
安裝 jdbc 和 elasticsearch 插件
bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-output-elasticsearch
擷取 jdbc mysql 驅動
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
unzip mysql-connector-java-5.1.46.zip
編寫配置檔案
logstash-input-jdbc
使用 logstash-input-jdbc 插件讀取 mysql 的資料,這個插件的工作原理比較簡單,就是定時執行一個 sql,然後将 sql 執行的結果寫入到流中,增量擷取的方式沒有通過 binlog 方式同步,而是用一個遞增字段作為條件去查詢,每次都記錄目前查詢的位置,由于遞增的特性,隻需要查詢比目前大的記錄即可擷取這段時間内的全部增量,一般的遞增字段有兩種,AUTO_INCREMENT 的主鍵 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段隻适用于那種隻有插入沒有更新的表,update_time 更加通用一些,建議在 mysql 表設計的時候都增加一個 update_time 字段
input {
jdbc {
jdbc_driver_library => "../mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<mysql_host>:3306/rta"
jdbc_user => "<username>"
jdbc_password => "<password>"
schedule => "* * * * *"
statement => "SELECT * FROM table WHERE update_time >= :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time"
last_run_metadata_path => "syncpoint_table"
}
}
jdbc_driver_library: jdbc mysql 驅動的路徑,在上一步中已經下載下傳
jdbc_driver_class: 驅動類的名字,mysql 填 com.mysql.jdbc.Driver 就好了
jdbc_connection_string: mysql 位址
jdbc_user: mysql 使用者
jdbc_password: mysql 密碼
schedule: 執行 sql 時機,類似 crontab 的排程
statement: 要執行的 sql,以 “:” 開頭是定義的變量,可以通過 parameters 來設定變量,這裡的 sql_last_value 是内置的變量,表示上一次 sql 執行中 update_time 的值,這裡 update_time 條件是 >= 因為時間有可能相等,沒有等号可能會漏掉一些增量
use_column_value: 使用遞增列的值
tracking_column_type: 遞增字段的類型,numeric 表示數值類型, timestamp 表示時間戳類型
tracking_column: 遞增字段的名稱,這裡使用 update_time 這一列,這列的類型是 timestamp
last_run_metadata_path: 同步點檔案(不是路徑),這個檔案記錄了上次的同步點,需要事先建立好,重新開機時會讀取這個檔案,這個檔案可以手動修改
logstash-output-elasticsearch
output {
elasticsearch {
hosts => ["172.31.22.165", "172.31.17.241", "172.31.30.84", "172.31.18.178"]
user => "<user>"
password => "<password>"
index => "table"
document_id => "%{id}"
}
}
hosts: es 叢集位址
user: es 使用者名
password: es 密碼
index: 導入到 es 中的 index 名,這裡我直接設定成了 mysql 表的名字
document_id: 導入到 es 中的文檔 id,這個需要設定成主鍵,否則同一條記錄更新後在 es 中會出現兩條記錄,%{id} 表示引用 mysql 表中 id 字段的值
運作
把上面的代碼儲存到一個配置檔案裡面 sync_table.cfg,執行下面指令即可
cd logstash-6.2.3 && bin/logstash -f config/sync_table.cfg
如果成功了會在标準輸出輸出執行的 sql 語句
[2018-04-14T18:12:00,278][INFO ][logstash.inputs.jdbc ] (0.001011s) SELECT version()
[2018-04-14T18:12:00,284][INFO ][logstash.inputs.jdbc ] (0.000723s) SELECT * FROM table WHERE update_time > ‘2018-04-14 17:55:00’
其他問題
多表同步
一個 logstash 執行個體可以借助 pipelines 機制同步多個表,隻需要寫多個配置檔案就可以了,假設我們有兩個表 table1 和 table2,對應兩個配置檔案 sync_table1.cfg 和 sync_table2.cfg
在 config/pipelines.yml 中配置
-
pipeline.id: table1
path.config: “config/sync_table1.cfg”
-
pipeline.id: table2
path.config: “config/sync_table2.cfg”
直接 bin/logstash 啟動即可
@timestamp 字段
預設情況下 @timestamp 字段是 logstash-input-jdbc 添加的字段,預設是目前時間,這個字段在資料分析的時候非常有用,但是有時候我們希望使用資料中的某些字段來指定這個字段,這個時候可以使用 filter.date, 這個插件是專門用來設定 @timestamp 這個字段的
比如我有我希望用字段 timeslice 來表示 @timestamp,timeslice 是一個字元串,格式為 %Y%m%d%H%M
filter {
date {
match => [ “timeslice”, “yyyyMMddHHmm” ]
timezone => “Asia/Shanghai”
}
}
把這一段配置加到 sync_table.cfg 中,現在 @timestamp 和 timeslice 一緻了
示例2:
input{
jdbc{
jdbc_driver_library => "logstash-6.3.2/driver/mysql-connector-java-5.1.30.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.1.2:3306/mysql?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true"
jdbc_user => "123456"
jdbc_password => "123456"
use_column_value => true
tracking_column_type=>"timestamp"
tracking_column => "upd_time"
last_run_metadata_path => "/temp/run_metadata/data"
clean_run => "false"
statement => "select * from my_data where upd_time> :sql_last_value order by upd_time asc"
schedule => "10 10 * * *"
#type => "data"
add_field => { "[@metadata][indextype]" => "data" }
}
jdbc{
jdbc_driver_library => "logstash-6.3.2/driver/mysql-connector-java-5.1.30.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.1.2:3306/mysql?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true"
jdbc_user => "123456"
jdbc_password => "123456"
use_column_value => true
tracking_column_type=>"timestamp"
tracking_column => "upd_time"
last_run_metadata_path => "/temp/run_metadata/data2"
clean_run => "false"
statement => "select * from my_data2 where upd_time> :sql_last_value order by upd_time asc"
schedule => "10 10 * * *"
#type => "data2"
add_field => { "[@metadata][indextype]" => "data2" }
}
}
filter {
date {
match => [ "upd_time", "yyyy-MM-dd HH:mm:ss" ]
timezone => "Asia/Shanghai"
}
date {
match => [ "created_time", "yyyy-MM-dd HH:mm:ss" ]
timezone => "Asia/Shanghai"
}
}
output{
if [@metadata][indextype] == "data" {
elasticsearch {
hosts => ["192.168.1.1:9200"]
index => "mydata"
document_id => "%{row_id}"
}
}
if [@metadata][indextype] == “data2” {
elasticsearch {
hosts => [“192.168.1.2:9200”]
index => “mydata2”
document_id => “%{row_id}”
}
}
# 這裡輸出調試,正式運作時可以注釋掉
stdout {
codec => json_lines
}
}
特别要注意的是:
1.select * from my_data where upd_time> :sql_last_value order by upd_time asc,必須對upd_time 進行排序,不然last_run_metadata_path 裡存儲的時間不是最近的更新時間。
2.涉及到多個查詢時,last_run_metadata_path 不能配置一樣,不然有覆寫問題。
3.配置檔案中,當在input的jdbc下,增加type屬性時,會導緻該索引下增加type字段。是以sql查詢出的字段不要用type,如果有,as成其他的名字,不然的話,這裡判斷會有異常。