天天看點

logstash-input-jdbc 同步原理及相關問題解讀

前言:

基于logstash-input-jdbc較其他插件的穩定性、易用性、版本和ES同步更新的特點,以下研究主要針對 logstash-input-jdbc 展開。

針對logstash-input-jdbc常見的幾個疑難問題,部分問題也在git和stackoverflow進行了激烈讨論,以下統一給出驗證和解答。

1、logstash-input-jdbc 的同步原理是什麼?

(1)對于全量同步依據

配置檔案jdbc.sql的sql語句的進行同步。

(2)對于增量實時同步依據

1)設定的定時政策。

如最小更新間隔每分鐘更新一次設定:schedule => “ *”,目前最小更新間隔為1分鐘,驗證發現,不支援60s以内的秒級更新。

2)設定的sql語句。

如jdbc.sql, 決定同步哪些内容及同步更新的條件。

{"id":10,"name":"10test","@version":"1","@timestamp":"2016-06-29T03:18:00.177Z","type":"132c_type"}           

2、logstash-input-jdbc 隻支援基于時間同步嗎?

驗證表名:同步更新除了支援根據時間同步外,還支援根據某自增列(如:自增ID)字段的變化進行同步。

上次舉例隻是舉了同步時間變化的例子,設定條件:

[root@5b9dbaaa148a logstash_jdbc_test]# cat jdbc.sql_bak

select
        *
from
        cc
where   cc.modified_at > :sql_last_value           

實際進一步研究發現,在配置檔案中有use_column_value字段決定,是否需要記錄某個column 的值,如果 record_last_run 為真,可以自定義我們需要 track 的 column 名稱,此時該參數就要為 true. 否則預設 track 的是 timestamp 的值.

舉例:以下即是設定以id的變化作為同步條件的。

[root@5b9dbaaa148a logstash_jdbc_test]# cat jdbc_xm.sql
select
        *
from
        cc
where   cc.id >= :sql_last_value           

我們可以指定檔案,來記錄上次執行到的 tracking_column 字段的值 比如上次資料庫有 12 條記錄,查詢完後該檔案中就會有數字 12 這樣的記錄,下次執行 SQL 查詢可以從 13 條處開始.

我們隻需要在 SQL 語句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是該檔案中的值(12).

last_run_metadata_path => “/etc/logstash/run_metadata.d/my_info”           

如:

[root@5b9 run_metadata.d]# cat /etc/logstash/run_metadata.d/my_info

--- 12           

已全局代碼搜尋,沒有觸發器trigger相關處理操作。

3、mysql和ES分别存儲在兩台伺服器,且時間不一緻,能否實作同步?

(1) 設定對于以時間作為判定條件的增量同步,可以以設定的時間為基準點進行同步。

驗證發現:

顯示的時間戳timestamp為ES上的UTC時間值(不論ES機器是什麼時區,都會修改為UTC時間存入ES),顯示的modified_at時間值為同步過來的mysql時間值轉化為UTC的結果值。

更新的前提是必須滿足: cc.modified_at >= :sql_last_value。即如果mysql的時間修改為小于sql_last_value的時刻值,是無法進行同步的。

[elasticsearch@5b9dbaaa148a run_metadata.d]$ cat my_info

--- 2016-06-29 02:19:00.182000000 Z           

(2) 對于標明某列作為判定條件(如自增ID),兩者(mysql和ES)時間不一緻,實際是也可以同步更新的。

測試設定的時間是mysql比ES早一天或者晚一天的時刻值,都可以實作同步更新操作。

4、如何支援實時同步mysql的delete操作到ES中?

logstash-input-jdbc插件不支援實體删除的同步更新。詳見:

http://stackoverflow.com/questions/35813923/sync-postgresql-data-with-elasticsearch/35823497#35823497 https://github.com/logstash-plugins/logstash-input-jdbc/issues/145

解決方案:

同步删除操作改為同步update更新操作實作。

第一步:進行軟體删除,而不是實體删除操作。

先不實體删除記錄,而是軟體删除,即新增一個 flag 列,辨別記錄是否已經被删除(預設為false,設定為true或者deleted代表已經被删除,業界通用方法),這樣,通過已有的同步機制,相同的标記記錄該行資料會同步更新到Elasticsearch。

第二步:ES中檢索flag标記為true或者deleted的字段資訊。

在ES可以執行簡單的term查詢操作,檢索出已經删除的資料資訊。

第三步:定時實體删除。

設定定時事件,間隔一段時間實體删除掉mysql和ES中的flag字段标記為true或deleted的記錄,即完成實體删除操。

作者:銘毅天下

轉載請标明出處,原文位址:

http://blog.csdn.net/laoyang360/article/details/51793301