logstash和kafka_connector都僅支援基于自增id或者時間戳更新的方式增量同步資料。
回到問題本身:如果庫表裡沒有相關字段,該如何處理呢?
本文給出相關探讨和解決方案。
1、 binlog認知
1.1 啥是 binlog?
binlog是Mysql sever層維護的一種二進制日志,與innodb引擎中的redo/undo log是完全不同的日志;
其主要是用來記錄對mysql資料更新或潛在發生更新的SQL語句,并以"事務"的形式儲存在磁盤中;
作用主要有:
1)複制:達到master-slave資料一緻的目的。
2)資料恢複:通過mysqlbinlog工具恢複資料。
3)增量備份。
1.2 阿裡的Canal實作了增量Mysql同步
一圖勝千言,canal是用java開發的基于資料庫增量日志解析、提供增量資料訂閱&消費的中間件。
目前,canal主要支援了MySQL的binlog解析,解析完成後才利用canal client 用來處理獲得的相關資料。目的:增量資料訂閱&消費。
綜上,使用binlog可以突破logstash或者kafka-connector沒有自增id或者沒有時間戳字段的限制,實作增量同步。
2、基于binlog的同步方式
1)基于kafka Connect的Debezium 開源工程,位址:.
https://debezium.io/2)不依賴第三方的獨立應用: Maxwell開源項目,位址:
http://maxwells-daemon.io/由于已經部署過conluent(kafka的企業版本,自帶zookeeper、kafka、ksql、kafka-connector等),本文僅針對Debezium展開。
3、Debezium介紹
Debezium是捕獲資料實時動态變化的開源的分布式同步平台。能實時捕獲到資料源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,實時同步到Kafka,穩定性強且速度非常快。
特點:
1)簡單。無需修改應用程式。可對外提供服務。
2)穩定。持續跟蹤每一行的每一處變動。
3)快速。建構于kafka之上,可擴充,經官方驗證可處理大容量的資料。
4、同步架構
如圖,Mysql到ES的同步政策,采取“曲線救國”機制。
步驟1: 基Debezium的binlog機制,将Mysql資料同步到Kafka。
步驟2: 基于Kafka_connector機制,将kafka資料同步到Elasticsearch。
5、Debezium實作Mysql到ES增删改實時同步
軟體版本:
confluent:5.1.2;
Debezium:0.9.2_Final;
Mysql:5.7.x.
Elasticsearch:6.6.1
5.1 Debezium安裝
confluent的安裝部署參見:
http://t.cn/Ef5poZk,不再贅述。
Debezium的安裝隻需要把debezium-connector-mysql的壓縮包解壓放到Confluent的解壓後的插件目錄(share/java)中。
MySQL Connector plugin 壓縮包的下載下傳位址:
https://debezium.io/docs/install/。
注意重新開機一下confluent,以使得Debezium生效。
5.2 Mysql binlog等相關配置。
Debezium使用MySQL的binlog機制實作資料動态變化監測,是以需要Mysql提前配置binlog。
核心配置如下,在Mysql機器的/etc/my.cnf的mysqld下添加如下配置。
[mysqld]
server-id = 223344
log_bin = mysql-bin
binlog_format = row
binlog_row_image = full
expire_logs_days = 10
1
2
3
4
5
6
7
然後,重新開機一下Mysql以使得binlog生效。
systemctl start mysqld.service
5.3 配置connector連接配接器。
配置confluent路徑目錄 : /etc
建立檔案夾指令 :
mkdir kafka-connect-debezium
在mysql2kafka_debezium.json存放connector的配置資訊 :
[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json
{
"name" : "debezium-mysql-source-0223",
"config":
{
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"database.hostname" : "192.168.1.22",
"database.port" : "3306",
"database.user" : "root",
"database.password" : "XXXXXX",
"database.whitelist" : "kafka_base_db",
"table.whitlelist" : "accounts",
"database.server.id" : "223344",
"database.server.name" : "full",
"database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",
"database.history.kafka.topic" : "account_topic",
"include.schema.changes" : "true" ,
"incrementing.column.name" : "id",
"database.history.skip.unparseable.ddl" : "true",
"transforms": "unwrap,changetopic",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changetopic.regex":"(.*)",
"transforms.changetopic.replacement":"$1-smt"
}
}
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
注意如下配置:
“database.server.id”,對應Mysql中的server-id的配置。
“database.whitelist” : 待同步的Mysql資料庫名。
“table.whitlelist” :待同步的Mysq表名。
重要:“database.history.kafka.topic”:存儲資料庫的Shcema的記錄資訊,而非寫入資料的topic、
“database.server.name”:邏輯名稱,每個connector確定唯一,作為寫入資料的kafka topic的字首名稱。
坑一:transforms相關5行配置作用是寫入資料格式轉換。
如果沒有,輸入資料會包含:before、after記錄修改前對比資訊以及中繼資料資訊(source,op,ts_ms等)。
這些資訊在後續資料寫入Elasticsearch是不需要的。(注意結合自己業務場景)。
格式轉換相關原理:
https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/5.4 啟動connector
curl -X POST -H "Content-Type:application/json"
--data @mysql2kafka_debezium.json.json
http://192.168.1.22:18083/connectors| jq
5.5 驗證寫入是否成功。
檢視kafka-topic
kafka-topics --list --zookeeper localhost:2181
此處會看到寫入資料topic的資訊。
注意新寫入資料topic的格式:database.schema.table-smt 三部分組成。
本示例topic名稱:full.kafka_base_db.account-smt。
消費資料驗證寫入是否正常
./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning
至此,Debezium實作mysql同步kafka完成。
6、kafka-connector實作kafka同步Elasticsearch
6.1、Kafka-connector介紹
見官網:https://docs.confluent.io/current/connect.html
Kafka Connect是一個用于連接配接Kafka與外部系統(如資料庫,鍵值存儲,檢索系統索引和檔案系統)的架構。
連接配接器實作公共資料源資料(如Mysql、Mongo、Pgsql等)寫入Kafka,或者Kafka資料寫入目标資料庫,也可以自己開發連接配接器。
6.2、kafka到ES connector同步配置
配置路徑:
/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
配置内容:
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "full.kafka_base_db.account-smt",
"key.ignore": "true",
"connection.url": "http://192.168.1.22:9200",
"type.name": "_doc",
"name": "elasticsearch-sink-test"
6.3 kafka到ES啟動connector
啟動指令
confluent load elasticsearch-sink-test
-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
6.4 Kafka-connctor RESTFul API檢視
Mysql2kafka,kafka2ES的connector詳情資訊可以借助postman或者浏覽器或者指令行檢視。
curl -X GET http://localhost:8083/connectors
7、坑複盤。
坑2: 同步的過程中可能出現錯誤,比如:kafka topic沒法消費到資料。
排解思路如下:
1)确認消費的topic是否是寫入資料的topic;
2)确認同步的過程中沒有出錯。可以借助connector如下指令檢視。
curl -X GET http://localhost:8083/connectors-xxx/status
坑3: Mysql2ES出現日期格式不能識别。
是Mysql jar包的問題,解決方案:在my.cnf中配置時區資訊即可。
坑4: kafka2ES,ES沒有寫入資料。
排解思路:
1)建議:先建立同topic名稱一緻的索引,注意:Mapping靜态自定義,不要動态識别生成。
2)通過connetor/status排查出錯原因,一步步分析。
8、小結
binlog的實作突破了字段的限制,實際上業界的go-mysql-elasticsearch已經實作。
對比:logstash、kafka-connector,雖然Debezium“曲線救國”兩步實作了實時同步,但穩定性+實時性能相對不錯。
推薦大家使用。大家有好的同步方式也歡迎留言讨論交流。
參考:
[1]
https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/[2]
https://www.smwenku.com/a/5c0a7b61bd9eee6fb21356a1/zh-cn[3]
https://juejin.im/post/5b7c036bf265da43506e8cfd[4]
https://debezium.io/docs/connectors/mysql/#configuration[5]
https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc