天天看點

幹貨 | Debezium實作Mysql到Elasticsearch高效實時同步

幹貨 | Debezium實作Mysql到Elasticsearch高效實時同步
連結

logstash和kafka_connector都僅支援基于自增id或者時間戳更新的方式增量同步資料。

回到問題本身:如果庫表裡沒有相關字段,該如何處理呢?

本文給出相關探讨和解決方案。

1、 binlog認知

1.1 啥是 binlog?

binlog是Mysql sever層維護的一種二進制日志,與innodb引擎中的redo/undo log是完全不同的日志;

其主要是用來記錄對mysql資料更新或潛在發生更新的SQL語句,并以"事務"的形式儲存在磁盤中;

作用主要有:

1)複制:達到master-slave資料一緻的目的。

2)資料恢複:通過mysqlbinlog工具恢複資料。

3)增量備份。

1.2 阿裡的Canal實作了增量Mysql同步

幹貨 | Debezium實作Mysql到Elasticsearch高效實時同步

一圖勝千言,canal是用java開發的基于資料庫增量日志解析、提供增量資料訂閱&消費的中間件。

目前,canal主要支援了MySQL的binlog解析,解析完成後才利用canal client 用來處理獲得的相關資料。目的:增量資料訂閱&消費。

綜上,使用binlog可以突破logstash或者kafka-connector沒有自增id或者沒有時間戳字段的限制,實作增量同步。

2、基于binlog的同步方式

1)基于kafka Connect的Debezium 開源工程,位址:.

https://debezium.io/

2)不依賴第三方的獨立應用: Maxwell開源項目,位址:

http://maxwells-daemon.io/

由于已經部署過conluent(kafka的企業版本,自帶zookeeper、kafka、ksql、kafka-connector等),本文僅針對Debezium展開。

3、Debezium介紹

Debezium是捕獲資料實時動态變化的開源的分布式同步平台。能實時捕獲到資料源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,實時同步到Kafka,穩定性強且速度非常快。

特點:

1)簡單。無需修改應用程式。可對外提供服務。

2)穩定。持續跟蹤每一行的每一處變動。

3)快速。建構于kafka之上,可擴充,經官方驗證可處理大容量的資料。

4、同步架構

幹貨 | Debezium實作Mysql到Elasticsearch高效實時同步

如圖,Mysql到ES的同步政策,采取“曲線救國”機制。

步驟1: 基Debezium的binlog機制,将Mysql資料同步到Kafka。

步驟2: 基于Kafka_connector機制,将kafka資料同步到Elasticsearch。

5、Debezium實作Mysql到ES增删改實時同步

軟體版本:

confluent:5.1.2;

Debezium:0.9.2_Final;

Mysql:5.7.x.

Elasticsearch:6.6.1

5.1 Debezium安裝

confluent的安裝部署參見:

http://t.cn/Ef5poZk

,不再贅述。

Debezium的安裝隻需要把debezium-connector-mysql的壓縮包解壓放到Confluent的解壓後的插件目錄(share/java)中。

MySQL Connector plugin 壓縮包的下載下傳位址:

https://debezium.io/docs/install/

注意重新開機一下confluent,以使得Debezium生效。

5.2 Mysql binlog等相關配置。

Debezium使用MySQL的binlog機制實作資料動态變化監測,是以需要Mysql提前配置binlog。

核心配置如下,在Mysql機器的/etc/my.cnf的mysqld下添加如下配置。

[mysqld]

server-id         = 223344

log_bin           = mysql-bin

binlog_format     = row

binlog_row_image  = full

expire_logs_days  = 10

1

2

3

4

5

6

7

然後,重新開機一下Mysql以使得binlog生效。

systemctl start mysqld.service

5.3 配置connector連接配接器。

配置confluent路徑目錄 : /etc

建立檔案夾指令 :

mkdir kafka-connect-debezium

在mysql2kafka_debezium.json存放connector的配置資訊 :

[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json

{

       "name" : "debezium-mysql-source-0223",

       "config":

       {

            "connector.class" : "io.debezium.connector.mysql.MySqlConnector",

            "database.hostname" : "192.168.1.22",

            "database.port" : "3306",

            "database.user" : "root",

            "database.password" : "XXXXXX",

            "database.whitelist" : "kafka_base_db",

            "table.whitlelist" : "accounts",

            "database.server.id" : "223344",

            "database.server.name" : "full",

            "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",

            "database.history.kafka.topic" : "account_topic",

            "include.schema.changes" : "true" ,

            "incrementing.column.name" : "id",

            "database.history.skip.unparseable.ddl" : "true",

            "transforms": "unwrap,changetopic",

            "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",

            "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",

            "transforms.changetopic.regex":"(.*)",

            "transforms.changetopic.replacement":"$1-smt"

       }

}

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

注意如下配置:

“database.server.id”,對應Mysql中的server-id的配置。

“database.whitelist” : 待同步的Mysql資料庫名。

“table.whitlelist” :待同步的Mysq表名。

重要:“database.history.kafka.topic”:存儲資料庫的Shcema的記錄資訊,而非寫入資料的topic、

“database.server.name”:邏輯名稱,每個connector確定唯一,作為寫入資料的kafka topic的字首名稱。

坑一:transforms相關5行配置作用是寫入資料格式轉換。

如果沒有,輸入資料會包含:before、after記錄修改前對比資訊以及中繼資料資訊(source,op,ts_ms等)。

這些資訊在後續資料寫入Elasticsearch是不需要的。(注意結合自己業務場景)。

格式轉換相關原理:

https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/

5.4 啟動connector

curl -X POST -H "Content-Type:application/json"

--data @mysql2kafka_debezium.json.json

http://192.168.1.22:18083/connectors

| jq

5.5 驗證寫入是否成功。

檢視kafka-topic

kafka-topics --list --zookeeper localhost:2181

此處會看到寫入資料topic的資訊。

注意新寫入資料topic的格式:database.schema.table-smt 三部分組成。

本示例topic名稱:full.kafka_base_db.account-smt。

消費資料驗證寫入是否正常

./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning

至此,Debezium實作mysql同步kafka完成。

6、kafka-connector實作kafka同步Elasticsearch

6.1、Kafka-connector介紹

見官網:https://docs.confluent.io/current/connect.html

Kafka Connect是一個用于連接配接Kafka與外部系統(如資料庫,鍵值存儲,檢索系統索引和檔案系統)的架構。

連接配接器實作公共資料源資料(如Mysql、Mongo、Pgsql等)寫入Kafka,或者Kafka資料寫入目标資料庫,也可以自己開發連接配接器。

6.2、kafka到ES connector同步配置

配置路徑:

/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

配置内容:

"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

"tasks.max": "1",

"topics": "full.kafka_base_db.account-smt",

"key.ignore": "true",

"connection.url": "http://192.168.1.22:9200",

"type.name": "_doc",

"name": "elasticsearch-sink-test"

6.3 kafka到ES啟動connector

啟動指令

confluent load  elasticsearch-sink-test

-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

6.4 Kafka-connctor RESTFul API檢視

Mysql2kafka,kafka2ES的connector詳情資訊可以借助postman或者浏覽器或者指令行檢視。

  curl -X GET http://localhost:8083/connectors

7、坑複盤。

坑2: 同步的過程中可能出現錯誤,比如:kafka topic沒法消費到資料。

排解思路如下:

1)确認消費的topic是否是寫入資料的topic;

2)确認同步的過程中沒有出錯。可以借助connector如下指令檢視。

  curl -X GET http://localhost:8083/connectors-xxx/status

坑3: Mysql2ES出現日期格式不能識别。

是Mysql jar包的問題,解決方案:在my.cnf中配置時區資訊即可。

坑4: kafka2ES,ES沒有寫入資料。

排解思路:

1)建議:先建立同topic名稱一緻的索引,注意:Mapping靜态自定義,不要動态識别生成。

2)通過connetor/status排查出錯原因,一步步分析。

8、小結

binlog的實作突破了字段的限制,實際上業界的go-mysql-elasticsearch已經實作。

對比:logstash、kafka-connector,雖然Debezium“曲線救國”兩步實作了實時同步,但穩定性+實時性能相對不錯。

推薦大家使用。大家有好的同步方式也歡迎留言讨論交流。

參考:

[1]

https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/

[2]

https://www.smwenku.com/a/5c0a7b61bd9eee6fb21356a1/zh-cn

[3]

https://juejin.im/post/5b7c036bf265da43506e8cfd

[4]

https://debezium.io/docs/connectors/mysql/#configuration

[5]

https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc