Background
How Flink Table/SQL interprets changelog semantics:

For example, in canal-json, CanalJsonSerializationSchema#rowKind2String:
private StringData rowKind2String(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
        case UPDATE_AFTER:
            return OP_INSERT;
        case UPDATE_BEFORE:
        case DELETE:
            return OP_DELETE;
        default:
            throw new UnsupportedOperationException(
                    "Unsupported operation '" + rowKind + "' for row kind.");
    }
}
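Note that OP_INSERT and OP_DELETE are the only operation types this serializer ever writes, so the reading side can only map them back to plain insert/delete row kinds. The sketch below is a conceptual illustration of that inverse mapping (my own toy code, not the actual CanalJsonDeserializationSchema):
// Toy sketch of the inverse mapping (not the real deserializer): once an update
// has been serialized as a DELETE + INSERT pair, the reader cannot tell
// UPDATE_BEFORE/UPDATE_AFTER apart from an ordinary DELETE/INSERT anymore.
private static RowKind opTypeToRowKind(String opType) {
    switch (opType) {
        case "INSERT":
            return RowKind.INSERT;
        case "DELETE":
            return RowKind.DELETE;
        default:
            throw new UnsupportedOperationException("Unsupported op '" + opType + "'.");
    }
}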
When building a real-time data warehouse, we usually adopt a layered ODS -> DWD -> DWS architecture, as shown in the figure below.
As a result, once the changelog ingested at the ODS layer is sent downstream, the Update semantics get decomposed: first a Delete (D-) of the old row, then an Insert (I+) of the updated row, written to kafka + canal-json. The question is how a downstream system should persist this pair of records that together represent one update.
The puzzle: take MySQL as an example. By this reasoning there would be a gap on the database side, between finishing the Delete and executing the Insert. In the extreme case, a query for that row arriving inside this window would find nothing. That behavior would clearly be wrong.
The essence of the problem: an Update is really composed of two basic operations:
- first locate where the old row is stored (compare the old value);
- then overwrite the old row with the new one (swap in the new value).
This requires the storage medium to perform the Update atomically. Such atomicity does not come for free, and many systems pay an extra price for it. In Java, for example, CAS (Compare And Swap) via Unsafe#compareAndSwap can atomically modify a value on the Java heap, as sketched below.
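As a minimal illustration of the CAS idea (my own example, unrelated to the Flink code; it uses the JDK's AtomicLong, which is built on the same compare-and-swap primitive):
import java.util.concurrent.atomic.AtomicLong;

// Minimal CAS demo: atomically replace an old amount with a new one.
// compareAndSet succeeds only if the current value still equals the expected
// old value, so the "compare" and the "swap" happen as a single atomic step.
public class CasDemo {
    public static void main(String[] args) {
        AtomicLong amount = new AtomicLong(1L);

        boolean updated = amount.compareAndSet(1L, 10000L);        // old = 1 -> new = 10000
        System.out.println(updated + ", amount=" + amount.get());  // true, amount=10000

        boolean stale = amount.compareAndSet(1L, 42L);             // expected value is stale
        System.out.println(stale + ", amount=" + amount.get());    // false, amount=10000
    }
}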
Verifying the Hypothesis
- Test idea:
Job 1: source1 -> kafka + ogg-json, sink1 -> kafka + canal-json
Job 2: source2 -> kafka + canal-json, sink2 -> mysql
source1 receives an Update record, which Flink decomposes into two records, D- and I+, and emits to sink1; source2 consumes sink1's D- and I+ and sinks them into MySQL (the statement that wires each source to its sink is sketched after the DDLs below). We then enable MySQL's general query log and check which SQL primitives MySQL actually executes for the statements Flink issues.
- Test procedure
source1 ddl:
CREATE TABLE source (
appl_seq STRING,
op_ts TIMESTAMP(3),
state STRING,
amount BIGINT,
PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'format-oggcanal-source-01',
'properties.bootstrap.servers' = '...',
'properties.group.id' = 'test-01',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'ogg-json'
)
sink1 ddl:
CREATE TABLE sink (
appl_seq STRING,
op_ts TIMESTAMP(3),
state STRING,
amount BIGINT,
PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'format-oggcanal-sink-01',
'properties.bootstrap.servers' = '...',
'value.format' = 'canal-json'
)
source2 ddl:
CREATE TABLE source (
appl_seq STRING,
op_ts TIMESTAMP(3),
state STRING,
amount BIGINT,
PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'format-oggcanal-sink-01',
'properties.bootstrap.servers' = '...',
'properties.group.id' = 'test-01',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'canal-json'
)
sink2 ddl:
CREATE TABLE sink (
appl_seq STRING,
op_ts TIMESTAMP(3),
state STRING,
amount BIGINT,
PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = '...',
'table-name' = 'test_canal',
'username' = '...',
'password' = '...'
)
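For completeness, a minimal sketch of how each job wires its source table to its sink (an assumption on my part, not shown in the original setup: the DDLs above are registered in the same TableEnvironment, and the class name ForwardJob is made up for illustration):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Illustrative wiring for Job 1 / Job 2 (not from the original post): each job
// simply forwards every changelog row from its source table to its sink table.
public class ForwardJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Execute the CREATE TABLE statements shown above first, e.g.
        // tEnv.executeSql(sourceDdl) and tEnv.executeSql(sinkDdl), with the
        // '...' placeholders replaced by real connection settings.

        // The pipeline itself is a plain INSERT ... SELECT; Flink propagates the
        // +I/-U/+U/-D row kinds and the sink format/connector decides how to encode them.
        tEnv.executeSql("INSERT INTO sink SELECT * FROM source");
    }
}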
source1 test data (INSERT, UPDATE, DELETE):
{
"after": {
"appl_seq": "1",
"op_ts": "2021-01-01 00:00:00.000000""state": "01",
"amount": 1
},
"before": {
},
"current_ts": "2021-01-01 00:00:00.000000
"op_ts": "2021-01-0100: 00: 00.000000",
"op_type": "I",
"pos": "1",
"primary_keys": [
"appl_seq"
],
"table": "ODS.APPL"
}
{
"after": {
"appl_seq": "1",
"op_ts": "2021-01-01 00:00:00.000000",
"state": "01",
"amount": 10000
},
"before": {
"appl_seq": "1",
"op_ts": "2021-01-01 00:00:00.000000",
"state": "01",
"amount": 1
},
"current_ts": "2021-01-01 00:00:00.000000",
"op_ts": "2021-01-01 00:00:00.000000",
"op_type": "U",
"pos": "1",
"primary_keys": ["appl_seq"],
"table": "ODS.APPL"
}
{
"after": {
},
"before": {
"appl_seq": "1",
"op_ts": "2021-01-01 00:00:00.000000",
"state": "01",
"amount": 10000
},
"current_ts": "2021-01-01 00:00:00.000000",
"op_ts": "2021-01-01 00:00:00.000000",
"op_type": "D",
"pos": "1",
"primary_keys": ["appl_seq"],
"table": "ODS.APPL"
}
The canal-json records written to sink1 (and consumed by source2):
INSERT:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":1}],"type":"INSERT"}
UPDATE:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":1}],"type":"DELETE"}
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":10000}],"type":"INSERT"}
DELETE:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":10000}],"type":"DELETE"}
How to enable the MySQL general query log:
-- check whether the general log is enabled, and where it is written
SHOW VARIABLES LIKE "general_log%";
-- enable the general log
SET GLOBAL general_log = 'ON';
-- set the log file path
SET GLOBAL general_log_file = '/data/apps/job-flink/mysql_general_log.log';
Searching mysql_general_log.log for the relevant keywords to locate the SQL statements, we get the following:
-- I+ insert:
INSERT INTO `test_canal`(`appl_seq`, `op_ts`, `state`, `amount`) VALUES ('1', '2021-01-01 00:00:00', '01', 1) ON DUPLICATE KEY UPDATE
`appl_seq`=VALUES(`appl_seq`), `op_ts`=VALUES(`op_ts`), `state`=VALUES(`state`), `amount`=VALUES(`amount`)
-- U update:
INSERT INTO `test_canal`(`appl_seq`, `op_ts`, `state`, `amount`) VALUES ('1', '2021-01-01 00:00:00', '01', 10000) ON DUPLICATE KEY UPDATE
`appl_seq`=VALUES(`appl_seq`), `op_ts`=VALUES(`op_ts`), `state`=VALUES(`state`), `amount`=VALUES(`amount`)
-- D- delete:
DELETE FROM `test_canal` WHERE `appl_seq` = '1'
Here we find that Flink completes the upsert with ON DUPLICATE KEY UPDATE, rather than, as we had guessed, executing the two canal-json records produced by decomposing the Update: D- -> I+!
So the question becomes: how does Flink manage to reverse a transformation we assumed to be lossy (an entropy increase, in information-theory terms), i.e., recover D-, I+ -> Update from Update -> D-, I+?
The next step is to dig into the source code and try to resolve this puzzle.
Source Code Analysis
The jdbc-connector source package:
When Flink wraps this logic into JDBC SQL execution it inevitably has to create statement instances; in the statement package we find only three simple classes:
Trying to find out who uses these classes leads us to the clue closest to where the SQL logic is assembled:
That's right: JdbcBatchingOutputFormat.java. Three methods in this class stand out:
open:
/**
 * Connects to the target database and initializes the prepared statement.
 *
 * @param taskNumber The number of the parallel instance.
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    super.open(taskNumber, numTasks);
    jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
    if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
        this.scheduler =
                Executors.newScheduledThreadPool(
                        1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
        this.scheduledFuture =
                this.scheduler.scheduleWithFixedDelay(
                        () -> {
                            synchronized (JdbcBatchingOutputFormat.this) {
                                if (!closed) {
                                    try {
                                        flush();
                                    } catch (Exception e) {
                                        flushException = e;
                                    }
                                }
                            }
                        },
                        executionOptions.getBatchIntervalMs(),
                        executionOptions.getBatchIntervalMs(),
                        TimeUnit.MILLISECONDS);
    }
}
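Inside the same class, flush() ends up calling attemptFlush, which (as the analysis below notes) is essentially a one-line delegation; paraphrased here rather than quoted verbatim:
// Paraphrased sketch: attemptFlush hands the buffered records over to the
// configured JdbcBatchStatementExecutor.
protected void attemptFlush() throws SQLException {
    jdbcStatementExecutor.executeBatch();
}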
Analysis: a scheduler thread pool periodically triggers flush(); the pool contains a single thread, which guarantees ordering. As sketched above, flush() mainly calls attemptFlush, and attemptFlush is just a single line that calls JdbcBatchStatementExecutor#executeBatch. To find out which JdbcBatchStatementExecutor implementation's executeBatch is actually invoked, you can either debug with breakpoints or inspect all the implementing subclasses. The Javadoc of the TableBufferReducedStatementExecutor implementation reads:
/**
* Currently, this statement executor is only used for table/sql to buffer insert/update/delete
* events, and reduce them in buffer before submit to external database.
*/
Clearly, our job is invoking this class's executeBatch.
Its executeBatch:
@Override
public void executeBatch() throws SQLException {
    for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
        if (entry.getValue().f0) {
            upsertExecutor.addToBatch(entry.getValue().f1);
        } else {
            // delete by key
            deleteExecutor.addToBatch(entry.getKey());
        }
    }
    upsertExecutor.executeBatch();
    deleteExecutor.executeBatch();
    reduceBuffer.clear();
}
Analysis: reduceBuffer buffers the sink records that arrive between two scheduled flushes of the thread pool, and it is a Map:
// the mapping is [KEY, <+/-, VALUE>]
private final Map<RowData, Tuple2<Boolean, RowData>> reduceBuffer = new HashMap<>();
The buffer is reduced (merged) by primary KEY, so the D- entry is overwritten by the subsequent I+ entry, and in the end only the I+ is executed, via ON DUPLICATE KEY UPDATE. This explains how the downstream Flink job can effectively undo the seemingly irreversible decomposition Update -> D-, I+, and it also avoids the atomicity problem. A toy illustration of this last-write-wins reduction follows.
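To make the reduction concrete, here is a small self-contained sketch of the last-write-wins behavior (my own code, not the Flink source; the key and amounts follow the test data above):
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

// Toy illustration (not the Flink source) of the reduce-by-primary-key idea:
// the Boolean flag means "true = upsert this row, false = delete by key",
// mirroring the [KEY, <+/-, VALUE>] mapping described above.
public class ReduceBufferDemo {
    public static void main(String[] args) {
        Map<String, SimpleEntry<Boolean, Long>> reduceBuffer = new HashMap<>();

        // The decomposed update arrives as D- then I+ for the same key "1":
        reduceBuffer.put("1", new SimpleEntry<>(false, 1L));     // D- (old amount = 1)
        reduceBuffer.put("1", new SimpleEntry<>(true, 10000L));  // I+ (new amount = 10000)

        // By flush time only the last entry per key survives, so only an upsert is issued.
        reduceBuffer.forEach((key, flagAndValue) ->
                System.out.println(flagAndValue.getKey()
                        ? "UPSERT key=" + key + ", amount=" + flagAndValue.getValue()
                        : "DELETE key=" + key));
        // prints: UPSERT key=1, amount=10000
    }
}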