天天看點

MySQL 的 Debezium 連接配接器-中文版MySQL 的 Debezium 連接配接器

MySQL 的 Debezium 連接配接器

MySQL 有一個二進制日志(binlog),它按照送出到資料庫的順序記錄所有操作。這包括對表模式的更改以及對表中資料的更改。MySQL 使用 binlog 進行複制和恢複。

Debezium MySQL 連接配接器讀取 binlog,為行級

INSERT

、、

UPDATE

DELETE

操作生成更改事件,并将更改事件發送到 Kafka 主題。用戶端應用程式讀取這些 Kafka 主題。

由于 MySQL 通常設定為在指定時間段後清除 binlog,是以 MySQL 連接配接器會對您的每個資料庫執行初始一緻快照。MySQL 連接配接器從建立快照的位置讀取 binlog。

有關與此連接配接器相容的 MySQL 資料庫版本的資訊,請參閱Debezium 版本概述。

連接配接器的工作原理

連接配接器支援的 MySQL 拓撲的概述對于規劃您的應用程式很有用。為了優化配置和運作 Debezium MySQL 連接配接器,了解連接配接器如何跟蹤表結構、公開模式更改、執行快照以及确定 Kafka 主題名稱會很有幫助。

Debezium MySQL 連接配接器尚未在 MariaDB 上進行測試,但來自社群的多份報告表明該連接配接器已成功用于該資料庫。計劃在未來的 Debezium 版本中提供對 MariaDB 的官方支援。

支援的 MySQL 拓撲

Debezium MySQL 連接配接器支援以下 MySQL 拓撲:

  • 獨立

    當使用單個 MySQL 伺服器時,伺服器必須啟用 binlog(并且可選地啟用 GTID),以便 Debezium MySQL 連接配接器可以監控伺服器。這通常是可以接受的,因為二進制日志也可以用作增量備份。在這種情況下,MySQL 連接配接器始終連接配接并跟随這個獨立的 MySQL 伺服器執行個體。

  • 主副本和副本

    Debezium MySQL 連接配接器可以跟随主伺服器之一或副本之一(如果該副本啟用了其 binlog),但連接配接器僅看到該伺服器可見的叢集中的更改。通常,除了多主拓撲之外,這不是問題。連接配接器将其位置記錄在伺服器的 binlog 中,這在叢集中的每台伺服器上都是不同的。是以,連接配接器必須隻跟随一個 MySQL 伺服器執行個體。如果該伺服器出現故障,則必須重新啟動或恢複該伺服器,然後連接配接器才能繼續。

  • 高可用叢集

    MySQL 存在多種高可用性解決方案,它們使容忍問題和故障并幾乎立即從問題和故障中恢複變得更加容易。大多數 HA MySQL 叢集使用 GTID,以便副本能夠跟蹤任何主伺服器上的所有更改。

  • 多主

    網絡資料庫 (NDB) 叢集複制使用一個或多個 MySQL 副本節點,每個節點從多個主伺服器複制。這是聚合多個 MySQL 叢集的複制的強大方法。此拓撲需要使用 GTID。Debezium MySQL 連接配接器可以使用這些多主 MySQL 副本作為源,并且隻要新副本趕上舊副本,就可以故障轉移到不同的多主 MySQL 副本。也就是說,新副本具有在第一個副本上看到的所有事務。即使連接配接器僅使用資料庫和/或表的一個子集,這也有效,因為可以将連接配接器配置為在嘗試重新連接配接到新的多主 MySQL 副本并在二進制日志。

  • 托管

    支援 Debezium MySQL 連接配接器以使用托管選項,例如 Amazon RDS 和 Amazon Aurora。因為這些托管選項不允許全局讀鎖,是以使用表級鎖來建立一緻快照。

架構曆史主題

當資料庫用戶端查詢資料庫時,用戶端使用資料庫的目前模式。但是,資料庫架構可以随時更改,這意味着連接配接器必須能夠識别每次插入、更新或删除操作被記錄時的架構。此外,連接配接器不能隻使用目前模式,因為連接配接器可能正在處理在更改表模式之前記錄的相對較舊的事件。

為了確定正确處理架構更改後發生的更改,MySQL 在 binlog 中不僅包括對資料的行級更改,還包括應用于資料庫的 DDL 語句。當連接配接器讀取 binlog 并遇到這些 DDL 語句時,它會解析它們并更新每個表模式的記憶體表示。連接配接器使用此模式表示來識别每次插入、更新或删除操作時的表結構,并産生适當的更改事件。在單獨的資料庫曆史 Kafka 主題中,連接配接器記錄所有 DDL 語句以及每個 DDL 語句出現在 binlog 中的位置。

當連接配接器在崩潰或優雅停止後重新啟動時,連接配接器會從特定位置,即從特定時間點開始讀取 binlog。連接配接器通過讀取資料庫曆史 Kafka 主題并解析所有 DDL 語句,直到連接配接器啟動的二進制日志中的點,來重建此時存在的表結構。

此資料庫曆史主題僅供連接配接器使用。連接配接器可以選擇将模式更改事件發送到針對消費者應用程式的不同主題。

當 MySQL 連接配接器捕獲應用了架構更改工具(例如

gh-ost

或)的表中的更改

pt-online-schema-change

時,會在遷移過程中建立輔助表。需要配置連接配接器以捕獲對這些幫助表的更改。如果消費者不需要為幫助表生成的記錄,則可以應用單個消息轉換将它們過濾掉。

檢視接收 Debezium 事件記錄的主題的預設名稱。

架構更改主題

您可以配置 Debezium MySQL 連接配接器以生成模式更改事件,這些事件描述應用于資料庫中捕獲的表的模式更改。連接配接器将架構更改事件寫入名為 的 Kafka 主題

*<serverName>*

,其中是連接配接器配置屬性

*serverName*

中指定的邏輯伺服器名稱。

database.server.name

連接配接器發送到架構更改主題的消息包含有效負載,并且(可選)還包含更改事件消息的架構。

架構更改事件消息的有效負載包括以下元素:

  • ddl

    提供導緻架構更改的 SQL

    CREATE

    ALTER

    或語句。

    DROP

  • databaseName

    應用 DDL 語句的資料庫的名稱。的值

    databaseName

    用作消息鍵。
  • pos

    語句出現在 binlog 中的位置。
  • tableChanges

    架構更改後整個表架構的結構化表示。該

    tableChanges

    字段包含一個數組,其中包含表中每一列的條目。由于結構化表示以 JSON 或 Avro 格式呈現資料,是以消費者可以輕松讀取消息,而無需先通過 DDL 解析器對其進行處理。
對于處于捕獲模式的表,連接配接器不僅将模式更改的曆史記錄存儲在模式更改主題中,還會存儲在内部資料庫曆史記錄主題中。内部資料庫曆史主題僅供連接配接器使用,不适合消費應用程式直接使用。確定需要有關架構更改通知的應用程式僅使用來自架構更改主題的資訊。
永遠不要對資料庫曆史主題進行分區。要使資料庫曆史主題正确運作,它必須保持連接配接器向其發出的事件記錄的一緻的全局順序。為確定主題不會在分區之間拆分,請使用以下方法之一設定主題的分區計數:如果您手動建立資料庫曆史主題,請将分區計數指定為

1

.如果您使用 Apache Kafka 代理自動建立資料庫曆史主題,則會建立主題,請将Kafka

num.partitions

配置選項的值設定為

1

.
連接配接器向其模式更改主題發出的消息格式處于孵化狀态,如有更改,恕不另行通知。

示例:發送到 MySQL 連接配接器架構更改主題的消息

以下示例顯示了 JSON 格式的典型架構更改消息。該消息包含表模式的邏輯表示。

{
  "schema": {
  ...
  },
  "payload": {
        "source": {  // (1)
        "version": "1.9.5.Final",
        "connector": "mysql",
        "name": "dbserver1",
        "ts_ms": 0,
        "snapshot": "false",
        "db": "inventory",
        "sequence": null,
        "table": "customers",
        "server_id": 0,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 219,
        "row": 0,
        "thread": null,
        "query": null
    },
    "databaseName": "inventory", // (2)
    "schemaName": null,
    "ddl": "ALTER TABLE customers ADD COLUMN middle_name VARCHAR(2000)", // (3)
    "tableChanges": [ // (4)
        {
        "type": "ALTER", // (5)
        "id": "\"inventory\".\"customers\"",  // (6)
        "table": { // (7)
            "defaultCharsetName": "latin1",
            "primaryKeyColumnNames": [  // (8)
                "id"
            ],
            "columns": [ // (9)
                {
                "name": "id",
                "jdbcType": 4,
                "nativeType": null,
                "typeName": "INT",
                "typeExpression": "INT",
                "charsetName": null,
                "length": 11,
                "scale": null,
                "position": 1,
                "optional": false,
                "autoIncremented": true,
                "generated": true
            },
            {
                "name": "first_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 2,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },                        {
                "name": "last_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 3,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },
            {
                "name": "email",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 4,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },
            {
                "name": "middle_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 2000,
                "scale": null,
                "position": 5,
                "optional": true,
                "autoIncremented": false,
                "generated": false
            }
          ]
        }
      }
    ]
  },
  "payload": {
    "databaseName": "inventory",
    "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;",
    "source" : {
      "version": "1.9.5.Final",
      "name": "mysql-server-1",
      "server_id": 0,
      "ts_ms": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": null,
      "table": null,
      "query": null
    }
  }
}
           
物品 字段名稱 描述
1

source

source

字段的結構與連接配接器寫入特定于表的主題的标準資料更改事件完全相同。此字段可用于關聯不同主題的事件。
2

databaseName

schemaName

辨別包含更改的資料庫和架構。該

databaseName

字段的值用作記錄的消息鍵。
3

ddl

此字段包含負責架構更改的 DDL。該

ddl

字段可以包含多個 DDL 語句。每個語句都适用于資料庫中的

databaseName

字段。多個 DDL 語句按照它們應用于資料庫的順序出現。 用戶端可以送出多個适用于多個資料庫的 DDL 語句。如果 MySQL 以原子方式應用它們,則連接配接器按順序擷取 DDL 語句,按資料庫對它們進行分組,并為每個組建立一個模式更改事件。如果 MySQL 單獨應用它們,連接配接器會為每個語句建立一個單獨的模式更改事件。
4

tableChanges

包含由 DDL 指令生成的架構更改的一個或多個項目的數組。
5

type

描述變化的種類。該值為以下之一:

CREATE

表已建立。

ALTER

表已修改。

DROP

表已删除。
6

id

建立、更改或删除的表的完整辨別符。在表重命名的情況下,此辨別符是表名的串聯。

*<old>*,*<new>*

7

table

表示應用更改後的表中繼資料。
8

primaryKeyColumnNames

組成表的主鍵的列的清單。
9

columns

已更改表中每一列的中繼資料。

另請參閱:模式曆史主題。

快照

首次啟動 Debezium MySQL 連接配接器時,它會執行資料庫的初始一緻快照。以下流程描述了連接配接器如何建立此快照。此流程适用于預設快照模式,即

initial

. 有關其他快照模式的資訊,請參閱MySQL 連接配接器

snapshot.mode

配置屬性。

行動
1 擷取阻止其他資料庫用戶端寫入的全局讀鎖。 快照本身不會阻止其他用戶端應用可能會幹擾連接配接器嘗試讀取 binlog 位置和表模式的 DDL。連接配接器在讀取 binlog 位置時保持全局讀鎖,并如後面的步驟所述釋放鎖。
2 啟動具有可重複讀取語義的事務,以確定事務中的所有後續讀取都針對一緻的快照完成。
3 讀取目前的 binlog 位置。
4 讀取連接配接器配置為捕獲更改的資料庫和表的架構。
5 釋放全局讀鎖。其他資料庫用戶端現在可以寫入資料庫。
6 如果适用,将 DDL 更改寫入架構更改主題,包括所有必要

DROP…

CREATE…

DDL 語句。
7 掃描資料庫表。對于每一行,連接配接器将

CREATE

事件發送到相關的特定于表的 Kafka 主題。
8 送出事務。
9 在連接配接器偏移中記錄完成的快照。
  • 連接配接器重新啟動

    如果連接配接器在執行初始快照時發生故障、停止或重新平衡,則在連接配接器重新啟動後,它會執行新的快照。在初始快照完成後,Debezium MySQL 連接配接器從 binlog 中的相同位置重新啟動,是以它不會錯過任何更新。如果連接配接器停止的時間足夠長,MySQL 可能會清除舊的二進制日志檔案,連接配接器的位置就會丢失。如果位置丢失,連接配接器将恢複為其起始位置的*初始快照。*有關對 Debezium MySQL 連接配接器進行故障排除的更多提示,請參閱出現問題時的行為。

  • 不允許全局讀鎖

    某些環境不允許全局讀鎖。如果 Debezium MySQL 連接配接器檢測到不允許全局讀鎖,則連接配接器使用表級鎖代替并使用此方法執行快照。這要求 Debezium 連接配接器的資料庫使用者具有

    LOCK TABLES

    權限。表 3. 使用表級鎖執行初始快照的工作流程步行動1擷取表級鎖。2啟動具有可重複讀取語義的事務,以確定事務中的所有後續讀取都針對一緻的快照完成。3讀取和過濾資料庫和表的名稱。4讀取目前的 binlog 位置。5讀取連接配接器配置為捕獲更改的資料庫和表的架構。6如果适用,将 DDL 更改寫入架構更改主題,包括所有必要

    DROP…

    CREATE…

    DDL 語句。7掃描資料庫表。對于每一行,連接配接器将

    CREATE

    事件發送到相關的特定于表的 Kafka 主題。8送出事務。9釋放表級鎖。10在連接配接器偏移中記錄完成的快照。

即席快照

預設情況下,連接配接器僅在首次啟動後才運作初始快照操作。在這個初始快照之後,在正常情況下,連接配接器不會重複快照過程。連接配接器捕獲的任何未來更改事件資料僅通過流式處理進入。

但是,在某些情況下,連接配接器在初始快照期間獲得的資料可能會變得陳舊、丢失或不完整。為了提供一種重新捕獲表資料的機制,Debezium 包含一個執行臨時快照的選項。資料庫中的以下更改可能會導緻執行臨時快照:

  • 修改連接配接器配置以捕獲一組不同的表。
  • Kafka 主題被删除,必須重建。
  • 由于配置錯誤或其他問題而發生資料損壞。

您可以通過啟動所謂的ad-hoc 快照為之前捕獲快照的表重新運作快照。即席快照需要使用信令表。您可以通過向 Debezium 信号表發送信号請求來啟動臨時快照。

當您啟動現有表的臨時快照時,連接配接器會将内容附加到表已存在的主題中。如果删除了以前存在的主題,如果啟用了自動主題建立,Debezium 可以自動建立主題。

即席快照信号指定要包含在快照中的表。快照可以捕獲資料庫的全部内容,或僅捕獲資料庫中表的子集。

execute-snapshot

您可以通過向信令表發送消息來指定要捕獲的表。将

execute-snapshot

信号的類型設定為

incremental

,并提供要包含在快照中的表的名稱,如下表所述:

場地 預設 價值

type

incremental

指定要運作的快照類型。 設定類型是可選的。目前,您隻能請求

incremental

快照。

data-collections

不适用 一個數組,其中包含要生成快照的表的完全限定名稱。名稱的格式與配置選項 的格式相同。

signal.data.collection

觸發臨時快照

execute-snapshot

您可以通過将具有信号類型的條目添加到信令表來啟動臨時快照。連接配接器處理完消息後,将開始快照操作。快照程序讀取第一個和最後一個主鍵值,并将這些值用作每個表的起點和終點。根據表中的條目數和配置的塊大小,Debezium 将表劃分為塊,并繼續對每個塊進行快照,一次一個。

目前,

execute-snapshot

操作類型僅觸發增量快照。有關詳細資訊,請參閱增量快照。

增量快照

為了提供管理快照的靈活性,Debezium 包含一個補充快照機制,稱為增量快照。增量快照依靠 Debezium 機制向 Debezium 連接配接器發送信号。增量快照基于DDD-3設計文檔。

在增量快照中,Debezium 不是像在初始快照中那樣一次捕獲資料庫的完整狀态,而是在一系列可配置的塊中分階段捕獲每個表。您可以指定您希望快照捕獲的表和每個塊的大小。塊大小決定了快照在資料庫上的每次提取操作期間收集的行數。增量快照的預設塊大小為 1 KB。

随着增量快照的進行,Debezium 使用水印來跟蹤其進度,維護它捕獲的每個表行的記錄。與标準初始快照過程相比,這種分階段捕獲資料的方法具有以下優勢:

  • 您可以在流式資料捕獲的同時運作增量快照,而不是将流式傳輸推遲到快照完成。連接配接器在整個快照過程中繼續從更改日志中捕獲近乎實時的事件,并且兩個操作都不會阻塞另一個操作。
  • 如果增量快照的進度中斷,您可以恢複它而不會丢失任何資料。程序恢複後,快照從它停止的點開始,而不是從頭重新捕獲表。
  • 您可以随時按需運作增量快照,并根據需要重複該過程以适應資料庫更新。例如,您可以在修改連接配接器配置以将表添加到其

    table.include.list

    屬性後重新運作快照。

增量快照過程

當您運作增量快照時,Debezium 按主鍵對每個表進行排序,然後根據配置的塊大小将表拆分為塊。逐塊工作,然後捕獲塊中的每個表行。對于它捕獲的每一行,快照都會發出一個

READ

事件。該事件表示塊的快照開始時行的值。

随着快照的進行,其他程序可能會繼續通路資料庫,可能會修改表記錄。為反映此類更改,

INSERT

UPDATE

DELETE

操作将照常送出到事務日志。同樣,正在進行的 Debezium 流式處理繼續檢測這些更改事件并将相應的更改事件記錄發送到 Kafka。

Debezium 如何解決具有相同主鍵的記錄之間的沖突

在某些情況下,流式處理發出的

UPDATE

DELETE

事件被亂序接收。也就是說,流式處理可能會在快照捕獲包含該行的

READ

事件的塊之前發出一個修改表行的事件。當快照最終為該行發出相應的

READ

事件時,它的值已經被取代。為了確定以正确的邏輯順序處理亂序到達的增量快照事件,Debezium 采用了一種緩沖方案來解決沖突。隻有在解決了快照事件和流事件之間的沖突後,Debezium 才會向 Kafka 發出事件記錄。

快照視窗

為了幫助解決延遲到達事件和修改同一表行的流事件之間的沖突

READ

,Debezium 采用了所謂的快照視窗。快照視窗劃分了增量快照捕獲指定表塊資料的時間間隔。在一個塊的快照視窗打開之前,Debezium 遵循其通常的行為并從事務日志直接向下遊發送事件到目标 Kafka 主題。但是從特定塊的快照打開的那一刻起,直到它關閉,Debezium 執行重複資料删除步驟以解決具有相同主鍵的事件之間的沖突。

對于每個資料集合,Debezium 發出兩種類型的事件,并将它們的記錄存儲在單個目标 Kafka 主題中。它直接從表中捕獲的快照記錄作為

READ

操作發出。同時,随着使用者不斷更新資料集合中的記錄,事務日志也更新以反映每次送出,Debezium 會針對每次更改發出

UPDATE

或操作。

DELETE

當快照視窗打開時,Debezium 開始處理快照塊,它将快照記錄傳遞到記憶體緩沖區。在快照視窗期間,

READ

緩沖區中事件的主鍵與傳入流事件的主鍵進行比較。如果未找到比對項,則将流式事件記錄直接發送到 Kafka。如果 Debezium 檢測到比對,它會丢棄緩沖的

READ

事件,并将流式記錄寫入目标主題,因為流式事件在邏輯上取代了靜态快照事件。塊的快照視窗關閉後,緩沖區僅包含

READ

不存在相關事務日志事件的事件。Debezium 将這些剩餘

READ

事件發送到表的 Kafka 主題。

連接配接器對每個快照塊重複該過程。

觸發增量快照

目前,啟動增量快照的唯一方法是将臨時快照信号發送到源資料庫上的信令表。

INSERT

您将信号作為 SQL查詢送出給表。Debezium 檢測到信号表中的變化後,它會讀取信号,并運作請求的快照操作。

您送出的查詢指定要包含在快照中的表,并且可以選擇指定快照操作的類型。目前,快照操作的唯一有效選項是預設值

incremental

.

要指定要包含在快照中的表,請提供一個

data-collections

列出這些表的數組,例如,

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的

data-collections

數組沒有預設值。如果

data-collections

數組為空,Debezium 檢測到不需要任何操作并且不執行快照。

如果要包含在快照中的表

.

的名稱在資料庫、模式或表的名稱中包含點 (),則要将表添加到

data-collections

數組中,您必須用雙引号對名稱的每個部分進行轉義. 例如,要包含存在于

**public**

架構中且名稱為 的表

**My.Table**

,請使用以下格式:

**"public"."My.Table"**

.

先決條件

  • 信令已啟用。
    • 源資料庫上存在信令資料集合,連接配接器配置為捕獲它。
    • 信令資料集合在

      signal.data.collection

      屬性中指定。

程式

  1. 發送 SQL 查詢以将臨時增量快照請求添加到信令表:

    例如,

    id

    指令中的、

    type

    和參數的值

    data

    對應于信令表的字段。

    下表描述了這些參數:

    價值 描述

    myschema.debezium_signal

    指定源資料庫上的信令表的完全限定名稱

    ad-hoc-1

    id

    參數指定一個任意字元串,該字元串被配置設定為

    id

    信号請求的辨別符。 使用此字元串将日志消息辨別到信令表中的條目。Debezium 不使用此字元串。相反,在快照期間,Debezium 會生成自己的

    id

    字元串作為水印信号。

    execute-snapshot

    指定

    type

    參數指定信号要觸發的操作。

    data-collections

    信号字段的必需元件,

    data

    它指定要包含在快照中的表名數組。 該數組按表的完全限定名稱列出表,使用的格式與您在

    signal.data.collection

    配置屬性中指定連接配接器信号表的名稱時使用的格式相同。

    incremental

    信号字段的可選

    type

    元件

    data

    ,指定要運作的快照操作的種類。 目前,唯一有效的選項是預設值

    incremental

    . 在您送出給信令表的 SQL 查詢中指定一個

    type

    值是可選的。 如果您未指定值,則連接配接器将運作增量快照。

以下示例顯示了連接配接器捕獲的增量快照事件的 JSON。

示例:增量快照事件消息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 
    },
    "op":"r", 
    "ts_ms":"1620393591654",
    "transaction":null
}
           
物品 字段名稱 描述
1

snapshot

指定要運作的快照操作的類型。 目前,唯一有效的選項是預設值

incremental

. 在您送出給信令表的 SQL 查詢中指定一個

type

值是可選的。 如果您未指定值,則連接配接器将運作增量快照。
2

op

指定事件類型。 快照事件的值為

r

,表示

READ

操作。

隻讀增量快照

MySQL 連接配接器允許使用與資料庫的隻讀連接配接來運作增量快照。要運作具有隻讀通路權限的增量快照,連接配接器使用已執行的全局事務 ID (GTID) 設定為高水位線和低水位線。通過将二進制日志 (binlog) 事件的 GTID 或伺服器的心跳與低水位線和高水位線進行比較來更新塊視窗的狀态。

要切換到隻讀實作,請将

read.only

屬性的值設定為

true

先決條件

  • 啟用 MySQL GTID。
  • 如果連接配接器從多線程副本(即,值

    replica_parallel_workers

    大于的副本 )讀取,則必須設定以下選項之一:
    • replica_preserve_commit_order=ON

    • slave_preserve_commit_order=ON

即席隻讀增量快照

當 MySQL 連接配接為隻讀時,信令表機制還可以通過向signal.kafka.topic屬性中指定的 Kafka 主題發送消息來運作快照。

Kafka 消息的鍵必須與

database.server.name

連接配接器配置選項的值比對。

該值是一個帶有

type

data

字段的 JSON 對象。

信号類型是

execute-snapshot

data

字段必須有以下字段:

場地 預設 價值

type

incremental

要執行的快照的類型。目前僅

incremental

支援。 有關詳細資訊,請參閱下一節。

data-collections

不适用 要快照的表的限定名稱數組。名稱的格式與signal.data.collection配置選項 的格式相同。

執行快照 Kafka 消息的示例:

鍵 = `test_connector`

值 = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
           

快照事件的操作類型

READ

MySQL 連接配接器将快照事件作為操作發出

("op" : "r")

。如果您希望連接配接器将快照事件作為

CREATE

(

c

) 事件發出,請配置 Debezium

ReadToInsertEvent

單消息轉換 (SMT) 以修改事件類型。

以下示例顯示了如何配置 SMT:

示例:使用

ReadToInsertEvent

SMT 更改快照事件的類型

轉換=快照插入,...
transforms.snapshotasinsert.type=io.debezium.connector.mysql.transforms.ReadToInsertEvent
           

主題名稱

預設情況下,MySQL 連接配接器将表中發生的所有 、 和 操作的更改事件寫入

INSERT

特定

UPDATE

DELETE

該表的單個 Apache Kafka 主題。

連接配接器使用以下約定來命名更改事件主題:

serverName.databaseName.tableName

假設

fulfillment

是伺服器名稱,

inventory

是資料庫名稱,并且資料庫包含名為

orders

customers

和的表

products

。Debezium MySQL 連接配接器向三個 Kafka 主題發出事件,每個主題對應一個資料庫中的表:

履行.庫存.訂單
履行.庫存.客戶
履行.庫存.産品
           

以下清單提供了預設名稱元件的定義:

  • 伺服器名稱

    database.server.name

    由連接配接器配置屬性指定的伺服器的邏輯名稱。
  • 模式名稱

    發生操作的模式的名稱。

  • tableName

    發生操作的表的名稱。

連接配接器應用類似的命名約定來标記其内部資料庫曆史主題、模式更改主題和事務中繼資料主題。

如果預設主題名稱不符合您的要求,您可以配置自定義主題名稱。要配置自定義主題名稱,請在邏輯主題路由 SMT 中指定正規表達式。有關使用邏輯主題路由 SMT 自定義主題命名的更多資訊,請參閱主題路由。

交易中繼資料

Debezium 可以生成表示事務邊界和豐富資料更改事件消息的事件。

Debezium 接收交易中繼資料的時間限制Debezium 僅注冊和接收部署連接配接器後發生的事務的中繼資料。部署連接配接器之前發生的事務的中繼資料不可用。

BEGIN

Debezium 為每個事務中的和

END

分隔符生成事務邊界事件。事務邊界事件包含以下字段:

  • status

    BEGIN

    END

  • id

    唯一交易辨別符的字元串表示形式。
  • event_count

    (用于

    END

    活動)

    事務發出的事件總數。

  • data_collections

    (用于

    END

    活動)

    一對

    data_collection

    event_count

    元素的數組。表示連接配接器針對源自資料集合的更改發出的事件數。

例子

{
  "status": "BEGIN",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}
           

除非通過

transaction.topic

選項覆寫,否則連接配接器會向主題發出事務事件。

**

.transaction

更改資料事件豐富

啟用事務中繼資料後,資料消息

Envelope

會增加一個新

transaction

字段。此字段以字段組合的形式提供有關每個事件的資訊:

  • id

    - 唯一交易辨別符的字元串表示
  • total_order

    - 事件在事務産生的所有事件中的絕對位置
  • data_collection_order

    - 事件在事務發出的所有事件中的每個資料收集位置

以下是消息的示例:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
           

對于沒有啟用 GTID 的系統,事務辨別符是使用 binlog 檔案名和 binlog 位置的組合建構的。例如,如果事務 BEGIN 事件對應的 binlog 檔案名和位置分别為 mysql-bin.000002 和 1913,則 Debezium 構造的事務辨別符将為

file=mysql-bin.000002,pos=1913

.

資料更改事件

Debezium MySQL 連接配接器為每個行級、、和操作生成資料

INSERT

更改

UPDATE

事件

DELETE

。每個事件都包含一個鍵和一個值。鍵和值的結構取決于已更改的表。

Debezium 和 Kafka Connect 是圍繞連續的事件消息流設計的。但是,這些事件的結構可能會随着時間的推移而發生變化,這對于消費者來說可能難以處理。為了解決這個問題,每個事件都包含其内容的模式,或者,如果您使用的是模式系統資料庫,消費者可以使用它從系統資料庫中擷取模式的模式 ID。這使得每個事件都是獨立的。

以下骨架 JSON 顯示了更改事件的基本四個部分。但是,如何配置您選擇在應用程式中使用的 Kafka Connect 轉換器決定了這四個部分在更改事件中的表示。僅當您将轉換器配置為生成該字段時,該

schema

字段才處于更改事件中。同樣,僅當您将轉換器配置為生成它時,事件鍵和事件有效負載才在更改事件中。如果您使用 JSON 轉換器并将其配置為生成所有四個基本更改事件部分,則更改事件具有以下結構:

{
 "schema": { 
   ...
  },
 "payload": { 
   ...
 },
 "schema": { 
   ...
 },
 "payload": { 
   ...
 },
}
           
物品 字段名稱 描述
1

schema

第一個

schema

字段是事件鍵的一部分。它指定了一個 Kafka Connect 模式,該模式描述了事件鍵

payload

部分中的内容。換句話說,第一個

schema

字段描述了主鍵的結構,如果表沒有主鍵,則描述唯一鍵的結構,用于已更改的表。 可以通過設定

message.key.columns

連接配接器配置屬性來覆寫表的主鍵。在這種情況下,第一個模式字段描述了由該屬性辨別的鍵的結構。
2

payload

第一個

payload

字段是事件鍵的一部分。它具有前一個字段描述的結構,

schema

并且包含已更改行的鍵。
3

schema

第二個

schema

字段是事件值的一部分。它指定了描述事件值

payload

部分内容的 Kafka Connect 模式。換句話說,第二個

schema

描述了被改變的行的結構。通常,此模式包含嵌套模式。
4

payload

第二個

payload

字段是事件值的一部分。它具有前一個字段描述的結構,

schema

并且包含已更改行的實際資料。

預設情況下,連接配接器将事件記錄流更改為名稱與事件源表相同的主題。請參閱主題名稱。

MySQL 連接配接器確定所有 Kafka Connect 模式名稱都遵循Avro 模式名稱格式。這意味着邏輯伺服器名稱必須以拉丁字母或下劃線開頭,即 az、AZ 或 _。邏輯伺服器名稱中的每個剩餘字元以及資料庫和表名稱中的每個字元都必須是拉丁字母、數字或下劃線,即 az、AZ、0-9 或 _。如果存在無效字元,則将其替換為下劃線字元。如果邏輯伺服器名稱、資料庫名稱或表名稱包含無效字元,并且用于區分名稱的唯一字元無效并是以替換為下劃線,這可能會導緻意外沖突。

更改事件鍵

更改事件的鍵包含更改表鍵的模式和更改行的實際鍵。

PRIMARY KEY

在連接配接器建立事件時,模式及其相應的有效負載都包含已更改表(或唯一限制)中每一列的字段。

請考慮下

customers

表,其後是此表的更改事件鍵的示例。

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
           

捕獲對表的更改的每個更改事件

customers

都具有相同的事件鍵架構。隻要

customers

表具有先前的定義,捕獲

customers

表更改的每個更改事件都具有以下關鍵結構。在 JSON 中,它看起來像這樣:

{
 "schema": { 
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key", 
    "optional": false, 
    "fields": [ 
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
 "payload": { 
    "id": 1001
  }
}
           
物品 字段名稱 描述
1

schema

鍵的模式部分指定了描述鍵

payload

部分内容的 Kafka Connect 模式。
2

mysql-server-1.inventory.customers.Key

定義密鑰有效負載結構的架構名稱。此架構描述了已更改表的主鍵結構。鍵架構名稱的格式為connector-name。資料庫名稱。表名。

Key

. 在這個例子中:

mysql-server-1

是生成此事件的連接配接器的名稱。

inventory

是包含已更改表的資料庫。

customers

是更新的表。
3

optional

payload

訓示事件鍵是否必須在其字段中包含值。在此示例中,密鑰的有效負載中的值是必需的。當表沒有主鍵時,鍵的有效負載字段中的值是可選的。
4

fields

指定 中預期的

payload

每個字段,包括每個字段的名稱、類型以及是否需要。
5

payload

包含為其生成此更改事件的行的鍵。在此示例中,鍵包含一個

id

值為 的字段

1001

更改事件值

更改事件中的值比鍵複雜一些。與鍵一樣,值也有一個

schema

部分和一個

payload

部分。該

schema

部分包含描述該部分

Envelope

結構的架構

payload

,包括其嵌套字段。建立、更新或删除資料的操作的更改事件都有一個帶有信封結構的值負載。

考慮用于顯示更改事件鍵示例的相同示例表:

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
           

對此表的更改的更改事件的值部分描述為:

  • 建立事件
  • 更新事件
  • 主鍵更新
  • 删除事件
  • 墓碑事件

建立事件

以下示例顯示了連接配接器為在

customers

表中建立資料的操作生成的更改事件的值部分:

{
  "schema": { 
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value", 
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source", 
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope" 
  },
  "payload": { 
    "op": "c", 
    "ts_ms": 1465491411815, 
    "before": null, 
    "after": { 
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "[email protected]"
    },
    "source": { 
      "version": "1.9.5.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', '[email protected]')"
    }
  }
}
           
物品 字段名稱 描述
1

schema

值的架構,它描述了值的有效負載的結構。在連接配接器為特定表生成的每個更改事件中,更改事件的值架構都是相同的。
2

name

在該

schema

部分中,每個

name

字段都為值的有效負載中的字段指定架構。

mysql-server-1.inventory.customers.Value

是有效負載

before

after

字段的架構。此架構特定于

customers

表。 模式名稱

before

after

字段的格式為,這確定模式名稱在資料庫中是唯一的。這意味着當使用Avro 轉換器時,每個邏輯源中每個表的生成 Avro 模式都有自己的演變和曆史。

*logicalName*.*tableName*.Value

3

name

io.debezium.connector.mysql.Source

是有效負載

source

字段的架構。此模式特定于 MySQL 連接配接器。連接配接器将它用于它生成的所有事件。
4

name

mysql-server-1.inventory.customers.Envelope

是有效負載整體結構的架構,其中

mysql-server-1

是連接配接器名稱,

inventory

是資料庫,

customers

是表。
5

payload

該值的實際資料。這是更改事件提供的資訊。 看起來事件的 JSON 表示比它們描述的行大得多。這是因為 JSON 表示必須包括消息的模式和有效負載部分。但是,通過使用Avro 轉換器,您可以顯着減小連接配接器流式傳輸到 Kafka 主題的消息的大小。
6

op

強制字元串,描述導緻連接配接器生成事件的操作類型。在本例中,

c

表示操作建立了一行。有效值為:

c

= 建立

u

= 更新

d

= 删除

r

= 讀取(僅适用于快照)
7

ts_ms

顯示連接配接器處理事件的時間的可選字段。該時間基于運作 Kafka Connect 任務的 JVM 中的系統時鐘。 在

source

對象中,

ts_ms

訓示在資料庫中進行更改的時間。通過比較 for

payload.source.ts_ms

的值

payload.ts_ms

,您可以确定源資料庫更新和 Debezium 之間的延遲。
8

before

一個可選字段,指定事件發生之前行的狀态。當該

op

字段

c

用于建立時,如本例所示,該

before

字段是

null

因為此更改事件用于新内容。
9

after

一個可選字段,指定事件發生後行的狀态。在此示例中,該

after

字段包含新行的

id

first_name

last_name

email

列的值。
10

source

描述事件源中繼資料的必填字段。此字段包含可用于将此事件與其他事件進行比較的資訊,包括事件的來源、事件發生的順序以及事件是否屬于同一事務的一部分。源中繼資料包括:Debezium 版本連接配接器名稱記錄事件的 binlog 名稱二進制日志位置活動中的行如果事件是快照的一部分包含新行的資料庫和表的名稱建立事件的 MySQL 線程的 ID(僅限非快照)MySQL 伺服器 ID(如果可用)在資料庫中進行更改時的時間戳如果啟用了

binlog_rows_query_log_events

MySQL 配置選項并啟用了連接配接器配置

include.query

屬性,則該

source

字段還提供該

query

字段,該字段包含導緻更改事件的原始 SQL 語句。

更新事件

示例表中更新的更改事件的值與該表的建立

customers

事件具有相同的模式。同樣,事件值的有效負載具有相同的結構。但是,事件值有效負載在更新事件中包含不同的值。以下是連接配接器為表中的更新生成的事件中的更改事件值示例:

customers

{
  "schema": { ... },
  "payload": {
    "before": { 
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "[email protected]"
    },
    "after": { 
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "[email protected]"
    },
    "source": { 
      "version": "1.9.5.Final",
      "name": "mysql-server-1",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581029100,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", 
    "ts_ms": 1465581029523 
  }
}
           
物品 字段名稱 描述
1

before

一個可選字段,指定事件發生之前行的狀态。在更新事件值中,該

before

字段包含每個表列的字段以及資料庫送出之前該列中的值。在本例中,

first_name

值為

Anne.

2

after

一個可選字段,指定事件發生後行的狀态。您可以比較

before

after

結構來确定對此行的更新是什麼。在示例中,

first_name

值為 now

Anne Marie

3

source

描述事件源中繼資料的必填字段。字段結構與建立事件中的

source

字段相同,但有些值不同,例如示例更新事件來自binlog中的不同位置。源中繼資料包括:Debezium 版本連接配接器名稱記錄事件的 binlog 名稱二進制日志位置活動中的行如果事件是快照的一部分包含更新行的資料庫和表的名稱建立事件的 MySQL 線程的 ID(僅限非快照)MySQL 伺服器 ID(如果可用)在資料庫中進行更改時的時間戳如果啟用了

binlog_rows_query_log_events

MySQL 配置選項并啟用了連接配接器配置

include.query

屬性,則該

source

字段還提供該

query

字段,該字段包含導緻更改事件的原始 SQL 語句。
4

op

描述操作類型的強制字元串。在更新事件值中,

op

字段值為

u

,表示該行因更新而更改。
5

ts_ms

顯示連接配接器處理事件的時間的可選字段。該時間基于運作 Kafka Connect 任務的 JVM 中的系統時鐘。 在

source

對象中,

ts_ms

訓示在資料庫中進行更改的時間。通過比較 for

payload.source.ts_ms

的值

payload.ts_ms

,您可以确定源資料庫更新和 Debezium 之間的延遲。
更新行的主鍵/唯一鍵的列會更改行鍵的值。當一個鍵發生變化時,Debezium 輸出三個事件:一個

DELETE

事件和一個墓碑事件,該事件具有該行的舊鍵,然後是一個具有該行的新鍵的事件。詳細資訊在下一節中。

主鍵更新

更改行的主鍵字段的

UPDATE

操作稱為主鍵更改。對于主鍵更改,代替

UPDATE

事件記錄,連接配接器會發出

DELETE

舊鍵的

CREATE

事件記錄和新(更新的)鍵的事件記錄。這些事件具有通常的結構和内容,此外,每個事件都有一個與主鍵更改相關的消息頭:

  • 事件

    DELETE

    記錄具有

    __debezium.newkey

    消息頭。此标頭的值是更新行的新主鍵。
  • 事件

    CREATE

    記錄具有

    __debezium.oldkey

    消息頭。此标頭的值是更新行所具有的先前(舊)主鍵。

删除事件

删除更改事件中的值與同一表的建立和更新

schema

事件具有相同的部分。示例表的删除事件中的部分如下所示:

payload``customers

{
  "schema": { ... },
  "payload": {
    "before": { 
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "[email protected]"
    },
    "after": null, 
    "source": { 
      "version": "1.9.5.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581902300,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 805,
      "row": 0,
      "thread": 7,
      "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d", 
    "ts_ms": 1465581902461 
  }
}
           
物品 字段名稱 描述
1

before

可選字段,指定事件發生前行的狀态。在删除事件值中,該

before

字段包含在使用資料庫送出删除之前該行中的值。
2

after

可選字段,指定事件發生後行的狀态。在删除事件值中,

after

字段為

null

,表示該行不再存在。
3

source

描述事件源中繼資料的必填字段。在删除事件值中,

source

字段結構與同一表的建立和更新事件相同。許多

source

字段值也相同。在删除事件值中,

ts_ms

pos

字段值以及其他值可能已更改。但是删除事件值

source

中的字段提供了相同的中繼資料:Debezium 版本連接配接器名稱記錄事件的 binlog 名稱二進制日志位置活動中的行如果事件是快照的一部分包含更新行的資料庫和表的名稱建立事件的 MySQL 線程的 ID(僅限非快照)MySQL 伺服器 ID(如果可用)在資料庫中進行更改時的時間戳如果啟用了

binlog_rows_query_log_events

MySQL 配置選項并啟用了連接配接器配置

include.query

屬性,則該

source

字段還提供該

query

字段,該字段包含導緻更改事件的原始 SQL 語句。
4

op

描述操作類型的強制字元串。

op

字段值為

d

,表示該行已被删除。
5

ts_ms

顯示連接配接器處理事件的時間的可選字段。該時間基于運作 Kafka Connect 任務的 JVM 中的系統時鐘。 在

source

對象中,

ts_ms

訓示在資料庫中進行更改的時間。通過比較 for

payload.source.ts_ms

的值

payload.ts_ms

,您可以确定源資料庫更新和 Debezium 之間的延遲。

删除更改事件記錄為消費者提供了處理删除該行所需的資訊。包含舊值是因為某些消費者可能需要它們才能正确處理删除。

MySQL 連接配接器事件旨在與Kafka 日志壓縮一起使用。隻要至少保留每個鍵的最新消息,日志壓縮就可以删除一些較舊的消息。這讓 Kafka 可以回收存儲空間,同時確定主題包含完整的資料集并可用于重新加載基于鍵的狀态。

墓碑事件

當删除一行時,删除事件值仍然适用于日志壓縮,因為 Kafka 可以删除所有具有相同鍵的早期消息。但是,要讓 Kafka 删除具有相同鍵的所有消息,消息值必須是

null

. 為了實作這一點,在 Debezium 的 MySQL 連接配接器發出删除事件後,連接配接器會發出一個特殊的墓碑事件,該事件具有相同的鍵但有一個

null

值。

資料類型映射

Debezium MySQL 連接配接器表示對行的更改,事件的結構類似于行所在的表。該事件包含每個列值的字段。該列的 MySQL 資料類型決定了 Debezium 如何表示事件中的值。

存儲字元串的列在 MySQL 中使用字元集和排序規則定義。MySQL 連接配接器在讀取 binlog 事件中列值的二進制表示時使用列的字元集。

連接配接器可以将 MySQL 資料類型映射到文字和語義類型。

  • 文字類型:如何使用 Kafka Connect 模式類型表示值。
  • 語義類型:Kafka Connect 模式如何捕獲字段(模式名稱)的含義。

如果預設資料類型轉換不能滿足您的需求,您可以為連接配接器建立自定義轉換器。

基本類型

下表顯示了連接配接器如何映射基本 MySQL 資料類型。

MySQL 類型 文字類型 語義類型

BOOLEAN, BOOL

BOOLEAN

不适用

BIT(1)

BOOLEAN

不适用

BIT(>1)

BYTES

io.debezium.data.Bits

length

schema 參數包含一個表示位數的整數 。

byte[]

包含little-endian形式的位,并調整大小以包含指定數量的位。例如,

n

位在哪裡:

numBytes = n/8 + (n%8== 0 ? 0 : 1)

TINYINT

INT16

不适用

SMALLINT[(M)]

INT16

不适用

MEDIUMINT[(M)]

INT32

不适用

INT, INTEGER[(M)]

INT32

不适用

BIGINT[(M)]

INT64

不适用

REAL[(M,D)]

FLOAT32

不适用

FLOAT[(M,D)]

FLOAT64

不适用

DOUBLE[(M,D)]

FLOAT64

不适用

CHAR(M)]

STRING

不适用

VARCHAR(M)]

STRING

不适用

BINARY(M)]

BYTES

或者

STRING

n/a 原始位元組(預設)、base64 編碼的字元串或十六進制編碼的字元串,取決于

binary.handling.mode

連接配接器配置屬性設定。

VARBINARY(M)]

BYTES

或者

STRING

n/a 原始位元組(預設)、base64 編碼的字元串或十六進制編碼的字元串,取決于

binary.handling.mode

連接配接器配置屬性設定。

TINYBLOB

BYTES

或者

STRING

n/a 原始位元組(預設)、base64 編碼的字元串或十六進制編碼的字元串,取決于

binary.handling.mode

連接配接器配置屬性設定。

TINYTEXT

STRING

不适用

BLOB

BYTES

或者

STRING

n/a 原始位元組(預設)、base64 編碼的字元串或十六進制編碼的字元串,取決于

binary.handling.mode

連接配接器配置屬性設定。 僅支援最大為 2GB 的值。建議使用聲明檢查模式将大列值外部化。

TEXT

STRING

不适用 僅支援最大為 2GB 的值。建議使用聲明檢查模式将大列值外部化。

MEDIUMBLOB

BYTES

或者

STRING

n/a 原始位元組(預設)、base64 編碼的字元串或十六進制編碼的字元串,取決于

binary.handling.mode

連接配接器配置屬性設定。

MEDIUMTEXT

STRING

不适用

LONGBLOB

BYTES

或者

STRING

n/a 原始位元組(預設)、base64 編碼的字元串或十六進制編碼的字元串,取決于

binary.handling.mode

連接配接器配置屬性設定。 僅支援最大為 2GB 的值。建議使用聲明檢查模式将大列值外部化。

LONGTEXT

STRING

不适用 僅支援最大為 2GB 的值。建議使用聲明檢查模式将大列值外部化。

JSON

STRING

io.debezium.data.Json

包含

JSON

文檔、數組或标量的字元串表示形式。

ENUM

STRING

io.debezium.data.Enum

allowed

schema 參數包含逗号分隔的允許值清單 。

SET

STRING

io.debezium.data.EnumSet

allowed

schema 參數包含逗号分隔的允許值清單 。
`YEAR[(2 4)]`

INT32

TIMESTAMP[(M)]

STRING

io.debezium.time.ZonedTimestamp

采用ISO 8601格式,精度為微秒。MySQL 允許

M

0-6

.

時間類型

排除

TIMESTAMP

資料類型,MySQL 時态類型取決于

time.precision.mode

連接配接器配置屬性的值。對于

TIMESTAMP

預設值指定為

CURRENT_TIMESTAMP

或的列

NOW

,該值

1970-01-01 00:00:00

用作 Kafka Connect 架構中的預設值。

MySQL 允許、 和列使用零值

DATE

,因為零值有時優于空值。當列定義允許空值時,MySQL 連接配接器将零值表示為空值,或者當列不允許空值時,将零值表示為紀元日。

DATETIME``TIMESTAMP

沒有時區的時間值

DATETIME

類型表示本地日期和時間,例如“2018-01-13 09:48:27”。如您所見,沒有時區資訊。此類列使用 UTC 根據列的精度轉換為紀元毫秒或微秒。該

TIMESTAMP

類型表示沒有時區資訊的時間戳。MySQL 在寫入時将其從伺服器(或會話的)目前時區轉換為 UTC,在讀回值時将其從 UTC 轉換為伺服器(或會話的)目前時區。例如:

  • DATETIME

    值為.

    2018-06-20 06:37:03

    _

    1529476623000

  • TIMESTAMP

    值為.

    2018-06-20 06:37:03

    _

    2018-06-20T13:37:03Z

io.debezium.time.ZonedTimestamp

根據伺服器(或會話的)目前時區,此類列将轉換為 UTC 中的等效項。預設從伺服器查詢時區。如果失敗,則必須由資料庫

connectionTimeZone

MySQL 配置選項明确指定。例如,如果資料庫的時區(全局或通過

connectionTimeZone

選項為連接配接器配置)是“America/Los_Angeles”,則 TIMESTAMP 值“2018-06-20 06:37:03”

ZonedTimestamp

由值“2018-06-20T13:37:03Z”。

運作 Kafka Connect 和 Debezium 的 JVM 的時區不會影響這些轉換。

有關與時間值相關的屬性的更多詳細資訊,請參見MySQL 連接配接器配置屬性的文檔。

  • time.precision.mode=adaptive_time_microseconds(預設)

    MySQL 連接配接器根據列的資料類型定義确定文字類型和語義類型,以便事件準确表示資料庫中的值。所有時間字段都以微秒為機關。隻有在 to 範圍内的正字段

    TIME

    值才能被正确捕獲。

    00:00:00.000000``23:59:59.999999

    表 13. 映射時

    time.precision.mode=adaptive_time_microseconds

    MySQL 類型文字類型語義類型

    DATE``INT32``io.debezium.time.Date

    表示自紀元以來的天數。

    TIME[(M)]``INT64``io.debezium.time.MicroTime

    以微秒為機關表示時間值,不包括時區資訊。MySQL 允許

    M

    0-6

    .

    DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)``INT64``io.debezium.time.Timestamp

    表示經過紀元的毫秒數,不包括時區資訊。

    DATETIME(4), DATETIME(5), DATETIME(6)``INT64``io.debezium.time.MicroTimestamp

    表示經過紀元的微秒數,不包括時區資訊。
  • time.precision.mode=連接配接

    MySQL 連接配接器使用定義的 Kafka Connect 邏輯類型。這種方法不如預設方法精确,如果資料庫列的小數秒精度值大于

    3

    .

    00:00:00.000

    隻能處理to範圍内的值

    23:59:59.999

    time.precision.mode=connect

    僅當您可以確定

    TIME

    表中的值永遠不會超過支援的範圍時才設定。該

    connect

    設定預計将在 Debezium 的未來版本中删除。表 14. 映射時

    time.precision.mode=connect

    MySQL 類型文字類型語義類型

    DATE``INT32``org.apache.kafka.connect.data.Date

    表示自紀元以來的天數。

    TIME[(M)]``INT64``org.apache.kafka.connect.data.Time

    表示自午夜以來的時間值(以微秒為機關),不包括時區資訊。

    DATETIME[(M)]``INT64``org.apache.kafka.connect.data.Timestamp

    表示自紀元以來的毫秒數,不包括時區資訊。

小數類型

Debezium 連接配接器根據

decimal.handling.mode

連接配接器配置屬性的設定處理小數。

  • decimal.handling.mode=精确

    表 15. 映射時

    decimal.handling.mode=precise

    MySQL 類型文字類型語義類型

    NUMERIC[(M[,D])]``BYTES``org.apache.kafka.connect.data.Decimal

    schema 參數包含一個整數,

    scale

    表示小數點移動了多少位。

    DECIMAL[(M[,D])]``BYTES``org.apache.kafka.connect.data.Decimal

    schema 參數包含一個整數,

    scale

    表示小數點移動了多少位。
  • decimal.handling.mode=double

    表 16. 映射時

    decimal.handling.mode=double

    MySQL 類型文字類型語義類型

    NUMERIC[(M[,D])]``FLOAT64

    不适用

    DECIMAL[(M[,D])]``FLOAT64

    不适用
  • 十進制處理模式=字元串

    表 17. 映射時

    decimal.handling.mode=string

    MySQL 類型文字類型語義類型

    NUMERIC[(M[,D])]``STRING

    不适用

    DECIMAL[(M[,D])]``STRING

    不适用

布爾值

MySQL

BOOLEAN

以特定方式在内部處理該值。該

BOOLEAN

列在内部映射到

TINYINT(1)

資料類型。在流式傳輸期間建立表時,它使用正确的

BOOLEAN

映射,因為 Debezium 接收原始 DDL。在快照期間,Debezium 執行以擷取為和列

SHOW CREATE TABLE

傳回的表定義。Debezium 然後無法獲得原始類型映射,是以映射到.

TINYINT(1)``BOOLEAN``TINYINT(1)``TINYINT(1)

為了使您能夠将源列轉換為布爾資料類型,Debezium 提供了一個

TinyIntOneToBooleanConverter

自定義轉換器,您可以通過以下方式之一使用它:

  • TINYINT(1)

    将所有或

    TINYINT(1) UNSIGNED

    列映射到

    BOOLEAN

    類型。
  • 使用逗号分隔的正規表達式清單枚舉列的子集。

    要使用這種類型的轉換,您必須

    converters

    使用參數設定配置屬性,

    selector

    如下例所示:
    converters=boolean
    boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
    boolean.selector=db1.table1.*, db1.table2.column1
               
  • tinyint unsigned

    注意: MySQL8在執行快照時沒有顯示類型的長度

    SHOW CREATE TABLE

    ,這意味着這個轉換器不起作用。新選項

    length.checker

    可以解決這個問題,預設值為

    true

    . 禁用

    length.checker

    并指定需要轉換為

    selector

    屬性的列,而不是根據類型轉換所有列,如下例所示:
    converters=boolean
    boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
    boolean.length.checker=false
    boolean.selector=db1.table1.*, db1.table2.column1
               

空間類型

目前,Debezium MySQL 連接配接器支援以下空間資料類型。

MySQL 類型 文字類型 語義類型

GEOMETRY,LINESTRING,POLYGON,MULTIPOINT,MULTILINESTRING,MULTIPOLYGON,GEOMETRYCOLLECTION

STRUCT

io.debezium.data.geometry.Geometry

包含具有兩個字段的結構:

srid (INT32

: 定義存儲在結構中的幾何對象類型的空間參考系統 ID

wkb (BYTES)

: 以 Well-Known-Binary (wkb) 格式編碼的幾何對象的二進制表示。有關詳細資訊,請參閱開放地理空間聯盟。

設定 MySQL

在安裝和運作 Debezium 連接配接器之前,需要執行一些 MySQL 設定任務。

建立使用者

Debezium MySQL 連接配接器需要 MySQL 使用者帳戶。此 MySQL 使用者必須對 Debezium MySQL 連接配接器捕獲更改的所有資料庫具有适當的權限。

先決條件

  • 一個 MySQL 伺服器。
  • SQL 指令的基本知識。

程式

  1. 建立 MySQL 使用者:
    mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
               
  2. 授予使用者所需的權限:
    mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
               
    下表描述了權限。
    如果使用不允許全局讀取鎖定的托管選項(例如 Amazon RDS 或 Amazon Aurora),則使用表級鎖定來建立一緻快照。在這種情況下,您還需要向

    LOCK TABLES

    您建立的使用者授予權限。有關更多詳細資訊,請參閱快照。
  3. 最終确定使用者的權限:
    mysql> FLUSH PRIVILEGES;
               
關鍵詞 描述

SELECT

使連接配接器能夠從資料庫中的表中選擇行。這僅在執行快照時使用。

RELOAD

允許連接配接器使用該

FLUSH

語句來清除或重新加載内部緩存、重新整理表或擷取鎖。這僅在執行快照時使用。

SHOW DATABASES

SHOW DATABASE

通過發出語句使連接配接器能夠檢視資料庫名稱。這僅在執行快照時使用。

REPLICATION SLAVE

使連接配接器能夠連接配接并讀取 MySQL 伺服器 binlog。

REPLICATION CLIENT

允許連接配接器使用以下語句:

SHOW MASTER STATUS``SHOW SLAVE STATUS``SHOW BINARY LOGS

連接配接器總是需要這個。

ON

辨別權限适用的資料庫。

TO 'user'

指定要授予權限的使用者。

IDENTIFIED BY 'password'

指定使用者的 MySQL 密碼。

啟用二進制日志

您必須為 MySQL 複制啟用二進制日志記錄。二進制日志記錄複制工具的事務更新以傳播更改。

先決條件

  • 一個 MySQL 伺服器。
  • 适當的 MySQL 使用者權限。

程式

  1. 檢查該

    log-bin

    選項是否已打開:
    // for MySql 5.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM information_schema.global_variables WHERE variable_name='log_bin';
    // for MySql 8.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM performance_schema.global_variables WHERE variable_name='log_bin';
               
  2. 如果是

    OFF

    ,請使用以下屬性配置您的 MySQL 伺服器配置檔案,如下表所述:
    server-id         = 223344
    log_bin           = mysql-bin
    binlog_format     = ROW
    binlog_row_image  = FULL
    expire_logs_days  = 10
               
  3. 通過再次檢查 binlog 狀态來确認您的更改:
    // for MySql 5.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM information_schema.global_variables WHERE variable_name='log_bin';
    // for MySql 8.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM performance_schema.global_variables WHERE variable_name='log_bin';
               
财産 描述

server-id

對于 MySQL 叢集中的每個伺服器和複制用戶端,的值

server-id

必須是唯一的。在 MySQL 連接配接器設定期間,Debezium 為連接配接器配置設定一個唯一的伺服器 ID。

log_bin

的值

log_bin

是二進制日志檔案序列的基本名稱。

binlog_format

binlog-format

必須設定為

ROW

或。

row

binlog_row_image

binlog_row_image

必須設定為

FULL

或。

full

expire_logs_days

這是自動删除 binlog 檔案的天數。預設值為 ,表示不自動删除。設定值以比對您的環境需求。請參閱MySQL 清除 binlog 檔案。

啟用 GTID

全局事務辨別符 (GTID) 唯一辨別叢集内伺服器上發生的事務。盡管 Debezium MySQL 連接配接器不需要,但使用 GTID 可以簡化複制并使您能夠更輕松地确認主伺服器和副本伺服器是否一緻。

GTID 在 MySQL 5.6.5 及更高版本中可用。有關更多詳細資訊,請參閱MySQL 文檔。

先決條件

  • 一個 MySQL 伺服器。
  • SQL 指令的基本知識。
  • 通路 MySQL 配置檔案。

程式

  1. 啟用

    gtid_mode

    mysql> gtid_mode=ON
               
  2. 啟用

    enforce_gtid_consistency

    mysql> enforce_gtid_consistency=ON
               
  3. 确認更改:
    mysql> show global variables like '%GTID%';
               

結果

+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| enforce_gtid_consistency | ON    |
| gtid_mode                | ON    |
+--------------------------+-------+
           
選項 描述

gtid_mode

布爾值,指定是否啟用 MySQL 伺服器的 GTID 模式。

ON

= 啟用

OFF

= 禁用

enforce_gtid_consistency

布爾值,指定伺服器是否通過允許執行可以以事務安全方式記錄的語句來強制執行 GTID 一緻性。使用 GTID 時需要。

ON

= 啟用

OFF

= 禁用

配置會話逾時

當為大型資料庫制作初始一緻快照時,您建立的連接配接可能會在讀取表時逾時。您可以通過在 MySQL 配置檔案中配置

interactive_timeout

和來防止這種行為。

wait_timeout

先決條件

  • 一個 MySQL 伺服器。
  • SQL 指令的基本知識。
  • 通路 MySQL 配置檔案。

程式

  1. 配置

    interactive_timeout

    mysql> interactive_timeout=<duration-in-seconds>
               
  2. 配置

    wait_timeout

    mysql> wait_timeout=<duration-in-seconds>
               
選項 描述

interactive_timeout

伺服器在關閉互動式連接配接之前等待其活動的秒數。有關詳細資訊,請參閱MySQL 的文檔。

wait_timeout

伺服器在關閉非互動式連接配接之前等待其活動的秒數。有關詳細資訊,請參閱MySQL 的文檔。

啟用查詢日志事件

您可能希望檢視

SQL

每個 binlog 事件的原始語句。在 MySQL 配置檔案中啟用該

binlog_rows_query_log_events

選項允許您執行此操作。

此選項在 MySQL 5.6 及更高版本中可用。

先決條件

  • 一個 MySQL 伺服器。
  • SQL 指令的基本知識。
  • 通路 MySQL 配置檔案。

程式

  • 啟用

    binlog_rows_query_log_events

    mysql> binlog_rows_query_log_events=ON
               

    binlog_rows_query_log_events

    設定為啟用/禁用對

    SQL

    在 binlog 條目中包含原始語句的支援的值。
    • ON

      = 啟用
    • OFF

      = 禁用

部署

要部署 Debezium MySQL 連接配接器,您需要安裝 Debezium MySQL 連接配接器存檔,配置連接配接器,然後通過将其配置添加到 Kafka Connect 來啟動連接配接器。

先決條件

  • 已安裝Apache Zookeeper、Apache Kafka和Kafka Connect。
  • MySQL 伺服器已安裝并設定為與 Debezium 連接配接器一起使用。

程式

  1. 下載下傳 Debezium MySQL 連接配接器插件。
  2. 将檔案提取到您的 Kafka Connect 環境中。
  3. 将包含 JAR 檔案的目錄添加到Kafka Connect 的

    plugin.path

    .
  4. 配置連接配接器并将配置添加到您的 Kafka Connect 叢集。
  5. 重新啟動 Kafka Connect 程序以擷取新的 JAR 檔案。

如果您正在使用不可變容器,請參閱Debezium 的Apache Zookeeper、Apache Kafka、MySQL 和 Kafka Connect 容器映像,其中 MySQL 連接配接器已安裝并準備運作。

您還可以在 Kubernetes 和 OpenShift 上運作 Debezium。

MySQL 連接配接器配置示例

以下是連接配接器執行個體的配置示例,該執行個體從位于 192.168.99.100 的端口 3306 上的 MySQL 伺服器捕獲資料,我們在邏輯上将其命名為

fullfillment

. 通常,您通過設定可用于連接配接器的配置屬性,在 JSON 檔案中配置 Debezium MySQL 連接配接器。

您可以選擇為資料庫中的模式和表的子集生成事件。或者,您可以忽略、屏蔽或截斷包含敏感資料、大于指定大小或您不需要的列。

{
    "name": "inventory-connector", 
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "192.168.99.100", 
        "database.port": "3306", 
        "database.user": "debezium-user", 
        "database.password": "debezium-user-pw", 
        "database.server.id": "184054", 
        "database.server.name": "fullfillment", 
        "database.include.list": "inventory", 
        "database.history.kafka.bootstrap.servers": "kafka:9092", 
        "database.history.kafka.topic": "dbhistory.fullfillment", 
        "include.schema.changes": "true" 
    }
}
           
向 Kafka Connect 服務注冊時的連接配接器名稱。
連接配接器的類名。
MySQL 伺服器位址。
MySQL 伺服器端口号。
具有适當權限的 MySQL 使用者。
MySQL 使用者的密碼。
連接配接器的唯一 ID。
MySQL 伺服器或叢集的邏輯名稱。
指定伺服器托管的資料庫清單。
連接配接器用于将 DDL 語句寫入和恢複到資料庫曆史主題的 Kafka 代理清單。
資料庫曆史主題的名稱。本主題僅供内部使用,消費者不得使用。
指定連接配接器是否應為 DDL 更改生成事件并将它們發送到

fulfillment

架構更改主題以供使用者使用的标志。

有關可以為 Debezium MySQL 連接配接器設定的配置屬性的完整清單,請參閱MySQL 連接配接器配置屬性。

您可以使用指令将此配置發送

POST

到正在運作的 Kafka Connect 服務。該服務記錄配置并啟動一個執行以下操作的連接配接器任務:

  • 連接配接到 MySQL 資料庫。
  • 在捕獲模式下讀取表的更改資料表。
  • 流将事件記錄更改為 Kafka 主題。

添加連接配接器配置

要開始運作 MySQL 連接配接器,請配置連接配接器配置,并将配置添加到您的 Kafka Connect 叢集。

先決條件

  • MySQL 設定為使用 Debezium 連接配接器。
  • Debezium MySQL 連接配接器已安裝。

程式

  1. 為 MySQL 連接配接器建立配置。
  2. 使用Kafka Connect REST API将該連接配接器配置添加到您的 Kafka Connect 叢集。

結果

連接配接器啟動後,它會為連接配接器配置的 MySQL 資料庫執行一緻的快照。然後,連接配接器開始為行級操作生成資料更改事件,并将更改事件記錄流式傳輸到 Kafka 主題。

連接配接器屬性

Debezium MySQL 連接配接器有許多配置屬性,您可以使用它們來為您的應用程式實作正确的連接配接器行為。許多屬性都有預設值。有關屬性的資訊組織如下:

  • 必需的連接配接器配置屬性
  • 進階連接配接器配置屬性
  • 控制 Debezium 如何處理從資料庫曆史主題中讀取的事件的資料庫曆史連接配接器配置屬性。
    • 傳遞資料庫曆史記錄屬性
  • 控制資料庫驅動程式行為的傳遞資料庫驅動程式屬性。

除非預設值可用,否則需要以下配置屬性。

必需的 Debezium MySQL 連接配接器配置屬性

财産 預設 描述

name

無預設值 連接配接器的唯一名稱。嘗試使用相同名稱再次注冊失敗。所有 Kafka Connect 連接配接器都需要此屬性。

connector.class

無預設值 連接配接器的 Java 類的名稱。始終指定

io.debezium.connector.mysql.MySqlConnector

MySQL 連接配接器。

tasks.max

1

應為此連接配接器建立的最大任務數。MySQL 連接配接器始終使用單個任務,是以不使用此值,是以預設值始終是可以接受的。

database.hostname

無預設值 MySQL 資料庫伺服器的 IP 位址或主機名。

database.port

3306

MySQL 資料庫伺服器的整數端口号。

database.user

無預設值 連接配接到 MySQL 資料庫伺服器時要使用的 MySQL 使用者的名稱。

database.password

無預設值 連接配接到 MySQL 資料庫伺服器時使用的密碼。

database.server.name

無預設值 為 Debezium 在其中捕獲更改的特定 MySQL 資料庫伺服器/叢集辨別并提供命名空間的邏輯名稱。邏輯名稱在所有其他連接配接器中應該是唯一的,因為它用作所有接收此連接配接器發出的事件的 Kafka 主題名稱的字首。資料庫伺服器邏輯名稱中隻能使用字母數字字元、連字元、點和下劃線。 +不要更改此屬性的值。如果您更改名稱值,在重新啟動後,連接配接器不會繼續向原始主題發出事件,而是向名稱基于新值的主題發出後續事件。連接配接器也無法恢複其資料庫曆史主題。

database.server.id

随機的 此資料庫用戶端的數字 ID,在 MySQL 叢集中所有目前運作的資料庫程序中必須是唯一的。此連接配接器作為另一台伺服器(具有此唯一 ID)加入 MySQL 資料庫叢集,是以它可以讀取 binlog。預設情況下,會生成一個介于 5400 和 6400 之間的随機數,但建議顯式設定一個值。

database.include.list

空字元串 一個可選的、以逗号分隔的正規表達式清單,與要為其捕獲更改的資料庫的名稱相比對。連接配接器不會捕獲名稱不在的任何資料庫中的更改

database.include.list

。預設情況下,連接配接器會捕獲所有資料庫中的更改。不要同時設定

database.exclude.list

連接配接器配置屬性。

database.exclude.list

空字元串 一個可選的、以逗号分隔的正規表達式清單,與您不想捕獲其更改的資料庫的名稱相比對。連接配接器捕獲名稱不在

database.exclude.list

. 不要同時設定

database.include.list

連接配接器配置屬性。

table.include.list

空字元串 一個可選的、以逗号分隔的正規表達式清單,與您要捕獲其更改的表的完全限定表辨別符比對。連接配接器不會捕獲任何未包含在

table.include.list

. 每個辨別符的格式為databaseName。表名。預設情況下,連接配接器會捕獲每個資料庫中每個非系統表中的更改,這些表的更改正在被捕獲。不要同時指定

table.exclude.list

連接配接器配置屬性。

table.exclude.list

空字元串 一個可選的、以逗号分隔的正規表達式清單,比對您不想捕獲其更改的表的完全限定表辨別符。連接配接器捕獲未包含在任何表中的更改

table.exclude.list

。每個辨別符的格式為databaseName。表名。不要同時指定

table.include.list

連接配接器配置屬性。

column.exclude.list

空字元串 一個可選的、以逗号分隔的正規表達式清單,與要從更改事件記錄值中排除的列的完全限定名稱比對。列的完全限定名稱的格式為databaseName。表名。列名。

column.include.list

空字元串 一個可選的、以逗号分隔的正規表達式清單,與要包含在更改事件記錄值中的列的完全限定名稱比對。列的完全限定名稱的格式為databaseName。表名。列名。

column.truncate.to._length_.chars

不适用 如果字段值長于指定的字元數,則以逗号分隔的可選正規表達式清單比對基于字元的列的完全限定名稱,這些列的值應在更改事件記錄值中被截斷。您可以在單個配置中配置具有不同長度的多個屬性。長度必須是正整數。列的完全限定名稱的格式為databaseName。表名。列名。

column.mask.with._length_.chars

不适用 一個可選的、以逗号分隔的正規表達式清單,比對基于字元的列的完全限定名稱,其值應在更改事件消息值中替換為由指定數量的星号 (

*

) 字元組成的字段值。您可以在單個配置中配置具有不同長度的多個屬性。每個長度必須是正整數或零。列的完全限定名稱的格式為databaseName。表名。列名。

column.mask.hash.*hashAlgorithm*.with.salt.*salt*

;

column.mask.hash.v2.*hashAlgorithm*.with.salt.*salt*

不适用 一個可選的、以逗号分隔的正規表達式清單,比對基于字元的列的完全限定名稱。列的完全限定名稱采用. 在生成的更改事件記錄中,指定列的值将替換為假名。

*<databaseName>*.*<tableName>*.*<columnName>*

假名由應用指定的hashAlgorithm和salt産生的散列值組成。基于所使用的散列函數,參照完整性得以保持,而列值被替換為假名。Java Cryptography Architecture Standard Algorithm Name Documentation的MessageDigest 部分描述了支援的散列函數。 在以下示例中,

CzQMA0cB5K

是随機選擇的鹽。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

如有必要,假名會自動縮短為列的長度。連接配接器配置可以包括指定不同雜湊演算法和鹽的多個屬性。 根據使用的hashAlgorithm、選擇的salt和實際資料集,生成的資料集可能不會被完全屏蔽。 如果值在不同的地方或系統中進行散列,則應使用散列政策版本 2 來確定保真度。

column.propagate.source.type

不适用 一個可選的,以逗号分隔的正規表達式清單,比對列的完全限定名稱,其原始類型和長度應作為參數添加到發出的更改事件記錄中的相應字段模式。這些架構參數:

__Debezium.source.column.type``__Debezium.source.column.length``__Debezium.source.column.scale

分别用于傳播可變寬度類型的原始類型名稱和長度。這對于正确調整接收器資料庫中相應列的大小很有用。列的完全限定名稱是以下形式之一:資料庫名稱。表名。列名**資料庫名稱。架構名稱。表名。列名

datatype.propagate.source.type

不适用 一個可選的、以逗号分隔的正規表達式清單,它與列的特定于資料庫的資料類型名稱相比對,其原始類型和長度應作為參數添加到發出的更改事件記錄中的相應字段模式中。這些架構參數:

__debezium.source.column.type``__debezium.source.column.length``__debezium.source.column.scale

分别用于傳播可變寬度類型的原始類型名稱和長度。這對于正确調整接收器資料庫中相應列的大小很有用。完全限定的資料類型名稱是以下形式之一:資料庫名稱。表名。類型名稱**資料庫名稱。架構名稱。表名。類型名稱檢視MySQL 連接配接器如何映射資料類型以擷取 MySQL 特定的資料類型名稱清單。

time.precision.mode

adaptive_time_microseconds

時間、日期和時間戳可以用不同類型的精度表示,包括:(

adaptive_time_microseconds

預設)根據資料庫列的類型使用毫秒、微秒或納秒精度值精确地捕獲資料庫中的日期、日期時間和時間戳值, TIME 類型字段除外,它們始終以微秒為機關捕獲。

adaptive

(已棄用)根據資料庫列的類型,使用毫秒、微秒或納秒精度值精确捕獲資料庫中的時間和時間戳值。

connect

始終使用 Kafka Connect 的時間、日期和時間戳的内置表示來表示時間和時間戳值,無論資料庫列的精度如何,它們都使用毫秒精度。

decimal.handling.mode

precise

指定連接配接器應如何處理

DECIMAL

NUMERIC

precise

的值:(預設)使用

java.math.BigDecimal

二進制形式的更改事件中表示的值精确地表示它們。

double

使用

double

值表示它們,這可能會導緻精度損失,但更易于使用。

string

将值編碼為格式化字元串,這很容易使用,但有關真實類型的語義資訊會丢失。

bigint.unsigned.handling.mode

long

指定 BIGINT UNSIGNED 列應如何在更改事件中表示。可能的設定是:

long

使用 Java 表示值

long

,這可能無法提供精度,但在消費者中易于使用。

long

通常是首選設定。

precise

用于表示值,這些值使用二進制表示和 Kafka Connect 的類型

java.math.BigDecimal

編碼在更改事件中。

org.apache.kafka.connect.data.Decimal

處理大于 2^63 的值時使用此設定,因為這些值無法通過使用

long

.

include.schema.changes

true

布爾值,指定連接配接器是否應将資料庫架構中的更改釋出到與資料庫伺服器 ID 同名的 Kafka 主題。通過使用包含資料庫名稱并且其值包含 DDL 語句的鍵來記錄每個架構更改。這與連接配接器内部記錄資料庫曆史的方式無關。

include.schema.comments

false

布爾值,指定連接配接器是否應解析和釋出中繼資料對象的表和列注釋。啟用此選項将對記憶體使用産生影響。邏輯模式對象的數量和大小在很大程度上影響了 Debezium 連接配接器消耗的記憶體量,并且向它們中的每一個添加潛在的大字元串資料可能會非常昂貴。

include.query

false

布爾值,指定連接配接器是否應包含生成更改事件的原始 SQL 查詢。 如果您将此選項設定為,那麼您還必須使用設定為的選項

true

配置 MySQL 。當是時,對于快照過程生成的事件不存在查詢。 設定為可能會公開通過在更改事件中包含原始 SQL 語句而顯式排除或屏蔽的表或字段。是以,預設設定為。

binlog_rows_query_log_events``ON``include.query``true

include.query``true``false

event.deserialization.failure.handling.mode

fail

指定連接配接器在反序列化二進制日志事件期間應如何對異常做出反應。

fail

傳播異常,該異常訓示有問題的事件及其二進制日志偏移量,并導緻連接配接器停止。

warn

記錄有問題的事件及其二進制日志偏移量,然後跳過該事件。

ignore

跳過有問題的事件并且不記錄任何内容。

inconsistent.schema.handling.mode

fail

指定連接配接器應如何對與内部模式表示中不存在的表相關的二進制日志事件作出反應。即内部表示與資料庫不一緻。

fail

抛出一個異常,訓示有問題的事件及其二進制日志偏移量,并導緻連接配接器停止。

warn

記錄有問題的事件及其二進制日志偏移量并跳過該事件。

skip

跳過有問題的事件并且不記錄任何内容。

max.batch.size

2048

正整數值,指定在此連接配接器的每次疊代期間應處理的每批事件的最大大小。預設為 2048。

max.queue.size

8192

正整數值,指定阻塞隊列可以儲存的最大記錄數。當 Debezium 讀取從資料庫流式傳輸的事件時,它會将事件放入阻塞隊列中,然後再将它們寫入 Kafka。在連接配接器接收消息的速度快于将消息寫入 Kafka 的速度或 Kafka 不可用時,阻塞隊列可以為從資料庫讀取更改事件提供背壓。當連接配接器定期記錄偏移量時,将忽略隊列中儲存的事件。始終将 的值設定

max.queue.size

為大于 的值

max.batch.size

max.queue.size.in.bytes

一個長整數值,指定阻塞隊列的最大容量(以位元組為機關)。預設情況下,沒有為阻塞隊列指定卷限制。要指定隊列可以使用的位元組數,請将此屬性設定為正長值。 如果

max.queue.size

也設定了,當隊列的大小達到任一屬性指定的限制時,寫入隊列将被阻止。例如,如果設定

max.queue.size=1000

, 和

max.queue.size.in.bytes=5000

,則在隊列包含 1000 條記錄或隊列中的記錄量達到 5000 位元組後阻止寫入隊列。

poll.interval.ms

1000

正整數值,指定連接配接器在開始處理一批事件之前應等待新更改事件出現的毫秒數。預設為 1000 毫秒或 1 秒。

connect.timeout.ms

30000

一個正整數值,指定此連接配接器在嘗試連接配接到 MySQL 資料庫伺服器後在逾時之前應等待的最長時間(以毫秒為機關)。預設為 30 秒。

gtid.source.includes

無預設值 與 GTID 集中的源 UUID 比對的正規表達式的逗号分隔清單,用于查找 MySQL 伺服器中的 binlog 位置。僅使用具有與其中一種包含模式比對的源的 GTID 範圍。不要同時指定

gtid.source.excludes

.

gtid.source.excludes

無預設值 與 GTID 集中的源 UUID 比對的正規表達式的逗号分隔清單,用于查找 MySQL 伺服器中的 binlog 位置。僅使用具有不比對任何這些排除模式的源的 GTID 範圍。不要同時為 指定值

gtid.source.includes

gtid.new.channel.position

已棄用并計劃移除

earliest

當設定為

latest

時,當連接配接器看到一個新的 GTID 通道時,它會從該 GTID 通道中最後執行的事務開始消費。如果設定為

earliest

(預設),則連接配接器從第一個可用(未清除)GTID 位置開始讀取該通道。

earliest

當您使用 Debezium 連接配接到主伺服器的主動-被動 MySQL 設定時非常有用。在這種情況下,在故障轉移期間,具有新 UUID(和 GTID 通道)的副本在連接配接 Debezium 之前開始接收寫入。使用

latest

.

tombstones.on.delete

true

控制删除事件後是否有墓碑事件。

true

- 删除操作由删除事件和随後的墓碑事件表示。

false

- 隻發出一個删除事件。 删除源記錄後,發出 tombstone 事件(預設行為)允許 Kafka 完全删除與已删除行的鍵相關的所有事件,以防主題啟用日志壓縮。

message.key.columns

不适用 一個表達式清單,指定連接配接器用于形成自定義消息鍵的列,以形成它釋出到指定表的 Kafka 主題的更改事件記錄。預設情況下,Debezium 使用表的主鍵列作為它發出的記錄的消息鍵。代替預設值,或為缺少主鍵的表指定鍵,您可以基于一個或多個列配置自定義消息鍵。 要為表建立自定義消息鍵,請列出表,然後列出用作消息鍵的列。每個清單條目采用以下格式: 要基于多個列名的表鍵,請在列名之間插入逗号。

*<fully-qualified_tableName>*:_<keyColumn>_,*<keyColumn>*

每個完全限定的表名都是以下格式的正規表達式: 該屬性可以包含多個表的條目。使用分号分隔清單中的表條目。 下面的示例設定表和的消息鍵:對于 表,列和指定為消息鍵。對于任何資料庫中的表,列和伺服器作為消息鍵。

*<databaseName>*.*<tableName>*

inventory.customers``purchase.orders

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

inventory.customer``pk1``pk2``purchaseorders``pk3``pk4

用于建立自定義消息鍵的列數沒有限制。但是,最好使用指定唯一鍵所需的最小數量。

binary.handling.mode

位元組 指定如何在更改事件中表示二進制列,例如 、、

blob

。可能的設定:将二進制資料表示為位元組數組。将二進制資料表示為 base64 編碼的字元串。将二進制資料表示為十六進制編碼 (base16) 字元串。

binary``varbinary

bytes

base64

hex

schema.name.adjustment.mode

歐元 指定應如何調整架構名稱以與連接配接器使用的消息轉換器相容。可能的設定:

avro

用下劃線替換不能在 Avro 類型名稱中使用的字元。

none

不應用任何調整。

進階 MySQL 連接配接器配置屬性

下表描述了進階 MySQL 連接配接器屬性。這些屬性的預設值很少需要更改。是以,您無需在連接配接器配置中指定它們。

财産 預設 描述

connect.keep.alive

true

一個布爾值,指定是否應使用單獨的線程來確定與 MySQL 伺服器/叢集的連接配接保持活動狀态。

converters

無預設值 枚舉連接配接器可以使用的自定義轉換器執行個體的符号名稱的逗号分隔清單。 例如,

boolean

。 此屬性是使連接配接器能夠使用自定義轉換器所必需的。對于您為連接配接器配置的每個轉換器,您還必須添加一個

.type

屬性,該屬性指定實作轉換器接口的類的完全限定名稱。該

.type

屬性使用以下格式:

*<converterSymbolicName>*.type

例如,

boolean.type:io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter

如果要進一步控制已配置轉換器的行為,可以添加一個或多個配置參數以将值傳遞給轉換器。要将這些附加配置參數與轉換器相關聯,請在參數名稱前加上轉換器的符号名稱。 例如,要定義一個

selector

指定

boolean

轉換器處理的列子集的參數,請添加以下屬性:

boolean.selector=db1.table1.*, db1.table2.column1

table.ignore.builtin

true

一個布爾值,指定是否應忽略内置系統表。無論表包含和排除清單如何,這都适用。預設情況下,系統表不會被捕獲其更改,并且在對任何系統表進行更改時不會生成任何事件。

database.ssl.mode

disabled

指定是否使用加密連接配接。可能的設定有:

disabled

指定使用未加密的連接配接。

preferred

如果伺服器支援安全連接配接,則建立加密連接配接。如果伺服器不支援安全連接配接,則回退到未加密的連接配接。

required

建立加密連接配接,如果因任何原因無法建立連接配接,則失敗。

verify_ca

行為類似

required

,但另外它會根據配置的證書頒發機構 (CA) 證書驗證伺服器 TLS 證書,如果伺服器 TLS 證書與任何有效的 CA 證書不比對,則會失敗。

verify_identity

行為類似

verify_ca

,但另外驗證伺服器證書是否與遠端連接配接的主機比對。

binlog.buffer.size

二進制日志閱讀器使用的前瞻緩沖區的大小。預設設定 禁用緩沖。 在特定情況下,MySQL binlog 中可能包含

ROLLBACK

語句完成的未送出資料。典型示例是在單個事務中使用儲存點或混合臨時和正常表更改。 當檢測到事務開始時,Debezium 會嘗試前滾 binlog 位置并找到

COMMIT

ROLLBACK

是以它可以确定是否從事務中流式傳輸更改。binlog 緩沖區的大小定義了 Debezium 在搜尋事務邊界時可以緩沖的事務中的最大更改數。如果事務的大小大于緩沖區,則 Debezium 必須在流式傳輸時回退并重新讀取未放入緩沖區的事件。 注意:此功能正在孵化。鼓勵回報。預計此功能尚未完全完善。

snapshot.mode

initial

指定連接配接器啟動時運作快照的條件。可能的設定有:

initial

- 僅當沒有為邏輯伺服器名稱記錄偏移時,連接配接器才運作快照。

initial_only

- 連接配接器僅在沒有記錄邏輯伺服器名稱的偏移量時運作快照,然後停止;即它不會從 binlog 中讀取更改事件。

when_needed

- 連接配接器在它認為有必要時在啟動時運作快照。也就是說,當沒有可用的偏移量時,或者當先前記錄的偏移量指定了伺服器中不可用的 binlog 位置或 GTID 時。

never

- 連接配接器從不使用快照。首次使用邏輯伺服器名稱啟動時,連接配接器從 binlog 的開頭讀取。謹慎配置此行為。隻有當 binlog 保證包含資料庫的全部曆史時才有效。

schema_only

- 連接配接器運作模式而不是資料的快照。當您不需要主題包含資料的一緻快照但需要它們僅具有自連接配接器啟動以來的更改時,此設定很有用。

schema_only_recovery

- 這是已捕獲更改的連接配接器的恢複設定。當您重新啟動連接配接器時,此設定可以恢複損壞或丢失的資料庫曆史主題。您可以定期設定它來“清理”一個意外增長的資料庫曆史主題。資料庫曆史主題需要無限保留。

snapshot.locking.mode

minimal

控制連接配接器是否持有全局 MySQL 讀鎖以及持有多長時間,這會在連接配接器執行快照時阻止對資料庫的任何更新。可能的設定有:

minimal

- 連接配接器僅對快照的初始部分持有全局讀鎖,在此期間連接配接器讀取資料庫模式和其他中繼資料。快照中的剩餘工作涉及從每個表中選擇所有行。連接配接器可以通過使用 REPEATABLE READ 事務以一緻的方式執行此操作。即使不再持有全局讀鎖并且其他 MySQL 用戶端正在更新資料庫,情況也是如此。

minimal_percona

- 連接配接器持有全局備份鎖僅用于連接配接器讀取資料庫模式和其他中繼資料的快照的初始部分。快照中的剩餘工作涉及從每個表中選擇所有行。連接配接器可以通過使用 REPEATABLE READ 事務以一緻的方式執行此操作。即使不再持有全局備份鎖并且其他 MySQL 用戶端正在更新資料庫,情況也是如此。此模式不會将表重新整理到磁盤,不會被長時間運作的讀取阻塞,并且僅在 Percona Server 中可用。

extended

- 在快照期間阻止所有寫入。如果有用戶端正在送出 MySQL 從 REPEATABLE READ 語義中排除的操作,請使用此設定。

none

- 防止連接配接器在快照期間擷取任何表鎖。雖然所有快照模式都允許使用此設定,但當且僅當在快照運作時沒有發生架構更改時使用它是安全的。對于使用 MyISAM 引擎定義的表,盡管在 MyISAM 擷取表鎖時設定了此屬性,但表仍将被鎖定。這種行為與擷取行級鎖的 InnoDB 引擎不同。

snapshot.include.collection.list

中指定的所有表

table.include.list

一個可選的、以逗号分隔的正規表達式清單,與

*<databaseName>.<tableName>*

要包含在快照中的表的完全限定名稱 ( ) 比對。指定的項目必須在連接配接器的

table.include.list

屬性中命名。

snapshot.mode

僅當連接配接器的屬性設定為 以外的值時,此屬性才會生效

never

。 此屬性不影響增量快照的行為。

snapshot.select.statement.overrides

無預設值 指定要包含在快照中的表行。如果您希望快照僅包含表中行的子集,請使用該屬性。此屬性僅影響快照。它不适用于連接配接器從日志中讀取的事件。該屬性包含格式為 的完全限定表名的逗号分隔清單

*<databaseName>.<tableName>*

。例如,

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

對于清單中的每個表,添加一個進一步的配置屬性,該屬性指定

SELECT

連接配接器在拍攝快照時要在表上運作的語句。指定的

SELECT

語句确定要包含在快照中的表行子集。使用以下格式指定此

SELECT

語句屬性的名稱:. 例如, 。 例子:

snapshot.select.statement.overrides.*<databaseName>*.*<tableName>*``snapshot.select.statement.overrides.customers.orders

如果您希望快照僅包含未軟删除的記錄,請從

customers.orders

包含軟删除列的表中添加以下屬性:

delete_flag``"snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"

在生成的快照中,連接配接器僅包含

delete_flag = 0

.

min.row.count.to.stream.results

1000

在快照期間,連接配接器會查詢連接配接器配置為捕獲更改的每個表。連接配接器使用每個查詢結果來生成包含該表中所有行的資料的讀取事件。此屬性确定 MySQL 連接配接器是将表的結果放入記憶體(速度快但需要大量記憶體)還是流式傳輸結果(可能較慢但适用于非常大的表)。此屬性的設定指定在連接配接器流式傳輸結果之前表必須包含的最小行數。 要跳過所有表大小檢查并始終在快照期間流式傳輸所有結果,請将此屬性設定為 。

heartbeat.interval.ms

控制連接配接器向 Kafka 主題發送心跳消息的頻率。預設行為是連接配接器不發送心跳消息。 心跳消息對于監視連接配接器是否從資料庫接收更改事件很有用。心跳消息可能有助于減少連接配接器重新啟動時需要重新發送的更改事件的數量。要發送心跳消息,請将此屬性設定為正整數,表示心跳消息之間的毫秒數。

heartbeat.topics.prefix

__debezium-heartbeat

控制連接配接器向其發送心跳消息的主題的名稱。主題名稱具有以下模式: heartbeat.topics.prefix。server.name 例如,如果資料庫伺服器名稱為

fulfillment

,則預設主題名稱為

__debezium-heartbeat.fulfillment

heartbeat.action.query

無預設值 指定連接配接器發送心跳消息時連接配接器在源資料庫上執行的查詢。 例如,這可用于定期捕獲源資料庫中設定的已執行 GTID 的狀态。

INSERT INTO gtid_history_table (select * from mysql.gtid_executed)

database.initial.statements

無預設值 建立與資料庫的 JDBC 連接配接(而不是讀取事務日志的連接配接)時要執行的 SQL 語句的分号分隔清單。要将分号指定為 SQL 語句中的字元而不是分隔符,請使用兩個分号 (

;;

)。 連接配接器可能會自行決定建立 JDBC 連接配接,是以該屬性僅用于配置會話參數。它不适用于執行 DML 語句。

snapshot.delay.ms

無預設值 連接配接器啟動時執行快照之前連接配接器應等待的時間間隔(以毫秒為機關)。如果您在叢集中啟動多個連接配接器,此屬性對于避免快照中斷很有用,這可能會導緻連接配接器重新平衡。

snapshot.fetch.size

無預設值 在快照期間,連接配接器分批讀取表内容。此屬性指定批進行中的最大行數。

snapshot.lock.timeout.ms

10000

正整數,指定執行快照時等待擷取表鎖的最長時間(以毫秒為機關)。如果連接配接器在此時間間隔内無法擷取表鎖,則快照失敗。了解MySQL 連接配接器如何執行資料庫快照。

enable.time.adjuster

true

訓示連接配接器是否将 2 位數年份規範轉換為 4 位數的布爾值。設定為何

false

時将轉換完全委托給資料庫。 MySQL 允許使用者插入 2 位或 4 位的年份值。對于 2 位值,該值将映射到 1970 - 2069 範圍内的年份。預設行為是連接配接器進行轉換。

source.struct.version

v2

source

Debezium 事件中塊的架構版本。Debezium 0.10 對塊的結構進行了一些重大更改,

source

以統一所有連接配接器的暴露結構。 通過将此選項設定為

v1

,可以生成早期版本中使用的結構。但是,不建議使用此設定,并計劃在未來的 Debezium 版本中删除。

sanitize.field.names

true

如果連接配接器配置将

key.converter

or

value.converter

屬性設定為 Avro 轉換器。

false

如果不。
訓示字段名稱是否經過清理以符合Avro 命名要求。

skipped.operations

無預設值 流式傳輸期間要跳過的操作類型的逗号分隔清單。以下值是可能的:

c

用于插入/建立、

u

用于更新、

d

用于删除。預設情況下,不會跳過任何操作。

signal.data.collection

無預設值 用于向連接配接器發送信号的資料集合的完全限定名稱。 使用以下格式指定集合名稱:

*<databaseName>*.*<tableName>*

incremental.snapshot.allow.schema.changes

false

在增量快照期間允許架構更改。啟用後,連接配接器将在增量快照期間檢測架構更改并重新選擇目前塊以避免鎖定 DDL。 請注意,不支援對主鍵的更改,如果在增量快照期間執行,可能會導緻不正确的結果。另一個限制是,如果架構更改僅影響列的預設值,則在從 binlog 流處理 DDL 之前不會檢測到更改。這不會影響快照事件的值,但快照事件的架構可能具有過時的預設值。

incremental.snapshot.chunk.size

1024

連接配接器在增量快照塊期間擷取并讀入記憶體的最大行數。增加塊大小提供了更高的效率,因為快照運作的快照查詢更少,但更大的大小。然而,更大的塊大小也需要更多的記憶體來緩沖快照資料。将塊大小調整為在您的環境中提供最佳性能的值。

read.only

false

切換到替代增量快照水印實作以避免寫入信号資料收集

provide.transaction.metadata

false

确定連接配接器是否生成具有事務邊界的事件并使用事務中繼資料豐富更改事件信封。指定

true

是否希望連接配接器執行此操作。有關詳細資訊,請參閱事務中繼資料。

transaction.topic

${database.server.name}.transaction

控制連接配接器向其發送事務中繼資料消息的主題的名稱。占位符

${database.server.name}

可用于引用連接配接器的邏輯名稱;預設為

${database.server.name}.transaction

,例如

dbserver1.transaction

Debezium 連接配接器資料庫曆史配置屬性

Debezium 提供了一組

database.history.*

屬性來控制連接配接器如何與模式曆史主題互動。

下表描述了

database.history

用于配置 Debezium 連接配接器的屬性。

财産 預設 描述

database.history.kafka.topic

連接配接器存儲資料庫架構曆史的 Kafka 主題的全名。

database.history.kafka.bootstrap.servers

連接配接器用于建立與 Kafka 叢集的初始連接配接的主機/端口對清單。此連接配接用于檢索連接配接器先前存儲的資料庫模式曆史記錄,并用于編寫從源資料庫讀取的每個 DDL 語句。每對都應該指向 Kafka Connect 程序使用的同一個 Kafka 叢集。

database.history.kafka.recovery.poll.interval.ms

100

一個整數值,指定連接配接器在啟動/恢複期間輪詢持久資料時應等待的最大毫秒數。預設值為 100 毫秒。

database.history.kafka.query.timeout.ms

3000

一個整數值,指定連接配接器在使用 Kafka 管理用戶端擷取叢集資訊時應等待的最大毫秒數。

database.history.kafka.recovery.attempts

4

在連接配接器恢複失敗并出現錯誤之前連接配接器應嘗試讀取持久曆史資料的最大次數。沒有收到資料後等待的最長時間為

recovery.attempts

x

recovery.poll.interval.ms

database.history.skip.unparseable.ddl

false

一個布爾值,指定連接配接器是否應忽略格式錯誤或未知的資料庫語句或停止處理,以便人們可以解決問題。安全的預設值為

false

. 跳過應該小心使用,因為它會在處理 binlog 時導緻資料丢失或損壞。

database.history.store.only.monitored.tables.ddl

已棄用并計劃在未來版本中删除;改為使用

database.history.store.only.captured.tables.ddl

false

一個布爾值,指定連接配接器是否應記錄所有 DDL 語句

true

僅記錄那些與 Debezium 正在捕獲其更改的表相關的 DDL 語句。

true

請謹慎設定,因為如果您更改哪些表已捕獲其更改,則可能需要丢失資料。 安全的預設值為

false

.

database.history.store.only.captured.tables.ddl

false

一個布爾值,指定連接配接器是否應記錄所有 DDL 語句

true

僅記錄那些與 Debezium 正在捕獲其更改的表相關的 DDL 語句。

true

請謹慎設定,因為如果您更改哪些表已捕獲其更改,則可能需要丢失資料。 安全的預設值為

false

.

用于配置生産者和消費者用戶端的直通資料庫曆史記錄屬性

Debezium 依賴 Kafka 生産者将模式更改寫入資料庫曆史主題。同樣,當連接配接器啟動時,它依賴于 Kafka 消費者從資料庫曆史主題中讀取。

database.history.producer.*

您可以通過将值配置設定給以和

database.history.consumer.*

字首開頭的一組傳遞配置屬性來定義 Kafka 生産者和消費者用戶端的配置。傳遞的生産者和消費者資料庫曆史屬性控制一系列行為,例如這些用戶端如何保護與 Kafka 代理的連接配接,如以下示例所示:

database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234

database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234
           

Debezium 在将屬性傳遞給 Kafka 用戶端之前從屬性名稱中去除字首。

有關Kafka 生産者配置屬性和Kafka 消費者配置屬性的更多詳細資訊,請參閱 Kafka 文檔。

Debezium 連接配接器 Kafka 信号配置屬性

當 MySQL 連接配接器配置為隻讀時,信号表的替代方案是信号 Kafka 主題。

Debezium 提供了一組

signal.*

屬性來控制連接配接器如何與 Kafka 信号主題互動。

下表描述了這些

signal

屬性。

财産 預設 描述

signal.kafka.topic

連接配接器監視即席信号的 Kafka 主題的名稱。

signal.kafka.bootstrap.servers

連接配接器用于建立與 Kafka 叢集的初始連接配接的主機/端口對清單。每對都應該指向 Kafka Connect 程序使用的同一個 Kafka 叢集。

signal.kafka.poll.timeout.ms

100

一個整數值,指定連接配接器在輪詢信号時應等待的最大毫秒數。預設值為 100 毫秒。

Debezium 連接配接器傳遞信号 Kafka 消費者用戶端配置屬性

Debezium 連接配接器提供信号 Kafka 消費者的直通配置。直通信号屬性以字首 開頭

signals.consumer.*

。例如,連接配接器将屬性傳遞

signal.consumer.security.protocol=SSL

給 Kafka 消費者。

與資料庫曆史用戶端的傳遞屬性一樣,Debezium 在将它們傳遞給 Kafka 信号消費者之前從屬性中去除字首。

Debezium 連接配接器直通資料庫驅動程式配置屬性

Debezium 連接配接器提供資料庫驅動程式的直通配置。直通資料庫屬性以字首 開頭

database.*

。例如,連接配接器将屬性傳遞

database.foobar=false

給 JDBC URL。

與資料庫曆史用戶端的傳遞屬性一樣,Debezium 在将它們傳遞給資料庫驅動程式之前從屬性中去除字首。

監控

除了 Zookeeper、Kafka 和 Kafka Connect 提供的對 JMX 名額的内置支援之外,Debezium MySQL 連接配接器還提供三種類型的名額。

  • 快照名額在執行快照時提供有關連接配接器操作的資訊。
  • 當連接配接器讀取二進制日志時,流式名額提供有關連接配接器操作的資訊。
  • 架構曆史名額提供有關連接配接器架構曆史狀态的資訊。

Debezium 監控文檔提供了有關如何使用 JMX 公開這些名額的詳細資訊。

快照名額

MBean是. _

debezium.mysql:type=connector-metrics,context=snapshot,server=*<mysql.server.name>*

除非快照操作處于活動狀态,或者自上次連接配接器啟動以來已發生快照,否則不會公開快照名額。

下表列出了可用的快照名額。

屬性 類型 描述

LastEvent

string

連接配接器讀取的最後一個快照事件。

MilliSecondsSinceLastEvent

long

自連接配接器讀取并處理最新事件以來的毫秒數。

TotalNumberOfEventsSeen

long

自上次啟動或重置以來,此連接配接器已看到的事件總數。

NumberOfEventsFiltered

long

已被連接配接器上配置的包含/排除清單過濾規則過濾的事件數。

CapturedTables

string[]

連接配接器捕獲的表清單。

QueueTotalCapacity

int

用于在快照程式和主 Kafka Connect 循環之間傳遞事件的隊列長度。

QueueRemainingCapacity

int

用于在快照程式和主 Kafka Connect 循環之間傳遞事件的隊列的可用容量。

TotalTableCount

int

快照中包含的表總數。

RemainingTableCount

int

快照尚未複制的表數。

SnapshotRunning

boolean

快照是否已啟動。

SnapshotAborted

boolean

快照是否已中止。

SnapshotCompleted

boolean

快照是否完成。

SnapshotDurationInSeconds

long

到目前為止,快照已拍攝的總秒數,即使未完成也是如此。

RowsScanned

Map<String, Long>

包含為快照中的每個表掃描的行數的映射。在處理過程中,表格會逐漸添加到地圖中。每掃描 10,000 行并在完成表時更新。

MaxQueueSizeInBytes

long

隊列的最大緩沖區(以位元組為機關)。

max.queue.size.in.bytes

如果設定為正長值,則此名額可用。

CurrentQueueSizeInBytes

long

隊列中的目前記錄量(以位元組為機關)。

執行增量快照時,連接配接器還提供以下附加快照名額:

屬性 類型 描述

ChunkId

string

目前快照塊的辨別符。

ChunkFrom

string

定義目前塊的主鍵集的下限。

ChunkTo

string

定義目前塊的主鍵集的上限。

TableFrom

string

目前快照表的主鍵集的下限。

TableTo

string

目前快照表的主鍵集的上限。

Debezium MySQL 連接配接器還提供

HoldingGlobalLock

自定義快照名額。該名額設定為一個布爾值,訓示連接配接器目前是否持有全局或表寫鎖。

流媒體名額

僅當啟用 binlog 事件緩沖時,事務相關屬性才可用。有關更多詳細資訊,請參閱

binlog.buffer.size

進階連接配接器配置屬性。

MBean是. _

debezium.mysql:type=connector-metrics,context=streaming,server=*<mysql.server.name>*

下表列出了可用的流式名額。

屬性 類型 描述

LastEvent

string

連接配接器讀取的最後一個流事件。

MilliSecondsSinceLastEvent

long

自連接配接器讀取并處理最新事件以來的毫秒數。

TotalNumberOfEventsSeen

long

自上次啟動或名額重置以來,此連接配接器已看到的事件總數。

TotalNumberOfCreateEventsSeen

long

自上次啟動或名額重置以來,此連接配接器已看到的建立事件總數。

TotalNumberOfUpdateEventsSeen

long

自上次啟動或名額重置以來,此連接配接器已看到的更新事件總數。

TotalNumberOfDeleteEventsSeen

long

自上次啟動或名額重置以來,此連接配接器已看到的删除事件總數。

NumberOfEventsFiltered

long

已被連接配接器上配置的包含/排除清單過濾規則過濾的事件數。

CapturedTables

string[]

連接配接器捕獲的表清單。

QueueTotalCapacity

int

用于在流媒體和主 Kafka Connect 循環之間傳遞事件的隊列長度。

QueueRemainingCapacity

int

隊列的空閑容量,用于在流媒體和 Kafka Connect 主循環之間傳遞事件。

Connected

boolean

表示連接配接器目前是否連接配接到資料庫伺服器的标志。

MilliSecondsBehindSource

long

上次更改事件的時間戳與處理它的連接配接器之間的毫秒數。這些值将包含運作資料庫伺服器和連接配接器的機器上的時鐘之間的任何差異。

NumberOfCommittedTransactions

long

已送出的已處理事務數。

SourceEventPosition

Map<String, String>

最後接收到的事件的坐标。

LastTransactionId

string

最後處理的事務的事務辨別符。

MaxQueueSizeInBytes

long

隊列的最大緩沖區(以位元組為機關)。

max.queue.size.in.bytes

如果設定為正長值,則此名額可用。

CurrentQueueSizeInBytes

long

隊列中的目前記錄量(以位元組為機關)。

Debezium MySQL 連接配接器還提供以下額外的流媒體名額:

屬性 類型 描述

BinlogFilename

string

連接配接器最近讀取的 binlog 檔案的名稱。

BinlogPosition

long

連接配接器已讀取的 binlog 中的最新位置(以位元組為機關)。

IsGtidModeEnabled

boolean

表示連接配接器目前是否正在跟蹤來自 MySQL 伺服器的 GTID 的标志。

GtidSet

string

連接配接器在讀取 binlog 時處理的最新 GTID 集的字元串表示形式。

NumberOfSkippedEvents

long

MySQL 連接配接器已跳過的事件數。通常,由于 MySQL 二進制日志中的格式錯誤或不可解析的事件,事件會被跳過。

NumberOfDisconnects

long

MySQL 連接配接器斷開連接配接的次數。

NumberOfRolledBackTransactions

long

已復原且未流式傳輸的已處理事務的數量。

NumberOfNotWellFormedTransactions

long

不符合

BEGIN

+

COMMIT

/預期協定的事務數

ROLLBACK

。這個值應該 在正常情況下。

NumberOfLargeTransactions

long

未放入預讀緩沖區的事務數。為獲得最佳性能,該值應明顯小于

NumberOfCommittedTransactions

NumberOfRolledBackTransactions

架構曆史名額

MBean是. _

debezium.mysql:type=connector-metrics,context=schema-history,server=*<mysql.server.name>*

下表列出了可用的架構曆史名額。

屬性 類型 描述

Status

string

之一

STOPPED

RECOVERING

從存儲中恢複曆史),

RUNNING

描述資料庫曆史的狀态。

RecoveryStartTime

long

恢複開始的時間(以紀元秒為機關)。

ChangesRecovered

long

在恢複階段讀取的更改數。

ChangesApplied

long

在恢複和運作時應用的架構更改總數。

MilliSecondsSinceLastRecoveredChange

long

自上次更改從曆史存儲中恢複以來經過的毫秒數。

MilliSecondsSinceLastAppliedChange

long

自應用上次更改以來經過的毫秒數。

LastRecoveredChange

string

從曆史存儲中恢複的最後一次更改的字元串表示形式。

LastAppliedChange

string

最後應用更改的字元串表示形式。

出現問題時的行為

Debezium 是一個分布式系統,可以捕獲多個上遊資料庫中的所有更改;它永遠不會錯過或丢失事件。當系統正常運作或被仔細管理時,Debezium 提供每個更改事件記錄的一次傳遞。

如果确實發生故障,則系統不會丢失任何事件。但是,當它從故障中恢複時,它可能會重複一些更改事件。在這些異常情況下,Debezium 和 Kafka 一樣,至少提供一次更改事件的傳遞。

本節的其餘部分描述了 Debezium 如何處理各種故障和問題。

配置和啟動錯誤

在以下情況下,連接配接器嘗試啟動失敗,在日志中報告錯誤或異常,并停止運作:

  • 連接配接器的配置無效。
  • 連接配接器無法使用指定的連接配接參數成功連接配接到 MySQL 伺服器。
  • 連接配接器嘗試在 binlog 中 MySQL 不再具有可用曆史記錄的位置重新啟動。

在這些情況下,錯誤消息包含有關問題的詳細資訊以及可能的建議解決方法。更正配置或解決 MySQL 問題後,重新啟動連接配接器。

MySQL變得不可用

如果您的 MySQL 伺服器不可用,Debezium MySQL 連接配接器将失敗并出現錯誤并且連接配接器停止。當伺服器再次可用時,重新啟動連接配接器。

但是,如果為高可用性 MySQL 叢集啟用了 GTID,您可以立即重新啟動連接配接器。它将連接配接到叢集中的不同 MySQL 伺服器,在伺服器的 binlog 中找到代表最後一個事務的位置,并開始從該特定位置讀取新伺服器的 binlog。

如果未啟用 GTID,則連接配接器僅記錄它所連接配接的 MySQL 伺服器的 binlog 位置。要從正确的 binlog 位置重新啟動,您必須重新連接配接到該特定伺服器。

Kafka Connect 優雅停止

當 Kafka Connect 正常停止時,在 Debezium MySQL 連接配接器任務停止并在新的 Kafka Connect 程序上重新啟動時會有短暫的延遲。

Kafka Connect 程序崩潰

如果 Kafka Connect 崩潰,程序會停止并且任何 Debezium MySQL 連接配接器任務都會終止,而不會記錄它們最近處理的偏移量。在分布式模式下,Kafka Connect 會重新啟動其他程序上的連接配接器任務。但是,MySQL 連接配接器從早期程序記錄的最後一個偏移量恢複。這意味着替換任務可能會生成一些在崩潰之前處理的相同僚件,進而建立重複事件。

每個更改事件消息都包含特定于源的資訊,您可以使用這些資訊來識别重複事件,例如:

  • 事件起源
  • MySQL 伺服器的事件時間
  • binlog 檔案名和位置
  • GTID(如果使用)

卡夫卡變得不可用

Kafka Connect 架構使用 Kafka 生産者 API 記錄 Kafka 中的 Debezium 更改事件。如果 Kafka 代理變得不可用,則 Debezium MySQL 連接配接器會暫停,直到連接配接重建立立并且連接配接器從中斷的地方恢複。

MySQL 清除 binlog 檔案

如果 Debezium MySQL 連接配接器停止的時間過長,MySQL 伺服器會清除舊的 binlog 檔案,連接配接器的最後位置可能會丢失。當連接配接器重新啟動時,MySQL 伺服器不再具有起始點,連接配接器執行另一個初始快照。如果禁用快照,則連接配接器将失敗并出現錯誤。

有關 MySQL 連接配接器如何執行初始快照的詳細資訊,請參閱快照。

繼續閱讀