
[Flink] Part 4: [Myth] Does decomposing update semantics into D- / I+ cause the update to lose atomicity?

Background

Flink Table / SQL expresses changelog semantics through the RowKind attached to every row: INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE.


For example, in canal-json, CanalJsonSerializationSchema#rowKind2String maps each RowKind to a canal operation type:

private StringData rowKind2String(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
        case UPDATE_AFTER:
            return OP_INSERT;
        case UPDATE_BEFORE:
        case DELETE:
            return OP_DELETE;
        default:
            throw new UnsupportedOperationException(
                    "Unsupported operation '" + rowKind + "' for row kind.");
    }
}           

When building a real-time data warehouse, we usually adopt an ODS -> DWD -> DWS layered architecture.


As a result, once the changelog ingested at the ODS layer is sent downstream, the update (U) semantics are decomposed into a Delete (D-) of the old row followed by an Insert (I+) of the new row, emitted to Kafka in canal-json. The question is what happens when a downstream job has to persist this pair of update records into an external store.

The myth: take MySQL as an example. By the reasoning above, there would be a window on the database side between the moment the Delete completes and the moment the Insert executes. In the extreme case, a query for this row arriving inside that window would find nothing. Clearly, such behavior would be wrong.

The essence of the problem: an Update semantically consists of two elementary operations:

  • first locate where the old row is stored (compare the old value);
  • then overwrite the old row with the new one (swap in the new value).

This requires the storage medium to perform the Update atomically. Such update atomicity does not come for free, and many systems pay an extra price for it. In Java, for example, CAS (Compare And Swap) via Unsafe#compareAndSwap can atomically modify a value on the Java heap.
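A minimal, self-contained illustration of the compare-and-swap idea, using java.util.concurrent.atomic.AtomicLong (which is built on the same CAS primitive) rather than calling Unsafe directly:

import java.util.concurrent.atomic.AtomicLong;

public class CasDemo {
    public static void main(String[] args) {
        AtomicLong amount = new AtomicLong(1L);

        // An atomic "update": compare the expected old value and swap in the new one in a single step,
        // so no reader can ever observe a gap where the value is missing or half-written.
        boolean first = amount.compareAndSet(1L, 10000L);  // succeeds: current value is 1
        boolean second = amount.compareAndSet(1L, 42L);    // fails: current value is already 10000

        System.out.println(first + ", " + second + ", " + amount.get()); // true, false, 10000
    }
}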

Verifying the Hypothesis

  1. Verification approach:

Program 1: source1 -> kafka + ogg-json, sink1 -> kafka + canal-json

Program 2: source2 -> kafka + canal-json, sink2 -> MySQL

source1 receives one Update record, which Flink decomposes into a D- and an I+ and emits to sink1; source2 consumes the D- / I+ from sink1 and sinks them to MySQL. We then enable MySQL's general query log and analyze which SQL primitives Flink actually sent to MySQL. The DDLs for the four tables follow, with a rough sketch of how the jobs can be wired after them.

  2. Test procedure:

source1 ddl:

CREATE TABLE source (
    appl_seq STRING,
    op_ts TIMESTAMP(3),
    state STRING,
    amount BIGINT,
    PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'format-oggcanal-source-01',
    'properties.bootstrap.servers' = '...',
    'properties.group.id' = 'test-01',
    'scan.startup.mode' = 'latest-offset',
    'value.format' = 'ogg-json'
)           

sink1 ddl:

CREATE TABLE sink (
    appl_seq STRING,
    op_ts TIMESTAMP(3),
    state STRING,
    amount BIGINT,
    PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'format-oggcanal-sink-01',
    'properties.bootstrap.servers' = '...',
    'value.format' = 'canal-json'
)           

source2 ddl:

CREATE TABLE source (
    appl_seq STRING,
    op_ts TIMESTAMP(3),
    state STRING,
    amount BIGINT,
    PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'format-oggcanal-sink-01',
    'properties.bootstrap.servers' = '...',
    'properties.group.id' = 'test-01',
    'scan.startup.mode' = 'latest-offset',
    'value.format' = 'canal-json'
)           

sink2 ddl:

CREATE TABLE sink (
    appl_seq STRING,
    op_ts TIMESTAMP(3),
    state STRING,
    amount BIGINT,
    PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = '...',
    'table-name' = 'test_canal',
    'username' = '...',
    'password' = '...'
)           
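The article does not show how the two jobs are submitted. As a rough, hypothetical sketch (the class name Program2 is made up, the DDL strings are abbreviated with '...', and Flink's Table API executeSql is assumed), program 2 could be wired like this; program 1 is analogous, reading ogg-json and writing canal-json:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class Program2 {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the canal-json source and the JDBC sink (full DDL as shown above).
        tEnv.executeSql("CREATE TABLE source ( ... ) WITH ( 'connector' = 'kafka', 'value.format' = 'canal-json', ... )");
        tEnv.executeSql("CREATE TABLE sink ( ... ) WITH ( 'connector' = 'jdbc', ... )");

        // The whole job is a single INSERT; Flink handles the D- / I+ records internally.
        tEnv.executeSql("INSERT INTO sink SELECT * FROM source");
    }
}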

source1 test data (one INSERT, one UPDATE, one DELETE, in ogg-json format):

{
  "after": {
    "appl_seq": "1",
    "op_ts": "2021-01-01 00:00:00.000000",
    "state": "01",
    "amount": 1
  },
  "before": {
  },
  "current_ts": "2021-01-01 00:00:00.000000",
  "op_ts": "2021-01-01 00:00:00.000000",
  "op_type": "I",
  "pos": "1",
  "primary_keys": ["appl_seq"],
  "table": "ODS.APPL"
}

{
  "after": {
    "appl_seq": "1",
    "op_ts": "2021-01-01 00:00:00.000000",
    "state": "01",
    "amount": 10000
  },
  "before": {
    "appl_seq": "1",
    "op_ts": "2021-01-01 00:00:00.000000",
    "state": "01",
    "amount": 1
  },
  "current_ts": "2021-01-01 00:00:00.000000",
  "op_ts": "2021-01-01 00:00:00.000000",
  "op_type": "U",
  "pos": "1",
  "primary_keys": ["appl_seq"],
  "table": "ODS.APPL"
}

{
  "after": {
  },
  "before": {
    "appl_seq": "1",
    "op_ts": "2021-01-01 00:00:00.000000",
    "state": "01",
    "amount": 10000
  },
  "current_ts": "2021-01-01 00:00:00.000000",
  "op_ts": "2021-01-01 00:00:00.000000",
  "op_type": "D",
  "pos": "1",
  "primary_keys": ["appl_seq"],
  "table": "ODS.APPL"
}           

The canal-json records produced by sink1 (and consumed by source2):

INSERT:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":1}],"type":"INSERT"}
UPDATE:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":1}],"type":"DELETE"}
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":10000}],"type":"INSERT"}
DELETE:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":10000}],"type":"DELETE"}           

How to enable the MySQL general query log:

-- Check whether the general log is enabled, and where it is written
SHOW VARIABLES LIKE "general_log%";

-- Enable the general log
SET GLOBAL general_log = 'ON';
-- Set the log file path
SET GLOBAL general_log_file = '/data/apps/job-flink/mysql_general_log.log';

Searching mysql_general_log.log for the relevant keywords to locate the SQL statements, we find the following:

-- I+ (insert):
INSERT INTO `test_canal`(`appl_seq`, `op_ts`, `state`, `amount`) VALUES ('1', '2021-01-01 00:00:00', '01', 1) ON DUPLICATE KEY UPDATE
`appl_seq`=VALUES(`appl_seq`), `op_ts`=VALUES(`op_ts`), `state`=VALUES(`state`), `amount`=VALUES(`amount`)

-- U (update):
INSERT INTO `test_canal`(`appl_seq`, `op_ts`, `state`, `amount`) VALUES ('1', '2021-01-01 00:00:00', '01', 10000) ON DUPLICATE KEY UPDATE
`appl_seq`=VALUES(`appl_seq`), `op_ts`=VALUES(`op_ts`), `state`=VALUES(`state`), `amount`=VALUES(`amount`)

-- D- (delete):
DELETE FROM `test_canal` WHERE `appl_seq` = '1'

Here we find that Flink completed the upsert with INSERT ... ON DUPLICATE KEY UPDATE, rather than executing the two canal-json records of the decomposed Update as a D- followed by an I+, as we had guessed!

So the question becomes: how does Flink undo a transformation we assumed to be lossy (an increase in entropy, i.e. information loss in the information-theoretic sense), recovering Update from D-, I+ after the original Update -> D-, I+ split?

The next step is to go to the source code and try to resolve this puzzle.

Source Code Analysis

Start from the jdbc-connector source package.


Flink将邏輯包裝成jdbc SQL執行必然會生成statement執行個體;在statement包中我們看到隻有簡單的三個類:


Next, look for who uses these classes; that is the lead closest to where the SQL logic is assembled.


Sure enough, it is JdbcBatchingOutputFormat.java, and three methods in this class stand out.


open:

/**
 * Connects to the target database and initializes the prepared statement.
 *
 * @param taskNumber The number of the parallel instance.
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    super.open(taskNumber, numTasks);
    jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
    if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
        this.scheduler =
                Executors.newScheduledThreadPool(
                        1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
        this.scheduledFuture =
                this.scheduler.scheduleWithFixedDelay(
                        () -> {
                            synchronized (JdbcBatchingOutputFormat.this) {
                                if (!closed) {
                                    try {
                                        flush();
                                    } catch (Exception e) {
                                        flushException = e;
                                    }
                                }
                            }
                        },
                        executionOptions.getBatchIntervalMs(),
                        executionOptions.getBatchIntervalMs(),
                        TimeUnit.MILLISECONDS);
    }
}           

Analysis: a scheduler thread pool periodically invokes flush(); the pool contains only one thread, which preserves ordering. flush() mainly calls attemptFlush(), and attemptFlush() is a single line that delegates to JdbcBatchStatementExecutor#executeBatch. To find out which JdbcBatchStatementExecutor implementation's executeBatch is actually invoked, you can either debug with breakpoints or inspect all the implementing subclasses; the class comment of TableBufferReducedStatementExecutor reads:

/**
 * Currently, this statement executor is only used for table/sql to buffer insert/update/delete
 * events, and reduce them in buffer before submit to external database.
 */           

Clearly, our job ends up calling this class's executeBatch.


Here is executeBatch:

@Override
public void executeBatch() throws SQLException {
    for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
        if (entry.getValue().f0) {
            upsertExecutor.addToBatch(entry.getValue().f1);
        } else {
            // delete by key
            deleteExecutor.addToBatch(entry.getKey());
        }
    }
    upsertExecutor.executeBatch();
    deleteExecutor.executeBatch();
    reduceBuffer.clear();
}           

Analysis: reduceBuffer holds the sink records that arrive between two runs of the scheduled flush task, and it is a Map:

// the mapping is [KEY, <+/-, VALUE>]
private final Map<RowData, Tuple2<Boolean, RowData>> reduceBuffer = new HashMap<>();           

The records are reduced (merged) by primary key, so the D- is overwritten by the subsequent I+ for the same key, and in the end only the I+ is executed via ON DUPLICATE KEY. This explains how the downstream Flink job effectively reverses the seemingly irreversible decomposition Update -> D-, I+, and why update atomicity is preserved.
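To make the reduce-by-key behavior concrete, here is a minimal, self-contained sketch. It is not the actual Flink class, just a HashMap-based imitation of the [KEY, <+/-, VALUE>] buffer described above:

import java.util.HashMap;
import java.util.Map;

public class ReduceBufferDemo {

    // [primary key -> (isUpsert, latest row)], mirroring reduceBuffer's [KEY, <+/-, VALUE>]
    static final Map<String, Map.Entry<Boolean, String>> buffer = new HashMap<>();

    static void addToBatch(String key, boolean isUpsert, String row) {
        // A later change for the same key overwrites an earlier one, so a D- followed by an I+
        // collapses into a single upsert by the time the batch is flushed.
        buffer.put(key, Map.entry(isUpsert, row));
    }

    static void executeBatch() {
        buffer.forEach((key, change) -> {
            if (change.getKey()) {
                System.out.println("UPSERT (ON DUPLICATE KEY): " + change.getValue());
            } else {
                System.out.println("DELETE by key: " + key);
            }
        });
        buffer.clear();
    }

    public static void main(String[] args) {
        // The D- / I+ pair produced from one Update, arriving within one flush interval:
        addToBatch("1", false, "appl_seq=1, amount=1");     // D- (delete the old row)
        addToBatch("1", true,  "appl_seq=1, amount=10000"); // I+ (insert the new row)

        executeBatch(); // prints a single UPSERT; the standalone DELETE never reaches MySQL
    }
}

For the D- / I+ pair generated from one Update, only the upsert survives the buffer, which matches the SQL observed in the MySQL general log.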