天天看點

深入解讀flink sql cdc的使用以及源碼分析

  • 前言
  • flink消費cdc資料
    • canal format
    • debezium format
    • CanalJson反序列化源碼解析
  • flink cdc connector
    • 背景
    • mysql-cdc
    • mysql-cdc connector源碼解析
  • changelog format
    • 使用場景
    • 示例
    • 源碼淺析

CDC,Change Data Capture,變更資料擷取的簡稱,使用CDC我們可以從資料庫中擷取已送出的更改并将這些更改發送到下遊,供下遊使用。這些變更可以包括INSERT,DELETE,UPDATE等.

使用者可以在如下的場景使用cdc:

  • 實時資料同步:比如我們将mysql庫中的資料同步到我們的數倉中。
  • 資料庫的實時物化視圖。

在以前的資料同步中,比如我們想實時擷取資料庫的資料,一般采用的架構就是采用第三方工具,比如canal、debezium等,實時采集資料庫的變更日志,然後将資料發送到kafka等消息隊列。然後再通過其他的元件,比如flink、spark等等來消費kafka的資料,計算之後發送到下遊系統。整體的架構如下所示:

深入解讀flink sql cdc的使用以及源碼分析

對于上面的這種架構,flink承擔的角色是計算層,目前flink提供的format有兩種格式:canal-json和debezium-json,下面我們簡單的介紹下。

在國内,用的比較多的是阿裡巴巴開源的canal,我們可以使用canal訂閱mysql的binlog日志,canal會将mysql庫的變更資料組織成它固定的JSON或protobuf 格式發到kafka,以供下遊使用。

canal解析後的json資料格式如下:

{
"data": [
    {
"id": "111",
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": "5.18"
    }
  ],
"database": "inventory",
"es": 1589373560000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "INTEGER",
"name": "VARCHAR(255)",
"description": "VARCHAR(512)",
"weight": "FLOAT"
  },
"old": [
    {
"weight": "5.15"
    }
  ],
"pkNames": [
"id"
  ],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"description": 12,
"weight": 7
  },
"table": "products",
"ts": 1589373560798,
"type": "UPDATE"
}      

簡單講下幾個核心的字段:

  • type : 描述操作的類型,包括‘UPDATE’, 'INSERT', 'DELETE'。
  • data : 代表操作的資料。如果為'INSERT',則表示行的内容;如果為'UPDATE',則表示行的更新後的狀态;如果為'DELETE',則表示删除前的狀态。
  • old :可選字段,如果存在,則表示更新之前的内容,如果不是update操作,則為 null。

完整的語義如下;

private String                    destination;                            // 對應canal的執行個體或者MQ的topic
private String                    groupId;                                // 對應mq的group id
private String                    database;                               // 資料庫或schema
private String                    table;                                  // 表名
private List<String>              pkNames;
private Boolean                   isDdl;
private String                    type;                                   // 類型: INSERT UPDATE DELETE
// binlog executeTime
private Long                      es;                                     // 執行耗時
// dml build timeStamp
private Long                      ts;                                     // 同步時間
private String                    sql;                                    // 執行的sql, dml sql為空
private List<Map<String, Object>> data;                                   // 資料清單
private List<Map<String, Object>> old;                                    // 舊資料清單, 用于update, size和data的size一一對應      

在flink sql中,消費這個資料的sql如下:

CREATE TABLE topic_products (
id BIGINT,
name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json'  -- using canal-json as the format
)

      

其中DDL中的表的字段和類型要和mysql中的字段及類型能比對的上,接下來我們就可以寫flink sql來查詢我們定義的topic_products了。

在國外,比較有名的類似canal的開源工具有debezium,它的功能較canal更加強大一些,不僅僅支援mysql。還支援其他的資料庫的同步,比如 PostgreSQL、Oracle等,目前debezium支援的序列化格式為 JSON 和 Apache Avro 。

debezium提供的格式如下:

{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
 },
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
 },
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}      

同樣,使用flink sql來消費的時候,sql和上面使用canal類似,隻需要把foramt改成debezium-json即可。

接下來我們看下flink的源碼中canal-json格式的實作。canal 格式作為一種flink的格式,而且是source,是以也就是涉及到讀取資料的時候進行反序列化,我們接下來就簡單看看CanalJson的反序列化的實作。具體的實作類是CanalJsonDeserializationSchema。

我們看下這個最核心的反序列化方法:

@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
		try {
		    //使用json反序列化器将message反序列化成RowData
			RowData row = jsonDeserializer.deserialize(message);

			//擷取type字段,用于下面的判斷
			String type = row.getString(2).toString();
if (OP_INSERT.equals(type)) {
				// 如果操作類型是insert,則data數組表示的是要插入的資料,則循環周遊data,然後添加一個辨別INSERT,構造RowData對象,發送下遊。
				ArrayData data = row.getArray(0);
for (int i = 0; i < data.size(); i++) {
					RowData insert = data.getRow(i, fieldCount);
insert.setRowKind(RowKind.INSERT);
out.collect(insert);
				}
			} else if (OP_UPDATE.equals(type)) {
				// 如果是update操作,從data字段裡擷取更新後的資料、
				ArrayData data = row.getArray(0);
				// old字段擷取更新之前的資料
				ArrayData old = row.getArray(1);
for (int i = 0; i < data.size(); i++) {
					// the underlying JSON deserialization schema always produce GenericRowData.
					GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
					GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
for (int f = 0; f < fieldCount; f++) {
if (before.isNullAt(f)) {
							//如果old字段非空,則說明進行了資料的更新,如果old字段是null,則說明更新前後資料一樣,這個時候把before的資料也設定成after的,也就是發送給下遊的before和after資料一樣。
before.setField(f, after.getField(f));
						}
					}
before.setRowKind(RowKind.UPDATE_BEFORE);
after.setRowKind(RowKind.UPDATE_AFTER);
					//把更新前後的資料都發送下遊
out.collect(before);
out.collect(after);
				}
			} else if (OP_DELETE.equals(type)) {
				// 如果是删除操作,data字段裡包含将要被删除的資料,把這些資料組織起來發送給下遊
				ArrayData data = row.getArray(0);
for (int i = 0; i < data.size(); i++) {
					RowData insert = data.getRow(i, fieldCount);
insert.setRowKind(RowKind.DELETE);
out.collect(insert);
				}
			} else {
if (!ignoreParseErrors) {
					throw new IOException(format(
						"Unknown \"type\" value \"%s\". The Canal JSON message is '%s'", type, new String(message)));
				}
			}
		} catch (Throwable t) {
			// a big try catch to protect the processing.
if (!ignoreParseErrors) {
				throw new IOException(format(
					"Corrupt Canal JSON message '%s'.", new String(message)), t);
			}
		}
	}

      

對于上面的架構,我們需要部署canal(debezium)+ kafka,然後flink再從kafka消費資料,這種架構下我們需要部署多個元件,并且資料也需要落地到kafka,有沒有更好的方案來精簡下這個流程呢?我們接下來講講flink提供的cdc connector。

這個connector并沒有包含在flink的代碼裡,具體的位址是在https://github.com/ververica/flink-cdc-connectors裡,詳情大家可以看下這裡面的内容。

這種架構下,flink直接消費資料庫的增量日志,替代了原來作為資料采集層的canal(debezium),然後直接進行計算,經過計算之後,将計算結果 發送到下遊。整體架構如下:

深入解讀flink sql cdc的使用以及源碼分析

使用這種架構是好處有:

  • 減少canal和kafka的維護成本,鍊路更短,延遲更低
  • flink提供了exactly once語義
  • 可以從指定position讀取
  • 去掉了kafka,減少了消息的存儲成本

目前flink支援兩種内置的connector,PostgreSQL和mysql,接下來我們以mysql為例簡單講講。

在使用之前,我們需要引入相應的pom,mysql的pom如下:

<dependency>
<groupId>com.alibaba.ververica</groupId>
<!-- add the dependency matching your database -->
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.1.0</version>
</dependency>      

如果是sql用戶端使用,需要下載下傳 flink-sql-connector-mysql-cdc-1.1.0.jar 并且放到<FLINK_HOME>/lib/下面

連接配接mysql資料庫的示例sql如下:

CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
 description STRING,
 weight DECIMAL(10,3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'inventory',
'table-name' = 'products'
)      

如果訂閱的是postgres資料庫,我們需要把connector替換成postgres-cdc,DDL中表的schema和資料庫一一對應。

更加詳細的配置參見:

https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector

接下來我們以mysql-cdc為例,看看源碼層級是怎麼實作的。既然作為一個sql的connector,那麼就首先會有一個對應的TableFactory,然後在工廠類裡面構造相應的source,最後将消費下來的資料轉成flink認識的RowData格式,發送到下遊。

我們按照這個思路來看看flink cdc源碼的實作。

在flink-connector-mysql-cdc module中,找到其對應的工廠類:MySQLTableSourceFactory,進入createDynamicTableSource(Context context)方法,在這個方法裡,使用從ddl中的屬性裡擷取的host、dbname等資訊構造了一個MySQLTableSource類。

MySQLTableSource

在MySQLTableSource#getScanRuntimeProvider方法裡,我們看到,首先構造了一個用于序列化的對象RowDataDebeziumDeserializeSchema,這個對象主要是用于将Debezium擷取的SourceRecord格式的資料轉化為flink認識的RowData對象。 我們看下RowDataDebeziumDeserializeSchem#deserialize方法,這裡的操作主要就是先判斷下進來的資料類型(insert 、update、delete),然後針對不同的類型(short、int等)分别進行轉換,

最後我們看到用于flink用于擷取資料庫變更日志的Source函數是DebeziumSourceFunction,且最終傳回的類型是RowData。

也就是說flink底層是采用了Debezium工具從mysql、postgres等資料庫中擷取的變更資料。

@SuppressWarnings("unchecked")
	@Override
	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
		RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
		TypeInformation<RowData> typeInfo = (TypeInformation<RowData>) scanContext.createTypeInformation(physicalSchema.toRowDataType());
		DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema(
			rowType,
			typeInfo,
((rowData, rowKind) -> {}),
serverTimeZone);
MySQLSource.Builder<RowData> builder = MySQLSource.<RowData>builder()
			.hostname(hostname)
			..........
DebeziumSourceFunction<RowData> sourceFunction = builder.build();

return SourceFunctionProvider.of(sourceFunction, false);
	}      

DebeziumSourceFunction

我們接下來看看DebeziumSourceFunction類

@PublicEvolving
public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
CheckpointedFunction,
ResultTypeQueryable<T> {
		    .............
		}      

我們看到DebeziumSourceFunction類繼承了RichSourceFunction,并且實作了CheckpointedFunction接口,也就是說這個類是flink的一個SourceFunction,會從源端(run方法)擷取資料,發送給下遊。此外這個類還實作了CheckpointedFunction接口,也就是會通過checkpoint的機制來保證exactly once語義。

接下來我們進入run方法,看看是如何擷取資料庫的變更資料的。

@Override
public void run(SourceContext<T> sourceContext) throws Exception {
...........................
// DO NOT include schema change, e.g. DDL
		properties.setProperty("include.schema.changes", "false");
...........................
//将所有的屬性資訊列印出來,以便排查。
// dump the properties
String propsString = properties.entrySet().stream()
			.map(t -> "\t" + t.getKey().toString() + " = " + t.getValue().toString() + "\n")
			.collect(Collectors.joining());
LOG.info("Debezium Properties:\n{}", propsString);

//用于具體的處理資料的邏輯
		this.debeziumConsumer = new DebeziumChangeConsumer<>(
			sourceContext,
			deserializer,
			restoredOffsetState == null, // DB snapshot phase if restore state is null
			this::reportError);

// create the engine with this configuration ...
		this.engine = DebeziumEngine.create(Connect.class)
			.using(properties)
			.notifying(debeziumConsumer)  // 資料發給上面的debeziumConsumer
			.using((success, message, error) -> {
if (!success && error != null) {
					this.reportError(error);
				}
			})
			.build();

if (!running) {
return;
		}

// run the engine asynchronously
		executor.execute(engine);

//循環判斷,當程式被打斷,或者有錯誤的時候,打斷engine,并且抛出異常
// on a clean exit, wait for the runner thread
		try {
while (running) {
if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
					break;
				}
if (error != null) {
					running = false;
					shutdownEngine();
// rethrow the error from Debezium consumer
					ExceptionUtils.rethrow(error);
				}
			}
		}
		catch (InterruptedException e) {
// may be the result of a wake-up interruption after an exception.
// we ignore this here and only restore the interruption state
Thread.currentThread().interrupt();
		}
	}

      

在函數的開始,設定了很多的properties,比如include.schema.changes 設定為false,也就是不包含表的DDL操作,表結構的變更是不捕獲的。我們這裡隻關注資料的增删改。

接下來構造了一個DebeziumChangeConsumer對象,這個類實作了DebeziumEngine.ChangeConsumer接口,主要就是将擷取到的一批資料進行一條條的加工處理。

接下來定一個DebeziumEngine對象,這個對象是真正用來幹活的,它的底層使用了kafka的connect-api來進行擷取資料,得到的是一個org.apache.kafka.connect.source.SourceRecord對象。通過notifying方法将得到的資料交給上面定義的DebeziumChangeConsumer來來覆寫預設實作以進行複雜的操作。

接下來通過一個線程池ExecutorService來異步的啟動這個engine。

最後,做了一個循環判斷,當程式被打斷,或者有錯誤的時候,打斷engine,并且抛出異常。

總結一下,就是在Flink的source函數裡,使用Debezium 引擎擷取對應的資料庫變更資料(SourceRecord),經過一系列的反序列化操作,最終轉成了flink中的RowData對象,發送給下遊。

當我們從mysql-cdc擷取資料庫的變更資料,或者寫了一個group by的查詢的時候,這種結果資料都是不斷變化的,我們如何将這些變化的資料發到隻支援append mode的kafka隊列呢?

于是flink提供了一種changelog format,其實我們非常簡單的了解為,flink對進來的RowData資料進行了一層包裝,然後加了一個資料的操作類型,包括以下幾種 INSERT,DELETE, UPDATE_BEFORE,UPDATE_AFTER。這樣當下遊擷取到這個資料的時候,就可以根據資料的類型來判斷下如何對資料進行操作了。

比如我們的原始資料格式是這樣的。

{"day":"2020-06-18","gmv":100}

      

經過changelog格式的加工之後,成為了下面的格式:

{"data":{"day":"2020-06-18","gmv":100},"op":"+I"}

      

也就是說changelog format對原生的格式進行了包裝,添加了一個op字段,表示資料的操作類型,目前有以下幾種:

  • +I:插入操作。
  • -U :更新之前的資料内容:
  • +U :更新之後的資料内容。
  • -D :删除操作。

使用的時候需要引入相應的pom

<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-format-changelog-json</artifactId>
<version>1.1.0</version>
</dependency>      

使用flink sql操作的方式如下:

CREATE TABLE kafka_gmv (
  day_str STRING,
  gmv DECIMAL(10, 5)
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_gmv',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'changelog-json'
);      

我們定義了一個 format 為 changelog-json 的kafka connector,之後我們就可以對其進行寫入和查詢了。

完整的代碼和配置請參考:

https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format

作為一種flink的format ,我們主要看下其序列化和發序列化方法,changelog-json 使用了flink-json包進行json的處理。

反序列化

反序列化用的是ChangelogJsonDeserializationSchema類,在其構造方法裡,我們看到主要是構造了一個json的序列化器jsonDeserializer用于對資料進行處理。

public ChangelogJsonDeserializationSchema(
			RowType rowType,
			TypeInformation<RowData> resultTypeInfo,
boolean ignoreParseErrors,
			TimestampFormat timestampFormatOption) {
this.resultTypeInfo = resultTypeInfo;
this.ignoreParseErrors = ignoreParseErrors;
this.jsonDeserializer = new JsonRowDataDeserializationSchema(
			createJsonRowType(fromLogicalToDataType(rowType)),
// the result type is never used, so it's fine to pass in Debezium's result type
			resultTypeInfo,
false, // ignoreParseErrors already contains the functionality of failOnMissingField
			ignoreParseErrors,
			timestampFormatOption);
	}      

其中createJsonRowType方法指定了changelog的format是一種Row類型的格式,我們看下代碼:

private static RowType createJsonRowType(DataType databaseSchema) {
		DataType payload = DataTypes.ROW(
			DataTypes.FIELD("data", databaseSchema),
			DataTypes.FIELD("op", DataTypes.STRING()));
return (RowType) payload.getLogicalType();
	}      

在這裡,指定了這個row格式有兩個字段,一個是data,表示資料的内容,一個是op,表示操作的類型。

最後看下最核心的ChangelogJsonDeserializationSchema#deserialize(byte[] bytes, Collector<RowData> out>)

@Override
public void deserialize(byte[] bytes, Collector<RowData> out) throws IOException {
try {
			GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(bytes);
			GenericRowData data = (GenericRowData) row.getField(0);
			String op = row.getString(1).toString();
			RowKind rowKind = parseRowKind(op);
			data.setRowKind(rowKind);
			out.collect(data);
		} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(format(
"Corrupt Debezium JSON message '%s'.", new String(bytes)), t);
			}
		}
	}      

序列化

@Override
	public byte[] serialize(RowData rowData) {
reuse.setField(0, rowData);
reuse.setField(1, stringifyRowKind(rowData.getRowKind()));
return jsonSerializer.serialize(reuse);
	}