天天看點

Flink1.11中的CDC Connectors操作實踐

Flink1.11引入了CDC的connector,通過這種方式可以很友善地捕獲變化的資料,大大簡化了資料處理的流程。Flink1.11的CDC connector主要包括:

MySQL CDC

Postgres CDC

,同時對Kafka的Connector支援

canal-json

debezium-json

以及

changelog-json

的format。本文主要分享以下内容:

  • CDC簡介
  • Flink提供的 table format
  • 使用過程中的注意點
  • mysql-cdc的操作實踐
  • canal-json的操作實踐
  • changelog-json的操作實踐

簡介

Flink CDC Connector 是ApacheFlink的一組資料源連接配接器,使用變化資料捕獲change data capture (CDC)從不同的資料庫中提取變更資料。Flink CDC連接配接器将Debezium內建為引擎來捕獲資料變更。是以,它可以充分利用Debezium的功能。

特點

  • 支援讀取資料庫快照,并且能夠持續讀取資料庫的變更日志,即使發生故障,也支援exactly-once 的處理語義
  • 對于DataStream API的CDC connector,使用者無需部署Debezium和Kafka,即可在單個作業中使用多個資料庫和表上的變更資料。
  • 對于Table/SQL API 的CDC connector,使用者可以使用SQL DDL建立CDC資料源,來監視單個表上的資料變更。

使用場景

  • 資料庫之間的增量資料同步
  • 審計日志
  • 資料庫之上的實時物化視圖
  • 基于CDC的維表join

Flink提供了一系列可以用于table connector的table format,具體如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro
Debezium CDC Apache Kafka
Canal CDC
Apache Parquet Filesystem
Apache ORC

使用MySQL CDC的注意點

如果要使用MySQL CDC connector,對于程式而言,需要添加如下依賴:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.0.0</version>
</dependency>
           

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将該jar包放在Flink安裝目錄的lib檔案夾下即可。

使用canal-json的注意點

如果要使用Kafka的canal-json,對于程式而言,需要添加如下依賴:

<!-- universal -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>


           

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,将該jar包放在Flink安裝目錄的lib檔案夾下即可。由于Flink1.11的安裝包 的lib目錄下并沒有提供該jar包,是以必須要手動添加依賴包,否則會報如下錯誤:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
mysql-cdc
           

使用changelog-json的注意點

如果要使用Kafka的changelog-json Format,對于程式而言,需要添加如下依賴:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.0.0</version>
</dependency>
           

如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,将該jar包放在Flink安裝目錄的lib檔案夾下即可。

建立MySQL資料源表

在建立MySQL CDC表之前,需要先建立MySQL的資料表,如下:

-- MySQL
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編号',
  `consignee` varchar(100) DEFAULT NULL COMMENT '收貨人',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人電話',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '總金額',
  `order_status` varchar(20) DEFAULT NULL COMMENT '訂單狀态,1表示下單,2表示支付',
  `user_id` bigint(20) DEFAULT NULL COMMENT '使用者id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送貨位址',
  `order_comment` varchar(200) DEFAULT NULL COMMENT '訂單備注',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '訂單交易編号(第三方支付用)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT '訂單描述(第三方支付用)',
  `create_time` datetime DEFAULT NULL COMMENT '建立時間',
  `operate_time` datetime DEFAULT NULL COMMENT '操作時間',
  `expire_time` datetime DEFAULT NULL COMMENT '失效時間',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流單編号',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父訂單編号',
  `img_url` varchar(200) DEFAULT NULL COMMENT '圖檔路徑',
  `province_id` int(20) DEFAULT NULL COMMENT '地區',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單表';
-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info` 
VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
INSERT INTO `order_info`
VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
INSERT INTO `order_info`
VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);

/*Table structure for table `order_detail` */
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編号',
  `order_id` bigint(20) DEFAULT NULL COMMENT '訂單編号',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名稱(備援)',
  `img_url` varchar(200) DEFAULT NULL COMMENT '圖檔名稱(備援)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT '購買價格(下單時sku價格)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT '購買個數',
  `create_time` datetime DEFAULT NULL COMMENT '建立時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單明細表';

-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` 
VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
INSERT INTO `order_detail` 
VALUES (1330, 477, 9, '榮耀10 GT遊戲加速 AIS手持夜景 6GB+64GB 幻影藍全網通 移動聯通電信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
INSERT INTO `order_detail`
VALUES (1331, 478, 4, '小米Play 流光漸變AI雙攝 4GB+64GB 夢幻藍 全網通4G 雙卡雙待 小水滴全面屏拍照遊戲智能手機', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');
           

Flink SQL Cli建立CDC資料源

啟動 Flink 叢集,再啟動 SQL CLI,執行下面指令:

-- 建立訂單資訊表
CREATE TABLE order_info(
    id BIGINT,
    user_id BIGINT,
    create_time TIMESTAMP(0),
    operate_time TIMESTAMP(0),
    province_id INT,
    order_status STRING,
    total_amount DECIMAL(10, 5)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_info'
);
           

在Flink SQL Cli中查詢該表的資料:result-mode: tableau,+表示資料的insert

Flink1.11中的CDC Connectors操作實踐

在SQL CLI中建立訂單詳情表:

CREATE TABLE order_detail(
    id BIGINT,
    order_id BIGINT,
    sku_id BIGINT,
    sku_name STRING,
    sku_num BIGINT,
    order_price DECIMAL(10, 5),
 create_time TIMESTAMP(0)
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_detail'
);
           

查詢結果如下:

Flink1.11中的CDC Connectors操作實踐

執行JOIN操作:

SELECT
    od.id,
    oi.id order_id,
    oi.user_id,
    oi.province_id,
    od.sku_id,
    od.sku_name,
    od.sku_num,
    od.order_price,
    oi.create_time,
    oi.operate_time
FROM
   (
    SELECT * 
    FROM order_info
    WHERE 
      order_status = '2'-- 已支付
   ) oi
   JOIN
  (
    SELECT *
    FROM order_detail
  ) od 
  ON oi.id = od.order_id;
           

關于cannal的使用方式,可以參考我的另一篇文章:基于Canal與Flink實作資料實時增量同步(一)。我已經将下面的表通過canal同步到了kafka,具體格式為:

{
    "data":[
        {
            "id":"1",
            "region_name":"華北"
        },
        {
            "id":"2",
            "region_name":"華東"
        },
        {
            "id":"3",
            "region_name":"東北"
        },
        {
            "id":"4",
            "region_name":"華中"
        },
        {
            "id":"5",
            "region_name":"華南"
        },
        {
            "id":"6",
            "region_name":"西南"
        },
        {
            "id":"7",
            "region_name":"西北"
        }
    ],
    "database":"mydw",
    "es":1597128441000,
    "id":102,
    "isDdl":false,
    "mysqlType":{
        "id":"varchar(20)",
        "region_name":"varchar(20)"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "id":12,
        "region_name":12
    },
    "table":"base_region",
    "ts":1597128441424,
    "type":"INSERT"
}
           

在SQL CLI中建立該canal-json格式的表:

CREATE TABLE region (
  id BIGINT,
  region_name STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
);
           
Flink1.11中的CDC Connectors操作實踐

建立MySQL資料源

參見上面的order_info

Flink SQL Cli建立changelog-json表

CREATE TABLE order_gmv2kafka (
  day_str STRING,
  gmv DECIMAL(10, 5)
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_gmv_kafka',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

INSERT INTO order_gmv2kafka
SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv
FROM order_info
WHERE order_status = '2' -- 訂單已支付
GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd'); 
           

查詢表看一下結果:

Flink1.11中的CDC Connectors操作實踐

再查一下kafka的資料:

{"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}
           

當将另外兩個訂單的狀态order_status更新為2時,

總金額=443+772+88=1293

再觀察資料:

Flink1.11中的CDC Connectors操作實踐

總結

繼續閱讀