天天看點

Pulsar IO之CDC Debezium Connector背景DebeziumPulsar IO入門參考如果有寫的不對的地方,歡迎大家指正

文章目錄

  • 背景
  • Debezium
  • Pulsar IO
  • 入門
    • 啟動MySQL
    • 啟動Pulsar standalone
    • Debezium connector
    • 訂閱Pulsar topic,監聽MySQL資料變化
    • 修改MySQL資料
  • 參考
  • 如果有寫的不對的地方,歡迎大家指正

背景

在業務系統中,會涉及到多個資料源的資料流轉,例如線上系統的資料流轉到分析系統、流計算系統、搜尋引擎、緩存系統、事件處理系統等。

如圖所示:

Pulsar IO之CDC Debezium Connector背景DebeziumPulsar IO入門參考如果有寫的不對的地方,歡迎大家指正

Debezium

官網位址:https://debezium.io/

Debezium是一個開源項目,為捕獲資料更改(Capture Data Change,CDC)提供了一個低延遲的流式處理平台,通過安裝配置Debezium監控資料庫,可以實時消費行級别(row-level)的更改。身為一個分布式系統,Debezium也擁有良好的容錯性。

Debezium的源端(即支援監控哪些資料庫) : MySQL,MongoDB,PostgreSQL,Oracle,SQL Server

Debezium的目标端(即可以資料導入端) : Kafka

Debezium的應用 : 實時同步資料,實時消費資料

Pulsar IO

Pulsar與Debezium對接,Pulsar IO 架構的内置 connector (https://pulsar.apache.org/docs/en/io-connectors),使實時同步,消費資料更加簡單。

入門

啟動MySQL

啟動MySQL,并開啟MySQL的binlog功能。Windows版本需配置my.ini檔案(Linux是my.cnf檔案),添加以下配置:

[mysqld]  
log-bin=mysql-bin #添加這一行就ok  
binlog-format=ROW #選擇row模式  
server_id=1 
           

啟動Pulsar standalone

#下載下傳Pulsar(目前是2.4.0版本):

$ wget https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz
           

#解壓:

$ tar -zxvf apache-pulsar-2.4.0-bin.tar.gz`
           

#啟動Pulsar standalone:

$ cd apache-pulsar-2.4.0
$ bin/pulsar standalone
           

Debezium connector

#建立 connectors檔案夾

$ mkdir connectors
           

#下載下傳pulsar-io-debezium-mysql-2.4.0.nar

$ cd connectors
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/pulsar/pulsar-2.4.0/connectors/pulsar-io-debezium-mysql-2.4.0.nar
           

#vim debezium-mysql-source-config.yaml

tenant: "public"
    namespace: "default"
    name: "debezium-mysql-source"
    topicName: "debezium-mysql-topic"
    archive: "connectors/pulsar-io-debezium-mysql-2.4.0.nar"
    
    parallelism: 1
    
    configs:
      ## config for mysql, docker image: debezium/example-mysql:0.8
      database.hostname: "MySQL的ip位址"
      database.port: "MySQL端口号"
      database.user: "username"
      database.password: "password"
      database.server.id: "184054"  #連接配接器的辨別符,在資料庫叢集中必須是唯一的,類似于Database的server-id配置屬性。
      database.server.name: "dbserver1" #資料庫伺服器/叢集的邏輯名稱,用于連接配接器寫入的Kafka主題的所有名稱
      database.serverTimezone: "UTC"  #時區,必須加
      database.whitelist: "inventory"   #用于比對要監視的資料庫名稱
      database.blacklist : "" #比對要從監視中排除的資料庫名稱
      table.whitelist: "" #比對要監視的表,每個辨別符的格式為databaseName.tableName
    
      database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
      database.history.pulsar.topic: "history-topic"
      database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
      ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
      key.converter: "org.apache.kafka.connect.json.JsonConverter"
      value.converter: "org.apache.kafka.connect.json.JsonConverter"
      ## PULSAR_SERVICE_URL_CONFIG
      pulsar.service.url: "pulsar://127.0.0.1:6650"
      ## OFFSET_STORAGE_TOPIC_CONFIG
      offset.storage.topic: "offset-topic"
           

#啟動Debezium連接配接

$ bin/pulsar-admin source localrun  --source-config-file connectors/debezium-mysql-source-config.yaml
           

訂閱Pulsar topic,監聽MySQL資料變化

PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://ip位址:6650").build();
	//Pulsar有三種訂閱主題形式: topic,topics, pattern topic,這裡采取topics方式
    List<String> topics = new ArrayList<>();
    topics.add("dbserver1.資料庫名.表名");
    ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.KeyValue(Schema.BYTES, Schema.BYTES));
    Consumer consumer = consumerBuilder
               .topics(topics).subscriptionName("sub-products")
               .subscribe();
    while(true) {
          Message msg = consumer.receive();
          KeyValue<byte[], byte[]> keyValues =  Schema.KeyValue(Schema.BYTES, Schema.BYTES).decode(msg.getData());
         JSONSchema<Map> jsonSchema = JSONSchema.of(SchemaDefinition.<Map>builder().withPojo(Map.class).build());
          Map keyResult = jsonSchema.decode(keyValues.getKey());
          Map valueResult = jsonSchema.decode(keyValues.getValue());
          //TODO 解析binlog日志

          consumer.acknowledge(msg);
    }
           

這裡講一下用戶端消費的binlog日志的payload字段:

"payload": {
"before": null,
"after": {
"id": 1,
"userName": "userName",
"passWord": "123456",
"user_sex": "1",
"nick_name": "昵稱"
},
"source": {
"version": "0.9.2.Final",
"connector": "mysql",
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.000002",
"pos": 55725,
"row": 0,
"snapshot": true,
"thread": null,
"db": "test",
"table": "users",
"query": null
},
"op": "c",
"ts_ms": 1563096917451
}
           

op是表示MySQL資料的操作:c建立(或插入),u更新,d删除和r讀取

before,after:分别表示操作前的資料以及操作後的資料

source:事件源中繼資料的結構

修改MySQL資料

mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM  products ;
mysql> UPDATE products SET name='1111111111' WHERE id=101;
mysql> UPDATE products SET name='1111111111' WHERE id=107;
           

參考

https://pulsar.apache.org/docs/en/io-cdc-debezium/

https://debezium.io/docs/connectors/mysql/

如果有寫的不對的地方,歡迎大家指正

繼續閱讀