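Table of Contents
- Background
- Debezium
- Pulsar IO
- Getting Started
  - Start MySQL
  - Start Pulsar standalone
  - Debezium connector
  - Subscribe to a Pulsar topic to watch MySQL data changes
  - Modify MySQL data
- References
- If anything here is wrong, corrections are welcome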
Background
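In a business system, data often has to flow between several data sources: for example, from online (production) systems into analytics systems, stream processing systems, search engines, caches, and event-processing systems.
As shown in the figure: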
Debezium
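Official website: https://debezium.io/
Debezium is an open-source project that provides a low-latency streaming platform for change data capture (CDC). Once Debezium is installed and configured to monitor a database, you can consume row-level changes in real time. Because it is built as a distributed system, Debezium is also fault tolerant.
Debezium sources (the databases it can monitor): MySQL, MongoDB, PostgreSQL, Oracle, SQL Server
Debezium sink (where the captured data is written): Kafka
Debezium use cases: real-time data synchronization and real-time data consumption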
Pulsar IO
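Pulsar integrates with Debezium through a built-in connector of the Pulsar IO framework (https://pulsar.apache.org/docs/en/io-connectors), which makes real-time synchronization and consumption of data simpler.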
Getting Started
Start MySQL
Start MySQL with the binary log (binlog) enabled. On Windows, add the following to my.ini (on Linux, my.cnf), then restart MySQL:
[mysqld]
log-bin=mysql-bin #this line enables the binlog
binlog-format=ROW #use row-based logging
server_id=1 #unique, non-zero server ID
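To double-check an option file before restarting MySQL, the three settings above can be verified mechanically. This is a minimal sketch with a hypothetical BinlogConfigCheck class (not a MySQL tool): it assumes simple key=value lines, treats - and _ in option names as equivalent (as MySQL does), and ignores !include directives, other option files, and command-line overrides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sanity check for the [mysqld] binlog settings; NOT a MySQL tool.
// Only understands simple "key=value" lines in the [mysqld] section.
final class BinlogConfigCheck {
    private BinlogConfigCheck() {}

    static List<String> problems(List<String> lines) {
        Map<String, String> mysqld = new HashMap<>();
        String section = "";
        for (String raw : lines) {
            String line = raw;
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop "#" comments (naive: ignores quoting)
            }
            line = line.trim();
            if (line.isEmpty() || line.startsWith(";")) {
                continue; // blank line or ";" comment line
            }
            if (line.startsWith("[") && line.endsWith("]")) {
                section = line.substring(1, line.length() - 1).trim();
                continue;
            }
            if (!section.equals("mysqld")) {
                continue; // only the server section matters here
            }
            int eq = line.indexOf('=');
            // MySQL treats dashes and underscores in option names the same way
            String key = (eq >= 0 ? line.substring(0, eq) : line).trim().replace('-', '_');
            String value = eq >= 0 ? line.substring(eq + 1).trim() : "";
            mysqld.put(key, value);
        }
        List<String> problems = new ArrayList<>();
        if (!mysqld.containsKey("log_bin")) {
            problems.add("log-bin is not set");
        }
        if (!"ROW".equalsIgnoreCase(mysqld.get("binlog_format"))) {
            problems.add("binlog-format should be ROW");
        }
        String serverId = mysqld.get("server_id");
        if (serverId == null || serverId.isEmpty() || serverId.equals("0")) {
            problems.add("server_id should be set to a non-zero value");
        }
        return problems;
    }
}
```

Fed the four [mysqld] lines above, problems(...) returns an empty list; a file that only sets binlog-format=STATEMENT under [mysqld] yields three problems.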
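Start Pulsar standalone
#Download Pulsar (version 2.4.0 at the time of writing); quote the URL so the shell does not treat "&" as a background operator:
$ wget -O apache-pulsar-2.4.0-bin.tar.gz "https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz"
#Extract:
$ tar -zxvf apache-pulsar-2.4.0-bin.tar.gz
#Start Pulsar in standalone mode:
$ cd apache-pulsar-2.4.0
$ bin/pulsar standalone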
Debezium connector
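#In another terminal (standalone keeps running in the foreground), create a connectors directory inside apache-pulsar-2.4.0
$ mkdir connectors
#Download pulsar-io-debezium-mysql-2.4.0.nar
$ cd connectors
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/pulsar/pulsar-2.4.0/connectors/pulsar-io-debezium-mysql-2.4.0.nar
#Create debezium-mysql-source-config.yaml (e.g. with vim) with the following content: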
tenant: "public"
namespace: "default"
name: "debezium-mysql-source"
topicName: "debezium-mysql-topic"
archive: "connectors/pulsar-io-debezium-mysql-2.4.0.nar"
parallelism: 1
configs:
    ## config for mysql, docker image: debezium/example-mysql:0.8
    database.hostname: "<MySQL host IP>"
    database.port: "<MySQL port>"
    database.user: "username"
    database.password: "password"
    database.server.id: "184054" #identifier of this connector; must be unique within the MySQL cluster, like MySQL's server-id setting
    database.server.name: "dbserver1" #logical name of the MySQL server/cluster, used as the prefix of every topic the connector writes to
    database.serverTimezone: "UTC" #time zone; must be set
    database.whitelist: "inventory" #regular expression(s) matching the names of the databases to monitor
    database.blacklist: "" #regular expression(s) matching the names of databases to exclude from monitoring
    table.whitelist: "" #regular expression(s) matching the tables to monitor; each identifier has the form databaseName.tableName
    database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
    database.history.pulsar.topic: "history-topic"
    database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
    ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
    key.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    ## PULSAR_SERVICE_URL_CONFIG
    pulsar.service.url: "pulsar://127.0.0.1:6650"
    ## OFFSET_STORAGE_TOPIC_CONFIG
    offset.storage.topic: "offset-topic"
#Start the Debezium connector from the apache-pulsar-2.4.0 directory
$ cd ..
$ bin/pulsar-admin source localrun --source-config-file connectors/debezium-mysql-source-config.yaml
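The whitelist settings in the config take comma-separated regular expressions. The sketch below illustrates how such a filter decides what gets monitored; WhitelistFilter is a hypothetical class, not Debezium's implementation, and it assumes that each expression must match the whole database name (or databaseName.tableName) and that an empty value means no restriction.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration of comma-separated regex filters such as
// database.whitelist / table.whitelist. Assumptions: full-string matching,
// and an empty list means "monitor everything".
final class WhitelistFilter {
    private final List<Pattern> patterns;

    WhitelistFilter(String commaSeparated) {
        this.patterns = Arrays.stream(commaSeparated.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .map(Pattern::compile)
                .collect(Collectors.toList());
    }

    boolean includes(String identifier) {
        if (patterns.isEmpty()) {
            return true; // no whitelist entries: nothing is filtered out
        }
        // Included if any expression matches the entire identifier
        return patterns.stream().anyMatch(p -> p.matcher(identifier).matches());
    }
}
```

Under these assumptions, new WhitelistFilter("inventory").includes("inventory") is true and includes("test") is false. Because "." is a regex metacharacter, a table entry such as inventory\.products escapes it to match only that table.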
Subscribe to a Pulsar topic to watch MySQL data changes
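import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.KeyValue;

public class CdcConsumer {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://<Pulsar IP>:6650").build();
        //Pulsar can subscribe to a single topic, a list of topics, or a topic pattern; this example uses a list of topics
        List<String> topics = new ArrayList<>();
        topics.add("dbserver1.<database>.<table>");
        Consumer<KeyValue<byte[], byte[]>> consumer = pulsarClient
                .newConsumer(Schema.KeyValue(Schema.BYTES, Schema.BYTES))
                .topics(topics)
                .subscriptionName("sub-products")
                .subscribe();
        //Key and value are JSON documents written by JsonConverter; decode both into Maps
        JSONSchema<Map> jsonSchema = JSONSchema.of(SchemaDefinition.<Map>builder().withPojo(Map.class).build());
        while (true) {
            Message<KeyValue<byte[], byte[]>> msg = consumer.receive();
            KeyValue<byte[], byte[]> keyValues = msg.getValue();
            Map keyResult = jsonSchema.decode(keyValues.getKey());     //primary key of the changed row
            Map valueResult = jsonSchema.decode(keyValues.getValue()); //change event (schema + payload)
            //TODO parse the binlog event
            consumer.acknowledge(msg);
        }
    }
}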
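The topic passed to topics(...) follows the convention <database.server.name>.<database>.<table>, so with the config above the products table of the inventory database maps to dbserver1.inventory.products. The hypothetical CdcTopics helper below (not part of the Pulsar API) builds these names and, for the pattern-subscription option mentioned in the code comment, a regular expression covering every table of one database. It assumes the table topics live in public/default, as in the official Pulsar example, and that patterns are matched against fully qualified persistent:// names.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not a Pulsar API) for the CDC topic naming convention.
final class CdcTopics {
    private CdcTopics() {}

    // "<database.server.name>.<database>.<table>", e.g. "dbserver1.inventory.products"
    static String shortName(String serverName, String database, String table) {
        return serverName + "." + database + "." + table;
    }

    // Fully qualified form; assumes a persistent topic in the given tenant/namespace
    static String fullName(String tenant, String namespace, String shortName) {
        return "persistent://" + tenant + "/" + namespace + "/" + shortName;
    }

    // Regex selecting every table topic of one database; the fixed prefix is
    // quoted so its dots are literal, then ".+" matches any table name
    static Pattern allTablesOf(String tenant, String namespace, String serverName, String database) {
        String prefix = fullName(tenant, namespace, serverName + "." + database + ".");
        return Pattern.compile(Pattern.quote(prefix) + ".+");
    }
}
```

With a Pulsar client on the classpath, such a Pattern could be passed to ConsumerBuilder.topicsPattern(...) instead of calling topics(...), so tables added to the database later are picked up without changing the topic list.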
Here is the payload field of a binlog event as consumed by the client:
"payload": {
  "before": null,
  "after": {
    "id": 1,
    "userName": "userName",
    "passWord": "123456",
    "user_sex": "1",
    "nick_name": "nickname"
  },
  "source": {
    "version": "0.9.2.Final",
    "connector": "mysql",
    "name": "dbserver1",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000002",
    "pos": 55725,
    "row": 0,
    "snapshot": true,
    "thread": null,
    "db": "test",
    "table": "users",
    "query": null
  },
  "op": "c",
  "ts_ms": 1563096917451
}
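op: the operation performed on the MySQL data: c = create (insert), u = update, d = delete, r = read (snapshot)
before, after: the row as it was before the operation and as it is after the operation
source: metadata describing where the event came from (connector, binlog file and position, database, table, and so on)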
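The TODO in the consumer loop can start from these fields. This is a minimal sketch, assuming valueResult is the Map decoded from the message value and that it holds the event under a top-level "payload" key (which is how JsonConverter writes events when schemas are enabled); ChangeEvents and Op are hypothetical names, not part of Debezium or Pulsar.

```java
import java.util.Map;

// Hypothetical accessors (not a Debezium or Pulsar API) for the payload fields
// of a change event that has already been decoded into a Map.
final class ChangeEvents {
    private ChangeEvents() {}

    // The op codes described above
    enum Op {
        CREATE("c"), UPDATE("u"), DELETE("d"), READ("r");

        final String code;

        Op(String code) {
            this.code = code;
        }

        static Op fromCode(String code) {
            for (Op op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("unknown op: " + code);
        }
    }

    @SuppressWarnings("unchecked")
    static Map<String, Object> payload(Map<String, Object> value) {
        return (Map<String, Object>) value.get("payload");
    }

    static Op op(Map<String, Object> value) {
        return Op.fromCode((String) payload(value).get("op"));
    }

    // Row image before the change; null for inserts
    @SuppressWarnings("unchecked")
    static Map<String, Object> before(Map<String, Object> value) {
        return (Map<String, Object>) payload(value).get("before");
    }

    // Row image after the change; null for deletes
    @SuppressWarnings("unchecked")
    static Map<String, Object> after(Map<String, Object> value) {
        return (Map<String, Object>) payload(value).get("after");
    }
}
```

In the loop, ChangeEvents.op(valueResult) followed by a switch on the result is one way to route inserts, updates, and deletes; passing the raw Map from the consumer code compiles with an unchecked warning.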
Modify MySQL data
mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM products ;
mysql> UPDATE products SET name='1111111111' WHERE id=101;
mysql> UPDATE products SET name='1111111111' WHERE id=107;
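Each UPDATE above that changes a row should reach the consumer as an event with op "u" that carries both a before and an after image. To see which columns actually changed, the two images can be compared. This is a minimal sketch with a hypothetical RowDiff helper, assuming both images have already been decoded into Maps from payload.before and payload.after.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: lists the columns whose values differ between the
// "before" and "after" images of an update event. Both maps must be non-null,
// which holds for update events but not for inserts or deletes.
final class RowDiff {
    private RowDiff() {}

    // Returns column -> {oldValue, newValue}, ordered by column name
    static Map<String, Object[]> changedColumns(Map<String, Object> before, Map<String, Object> after) {
        Set<String> columns = new TreeSet<>(before.keySet());
        columns.addAll(after.keySet());
        Map<String, Object[]> changes = new LinkedHashMap<>();
        for (String column : columns) {
            Object oldValue = before.get(column);
            Object newValue = after.get(column);
            if (!Objects.equals(oldValue, newValue)) {
                changes.put(column, new Object[] {oldValue, newValue});
            }
        }
        return changes;
    }
}
```

For the statements above, the only column expected to appear in the result is name, with the new value '1111111111'.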
References
https://pulsar.apache.org/docs/en/io-cdc-debezium/
https://debezium.io/docs/connectors/mysql/