天天看點

使用Logstash實作mysql同步資料到ES——《我的Java打怪日記》

mysql 作為成熟穩定的資料持久化解決方案,廣泛地應用在各種領域,但是有些時候,我們在做查詢時,由于查詢條件的多樣、變化多端(比如根據時間查、根據名稱模糊查、根據id查等等),或者查詢的資料來自很多不同的庫表或者系統,這時就很難以一個較快的速度(幾百毫秒)直接擷取我們想要的資料,而 elasticsearch 作為資料分析領域的佼佼者,剛好可以彌補這項不足,而我們要做的隻需要将 mysql 中的資料同步到 elasticsearch 中即可,而 logstash 剛好就是一個同步神器,能夠很好的滿足我們的需求。

一:版本說明

logstash:7.6.1

elasticsearch:7.5.2

jdk:1.8

下載下傳對應平台的壓縮包,解壓即可使用。

二:原理

使用logstash自帶的logstash-inpu-jdbc插件,實作從mysql向es同步資料

官方插件使用說明:

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

三:配置

使用 logstash-input-jdbc 插件讀取 mysql 的資料,原理就是定時執行一個 sql,然後将 sql 執行的結果寫入到流中;增量擷取的方式沒有通過 binlog 方式同步,而是用一個遞增字段作為條件去查詢,每次都記錄目前查詢的位置,由于遞增的特性,隻需要查詢比目前大的記錄即可擷取這段時間内的全部增量,一般的遞增字段有兩種,AUTO_INCREMENT 的主鍵 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段隻适用于那種隻有插入沒有更新的表,update_time 更加通用一些,建議在 mysql 表設計的時候都增加一個 update_time 字段.

input {
# 使用logstash-input-jdbc插件
  jdbc {
    # mysql連接配接驅動包
    jdbc_driver_library => "/data/www/logstash-7.6.1/mysql-connector-java-5.1.44.jar"
    # mysql連接配接驅動類
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # mysql jdbc連接配接串
    jdbc_connection_string => "jdbc:mysql://xxxxxxxxxxx"
    # 設定時區
    jdbc_default_timezone => "Asia/Shanghai"
    # 設定連接配接資料庫的使用者名和密碼
    jdbc_user => "xxx"
    jdbc_password => "xxxxx"
    # 開啟分頁
    jdbc_paging_enabled => "true"
    # 開啟分頁後,每頁資料條數大小
    jdbc_page_size => "1000"
    # 使用本地時區
    plugin_timezone => "local"
    # 開啟大小寫敏感
    lowercase_column_names => "false"
    # 同時時間間隔,分别對應”分 時 月 周 年“ 後面可以設定時區,也可不設,如果注釋schedule,則隻運作一次同步,程序及停止
    schedule => "*/5 * * * * Asia/Shanghai"
    # 讀取資料庫使用的SQL檔案,其中有一個需要特别說明的關鍵字:sql_last_value,此值是讀取last_run_metadata_path檔案内記錄的值
    statement_filepath => "/data/www/logstash-7.6.1/conf/xxxxxxx.sql"
    # 記錄每次同步tracking_column_type追蹤的值
    last_run_metadata_path => "/data/www/logstash-7.6.1/conf/xxxx_modify_time"
    # 開啟記錄最後同步的值
    record_last_run => "true"
    # 設定false為記錄時間戳,true為其他字段的值
    use_column_value => "false"
    # 追蹤字段的類型,可選timestamp、numeric
    tracking_column_type => timestamp
    # 具體跟蹤的同步表字段
    tracking_column => "modify_time"
    # 是否清除 last_run_metadata_path 的記錄,如果為真那麼每次都相當于從頭開始查詢所有的資料庫記錄
    clean_run => "false"
  }
 
}
 
#filter部分對資料進行處理,此處隻對@timestamp字段處理,設定為本地時間
filter {
 
    ruby {
      code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
    }
    ruby {
      code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
      remove_field => ["timestamp"]
    }
}
 
output {
#向es同步資料的相關配置
  elasticsearch {
    hosts => ["127.0.0.1:9200","127.0.0.1:9300"]
    #将資料中id字段設定為es中索引的_id,不使用es自動生成的随機_id,提供寫入速度
    document_id => "%{id}"
    #定義es中索引名稱,如es沒有建立或不存在,會自動建立,自動建立的索引可能會不滿足要求,建議使用自行建立的索引
    index => "xxxx_index"
    document_type => "_doc"
  }
 
# 以下配置為debug,可在測試中開啟,友善定位問題,生産或正式使用務必關閉,以免産生大量日志
  stdout {
    codec => rubydebug
  }
 
}           
  • jdbc_driver_library

mysql-connector-java-5.1.44.jar的存放目錄,這個一定要配置正确,支援全路徑和相對路徑。

  • jdbc_driver_class

驅動類的名字,mysql 填 com.mysql.jdbc.Driver 就好了

  • sql_last_value

标志目前logstash同步的位置資訊(類似offset)。比如id、updatetime。logstash通過這個标志,可以判斷目前同步到哪一條資料。

  • statement

執行同步的sql語句,可以同步部分資料;

  • statement_filepath

存儲執行同步的sql語句,不和statement同時使用;

  • schedule

定時器,表示每隔多長時間同步一次資料,格式類似crontab;

  • tracking_column

表示表中哪一列用于判斷logstash同步的位置資訊,與sql_last_value比較判斷是否需要同步這條資料;

  • tracking_column_type

racking_column指定列的類型,支援兩種類型:numeric(預設)、timestamp;

  • last_run_metadata_path

存儲sql_last_value值的檔案名稱及位置。

  • document_id

生成elasticsearch的文檔值,盡量使用同步的資料中已有的唯一辨別。

注意⚠️

1、同步單張表可以使用:logstash -f /path/to/xxxx.conf

2、同步多張表時,可通過pipeline管道的方式

3、當表資料超過百萬級别時,建議分批同步,可按照每100w條資料同步一次來進行,以加快同步速度,具體可使用表id的區間來确定同步數量,則在SQL中增加id區間值。           

多表同步的pipeline.yml配置如下:

- pipeline.id: xxxx1
  pipeline.workers: 1
  path.config: /data/www/logstash-7.6.1/member_conf/xxxx1.conf
 
- pipeline.id: xxxx2
  pipeline.workers: 1
  path.config: /data/www/logstash-7.6.1/member_conf/xxxx2.conf
 
- pipeline.id: xxxx3
  pipeline.workers: 1
  path.config: /data/www/logstash-7.6.1/member_conf/xxxx3.conf
 
- pipeline.id: xxxx4
  pipeline.workers: 1
  path.config: /data/www/logstash-7.6.1/member_conf/xxxx4.conf           

四:啟動

單表啟動,指定conf配置檔案的位置

bin/logstash -f config/logstash-mysql-es.conf > llogstash.log 2>&1 &           

當使用多表同步時,啟動時,不需要指定具體的conf配置檔案,即:

bin/logstash > logstash.log 2>&1 &