天天看點

用于 MongoDB 的 Debezium 連接配接器用于 MongoDB 的 Debezium 連接配接器

用于 MongoDB 的 Debezium 連接配接器

Debezium 的 MongoDB 連接配接器跟蹤 MongoDB 副本集或 MongoDB 分片叢集以查找資料庫和集合中的文檔更改,并将這些更改記錄為 Kafka 主題中的事件。連接配接器自動處理分片叢集中分片的添加或删除、每個副本內建員資格的更改、每個副本集中的選舉以及等待通信問題的解決。

有關與此連接配接器相容的 MongoDB 版本的資訊,請參閱Debezium 版本概述。

概述

MongoDB 的複制機制提供了備援和高可用性,是在生産環境中運作 MongoDB 的首選方式。MongoDB 連接配接器捕獲副本集或分片叢集中的更改。

MongoDB副本集由一組伺服器組成,這些伺服器都具有相同資料的副本,并且複制可確定用戶端對副本集主節點上的文檔所做的所有更改都正确應用于另一個副本集的伺服器,稱為輔助節點。MongoDB 複制的工作原理是讓主節點在其oplog(或記錄檔)中記錄更改,然後每個輔助節點讀取主節點的 oplog 并按順序将所有操作應用于自己的文檔。将新伺服器添加到副本集時,該伺服器首先執行快照主節點上的所有資料庫和集合,然後讀取主節點的 oplog 以應用自快照開始以來可能已進行的所有更改。當這個新伺服器趕上主伺服器 oplog 的尾部時,它成為輔助伺服器(并且能夠處理查詢)。

MongoDB 連接配接器支援兩種不同的模式來捕獲由

capture.mode

選項控制的更改:

  • 基于oplog
  • 基于更改流

Oplog 捕獲模式(舊版)

Debezium MongoDB 連接配接器使用與上述相同的複制機制,盡管它實際上并沒有成為副本集的成員。然而,就像 MongoDB 輔助節點一樣,連接配接器總是讀取副本集主節點的 oplog。而且,當連接配接器第一次看到副本集時,它會檢視 oplog 以擷取最後記錄的事務,然後執行主資料庫和集合的快照。複制所有資料後,連接配接器從之前從 oplog 讀取的位置開始流式傳輸更改。MongoDB oplog 中的操作是幂等的,是以無論操作多少次,它們都會導緻相同的結束狀态。

這種模式的缺點是隻有插入更改事件會包含整個文檔,而更新事件隻包含更改字段的表示(即無法從更新事件中擷取未修改的字段),而删除事件不包含已删除文檔的表示除了它的鑰匙。

這種模式應該被視為傳統模式。MongoDB 5 不支援它,強烈建議使用者不要将它用于 MongoDB 4.x 伺服器。

更改流模式

Debezium MongoDB 連接配接器使用與上述類似的複制機制,盡管它實際上并沒有成為副本集的成員。主要差別在于連接配接器不直接讀取 oplog,而是将捕獲和解碼 oplog 委托給 MongoDB 的Change Streams功能。使用更改流,MongoDB 伺服器将更改作為事件流公開給集合。Debezium 連接配接器監視流并将更改傳遞到下遊。而且,當連接配接器第一次看到副本集時,它會檢視 oplog 以擷取最後記錄的事務,然後執行主資料庫和集合的快照。當所有資料都被複制後,連接配接器然後從它之前從 oplog 讀取的位置建立一個更改流。

這是從 MongoDB 4.x 開始的推薦模式。

兩種捕獲模式都使用存儲在偏移量中的不同值,允許它們從連接配接器重新啟動後看到的最後一個位置恢複流式傳輸。是以,不可能從更改流模式切換到 oplog 模式。為防止任何意外捕獲模式更改,連接配接器具有内置安全檢查功能。當連接配接器啟動時,它會檢查存儲的偏移量。如果原始捕獲模式是基于 oplog 的,而新模式是基于變化流的,那麼它将嘗試遷移到變化流。如果原始捕獲模式是基于更改流的,它将繼續使用更改流,如果新模式是基于 oplog 的,則将向日志發出關于此的警告。

随着 MongoDB 連接配接器處理更改,它會定期記錄事件起源的 oplog/stream 中的位置。當連接配接器停止時,它會記錄它處理的最後一個 oplog/stream 位置,以便在重新啟動時它隻是從該位置開始流式傳輸。換句話說,連接配接器可以停止、更新或維護,并在一段時間後重新啟動,它将準确地從中斷的地方重新開始,而不會丢失任何一個事件。當然,MongoDB 的 oplog 通常有一個最大大小的上限,這意味着連接配接器不應停止太久,否則 oplog 中的某些操作可能會在連接配接器有機會讀取它們之前被清除。在這種情況下,重新啟動連接配接器将檢測丢失的 oplog 操作,執行快照,然後繼續流式傳輸更改。

MongoDB 連接配接器還可以容忍副本集的成員資格和上司力的變化、分片叢集中分片的添加或删除以及可能導緻通信失敗的網絡問題。連接配接器始終使用副本集的主節點流式傳輸更改,是以當副本集進行選舉并且不同的節點成為主節點時,連接配接器将立即停止流式傳輸更改,連接配接到新的主節點,并使用新的主節點開始流式傳輸更改節點。同樣,如果連接配接器在與主副本集通信時遇到任何問題,它将嘗試重新連接配接(使用指數退避以避免壓倒網絡或副本集)并從上次中斷的地方繼續流式傳輸更改。

其他資源

  • 複制機制
  • 副本集
  • 副本集選舉
  • 分片叢集
  • 分片添加
  • 碎片清除
  • 更改流

MongoDB 連接配接器的工作原理

連接配接器支援的 MongoDB 拓撲的概述對于規劃您的應用程式很有用。

配置和部署 MongoDB 連接配接器時,它首先連接配接到種子位址處的 MongoDB 伺服器,并确定每個可用副本集的詳細資訊。由于每個副本集都有自己獨立的 oplog,連接配接器将嘗試為每個副本集使用單獨的任務。連接配接器可以限制它将使用的最大任務數,如果沒有足夠的任務可用,連接配接器将為每個任務配置設定多個副本集,盡管任務仍将為每個副本集使用單獨的線程。

對分片叢集運作連接配接器時,使用的值

tasks.max

大于副本集的數量。這将允許連接配接器為每個副本集建立一個任務,并讓 Kafka Connect 跨所有可用工作程序協調、分發和管理任務。

支援的 MongoDB 拓撲

MongoDB 連接配接器支援以下 MongoDB 拓撲:

  • MongoDB副本集

    Debezium MongoDB 連接配接器可以捕獲來自單個MongoDB 副本集的更改。生産副本集至少需要三個成員。要将 MongoDB 連接配接器與副本集一起使用,請通過連接配接器的屬性提供一個或多個副本集伺服器的位址作為種子位址。

    mongodb.hosts

    連接配接器将使用這些種子連接配接到副本集,然後一旦連接配接,将從副本集中擷取完整的成員集以及哪個成員是主成員。連接配接器将啟動一個任務以連接配接到主節點并從主節點的 oplog 中捕獲更改。當副本集選出新的主節點時,任務會自動切換到新的主節點。當 MongoDB 以代理為前端時(例如在 OS X 或 Windows 上使用 Docker),當用戶端連接配接到副本集并發現成員時,MongoDB 用戶端會将代理排除為有效成員,并嘗試并失敗直接連接配接到成員,而不是通過代理。在這種情況下,将連接配接器的可選

    mongodb.members.auto.discover

    配置屬性設定為

    false

    以訓示連接配接器放棄成員資格發現,而是簡單地使用第一個種子位址(通過

    mongodb.hosts

    屬性指定)作為主節點。這可能有效,但在選舉發生時仍然會導緻問題。
  • MongoDB分片叢集

    一個MongoDB 分片叢集包括:一個或多個分片,每個分片部署為一個副本集;一個單獨的副本集,充當叢集的配置伺服器用戶端連接配接的一個或多個路由器(也稱為

    mongos

    )并将請求路由到适當的分片要将 MongoDB 連接配接器與分片叢集一起使用,請使用配置伺服器副本集的主機位址配置連接配接器。當連接配接器連接配接到這個副本集時,它發現它正在充當分片叢集的配置伺服器,發現叢集中用作分片的每個副本集的資訊,然後将啟動一個單獨的任務來捕獲每個副本集的變化。如果将新分片添加到叢集或移除現有分片,連接配接器将自動相應地調整其任務。
  • MongoDB獨立伺服器

    MongoDB 連接配接器無法監控獨立 MongoDB 伺服器的更改,因為獨立伺服器沒有 oplog。如果将獨立伺服器轉換為具有一個成員的副本集,則連接配接器将起作用。

MongoDB 不建議在生産環境中運作獨立伺服器。有關更多資訊,請參閱MongoDB 文檔。

邏輯連接配接器名稱

連接配接器配置屬性

mongodb.name

用作MongoDB 副本集或分片叢集的*邏輯名稱。*連接配接器以多種方式使用邏輯名稱:作為所有主題名稱的字首,以及在記錄每個副本集的 oplog/change 流位置時作為唯一辨別符。

您應該為每個 MongoDB 連接配接器指定一個唯一的邏輯名稱,以有意義地描述源 MongoDB 系統。我們建議邏輯名稱以字母或下劃線字元開頭,其餘字元以字母數字或下劃線開頭。

執行快照

當任務使用副本集啟動時,它使用連接配接器的邏輯名稱和副本集名稱來查找描述連接配接器先前停止讀取更改的位置的*偏移量。*如果可以找到偏移量并且它仍然存在于 oplog 中,則任務立即進行流式更改,從記錄的偏移量位置開始。

但是,如果沒有找到偏移量或 oplog 不再包含該位置,則任務必須首先通過執行快照擷取副本集内容的目前狀态。此過程首先記錄 oplog 的目前位置并将其記錄為偏移量(以及表示快照已啟動的标志)。然後該任務将繼續複制每個集合,生成盡可能多的線程(直到

snapshot.max.threads

配置屬性的值)以并行執行此工作。連接配接器将為它看到的每個文檔記錄一個單獨的讀取事件,該讀取事件将包含對象的辨別符、對象的完整狀态和源有關找到對象的 MongoDB 副本集的資訊。源資訊還将包括一個标志,表示該事件是在快照期間産生的。

此快照将繼續,直到它複制了與連接配接器的過濾器比對的所有集合。如果連接配接器在任務的快照完成之前停止,則在重新啟動時連接配接器會再次開始快照。

在連接配接器執行任何副本集的快照時,盡量避免任務重新配置設定和重新配置。連接配接器生成日志消息以報告快照的進度。為了提供最大的控制,請為每個連接配接器運作一個單獨的 Kafka Connect 叢集。

即席快照

預設情況下,連接配接器僅在首次啟動後才運作初始快照操作。在這個初始快照之後,在正常情況下,連接配接器不會重複快照過程。連接配接器捕獲的任何未來更改事件資料僅通過流式處理進入。

但是,在某些情況下,連接配接器在初始快照期間獲得的資料可能會變得陳舊、丢失或不完整。為了提供一種重新捕獲收集資料的機制,Debezium 包含一個執行臨時快照的選項。資料庫中的以下更改可能會導緻執行臨時快照:

  • 修改連接配接器配置以捕獲一組不同的集合。
  • Kafka 主題被删除,必須重建。
  • 由于配置錯誤或其他問題而發生資料損壞。

您可以通過啟動所謂的ad-hoc 快照為之前捕獲快照的集合重新運作快照。即席快照需要使用信号集合。您可以通過向 Debezium 信号集合發送信号請求來啟動臨時快照。

當您啟動現有集合的臨時快照時,連接配接器會将内容附加到集合已存在的主題中。如果删除了以前存在的主題,如果啟用了自動主題建立,Debezium 可以自動建立主題。

即席快照信号指定要包含在快照中的集合。快照可以捕獲資料庫的全部内容,或僅捕獲資料庫中集合的子集。

execute-snapshot

您可以通過向信令集合發送消息來指定要捕獲的集合。将

execute-snapshot

信号的類型設定為

incremental

,并提供要包含在快照中的集合的名稱,如下表所述:

場地 預設 價值

type

incremental

指定要運作的快照類型。 設定類型是可選的。目前,您隻能請求

incremental

快照。

data-collections

不适用 一個數組,其中包含要生成快照的集合的完全限定名稱。名稱的格式與配置選項 的格式相同。

signal.data.collection

觸發臨時快照

execute-snapshot

您可以通過将具有信号類型的條目添加到信令集合來啟動臨時快照。連接配接器處理完消息後,将開始快照操作。快照程序讀取第一個和最後一個主鍵值,并将這些值用作每個集合的起點和終點。根據集合中的條目數和配置的塊大小,Debezium 将集合劃分為塊,并繼續對每個塊進行快照,一次一個。

目前,

execute-snapshot

操作類型僅觸發增量快照。有關詳細資訊,請參閱增量快照。

增量快照

為了提供管理快照的靈活性,Debezium 包含一個補充快照機制,稱為增量快照。增量快照依靠 Debezium 機制向 Debezium 連接配接器發送信号。增量快照基于DDD-3設計文檔。

在增量快照中,Debezium 不是像在初始快照中那樣一次捕獲資料庫的完整狀态,而是在一系列可配置的塊中分階段捕獲每個集合。您可以指定您希望快照捕獲的集合以及每個塊的大小。塊大小決定了快照在資料庫上的每次提取操作期間收集的行數。增量快照的預設塊大小為 1 KB。

随着增量快照的進行,Debezium 使用水印來跟蹤其進度,維護它捕獲的每個集合行的記錄。與标準初始快照過程相比,這種分階段捕獲資料的方法具有以下優勢:

  • 您可以在流式資料捕獲的同時運作增量快照,而不是将流式傳輸推遲到快照完成。連接配接器在整個快照過程中繼續從更改日志中捕獲近乎實時的事件,并且兩個操作都不會阻塞另一個操作。
  • 如果增量快照的進度中斷,您可以恢複它而不會丢失任何資料。程序恢複後,快照從它停止的點開始,而不是從頭重新捕獲集合。
  • 您可以随時按需運作增量快照,并根據需要重複該過程以适應資料庫更新。例如,您可以在修改連接配接器配置以将集合添加到其

    collection.include.list

    屬性後重新運作快照。

增量快照過程

當您運作增量快照時,Debezium 按主鍵對每個集合進行排序,然後根據配置的塊大小将集合拆分為塊。逐塊工作,然後捕獲塊中的每個集合行。對于它捕獲的每一行,快照都會發出一個

READ

事件。該事件表示塊的快照開始時行的值。

随着快照的進行,其他程序可能會繼續通路資料庫,進而可能會修改集合記錄。為反映此類更改,

INSERT

UPDATE

DELETE

操作将照常送出到事務日志。同樣,正在進行的 Debezium 流式處理繼續檢測這些更改事件并将相應的更改事件記錄發送到 Kafka。

Debezium 如何解決具有相同主鍵的記錄之間的沖突

在某些情況下,流式處理發出的

UPDATE

DELETE

事件被亂序接收。也就是說,流式處理可能會在快照捕獲包含該行的

READ

事件的塊之前發出一個修改集合行的事件。當快照最終為該行發出相應的

READ

事件時,它的值已經被取代。為了確定以正确的邏輯順序處理亂序到達的增量快照事件,Debezium 采用了一種緩沖方案來解決沖突。隻有在解決了快照事件和流事件之間的沖突後,Debezium 才會向 Kafka 發出事件記錄。

快照視窗

為了幫助解決延遲到達事件和修改同一集合行的流事件之間的沖突

READ

,Debezium 采用了所謂的快照視窗。快照視窗劃分了增量快照為指定收集塊捕獲資料的時間間隔。在一個塊的快照視窗打開之前,Debezium 遵循其通常的行為并從事務日志直接向下遊發送事件到目标 Kafka 主題。但是從特定塊的快照打開的那一刻起,直到它關閉,Debezium 執行重複資料删除步驟以解決具有相同主鍵的事件之間的沖突。

對于每個資料集合,Debezium 發出兩種類型的事件,并将它們的記錄存儲在單個目标 Kafka 主題中。它直接從表中捕獲的快照記錄作為

READ

操作發出。同時,随着使用者不斷更新資料集合中的記錄,事務日志也更新以反映每次送出,Debezium 會針對每次更改發出

UPDATE

或操作。

DELETE

當快照視窗打開時,Debezium 開始處理快照塊,它将快照記錄傳遞到記憶體緩沖區。在快照視窗期間,

READ

緩沖區中事件的主鍵與傳入流事件的主鍵進行比較。如果未找到比對項,則将流式事件記錄直接發送到 Kafka。如果 Debezium 檢測到比對,它會丢棄緩沖的

READ

事件,并将流式記錄寫入目标主題,因為流式事件在邏輯上取代了靜态快照事件。塊的快照視窗關閉後,緩沖區僅包含

READ

不存在相關事務日志事件的事件。Debezium 将這些剩餘

READ

事件發送到集合的 Kafka 主題。

連接配接器對每個快照塊重複該過程。

觸發增量快照

目前,啟動增量快照的唯一方法是将臨時快照信号發送到源資料庫上的信令集合。

INSERT

您将信号作為 SQL查詢送出給集合。Debezium 檢測到信号集合中的變化後,它會讀取信号,并運作請求的快照操作。

您送出的查詢指定要包含在快照中的集合,并且可以選擇指定快照操作的類型。目前,快照操作的唯一有效選項是預設值

incremental

.

要指定要包含在快照中的集合,請提供一個

data-collections

列出集合的數組,例如,

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的

data-collections

數組沒有預設值。如果

data-collections

數組為空,Debezium 檢測到不需要任何操作并且不執行快照。

如果要包含在快照中的集合

.

的名稱在資料庫、模式或表的名稱中包含點 (),則要将集合添加到

data-collections

數組中,必須用雙引号對名稱的每個部分進行轉義. 例如,要包含存在于

**public**

架構中且名稱為 的表

**My.Table**

,請使用以下格式:

**"public"."My.Table"**

.

先決條件

  • 信令已啟用。
    • 源資料庫上存在信令資料集合,連接配接器配置為捕獲它。
    • 信令資料集合在

      signal.data.collection

      屬性中指定。

程式

  1. 發送 SQL 查詢以将臨時增量快照請求添加到信令集合:

    例如,

    id

    指令中的、

    type

    、參數的值

    data

    對應于信令集合的字段。

    以下集合描述了這些參數:

    價值 描述

    myschema.debezium_signal

    指定源資料庫上的信令集合的完全限定名稱

    ad-hoc-1

    id

    參數指定一個任意字元串,該字元串被配置設定為

    id

    信号請求的辨別符。 使用此字元串将日志消息辨別到信令集合中的條目。Debezium 不使用此字元串。相反,在快照期間,Debezium 會生成自己的

    id

    字元串作為水印信号。

    execute-snapshot

    指定

    type

    參數指定信号要觸發的操作。

    data-collections

    信号字段的必需元件,

    data

    它指定要包含在快照中的集合名稱數組。 該數組按集合的完全限定名稱列出集合,使用的格式與您在

    signal.data.collection

    配置屬性中指定連接配接器的信号集合的名稱相同。

    incremental

    信号字段的可選

    type

    元件

    data

    ,指定要運作的快照操作的種類。 目前,唯一有效的選項是預設值

    incremental

    . 在您送出給信号集合的 SQL 查詢中指定一個

    type

    值是可選的。 如果您未指定值,則連接配接器将運作增量快照。

以下示例顯示了連接配接器捕獲的增量快照事件的 JSON。

示例:增量快照事件消息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 
    },
    "op":"r", 
    "ts_ms":"1620393591654",
    "transaction":null
}
           
物品 字段名稱 描述
1

snapshot

指定要運作的快照操作的類型。 目前,唯一有效的選項是預設值

incremental

. 在您送出給信号集合的 SQL 查詢中指定一個

type

值是可選的。 如果您未指定值,則連接配接器将運作增量快照。
2

op

指定事件類型。 快照事件的值為

r

,表示

READ

操作。
目前僅對單個副本集部署支援增量快照。此限制将在下一個版本中删除。

流式傳輸更改

副本集的連接配接器任務記錄偏移量後,它使用偏移量來确定 oplog 中應該開始流式傳輸更改的位置。然後該任務(取決于配置)連接配接到副本集的主節點或連接配接到副本集範圍的更改流并從該位置開始流式傳輸更改。它處理所有建立、插入和删除操作,并将它們轉換為 Debezium更改事件。每個更改事件都包括在 oplog 中找到操作的位置,連接配接器會定期将此記錄為最近的偏移量。記錄偏移量的時間間隔由 控制

offset.flush.interval.ms

,這是 Kafka Connect 工作器配置屬性。

當連接配接器正常停止時,會記錄最後處理的偏移量,以便在重新啟動時,連接配接器将準确地從中斷處繼續。但是,如果連接配接器的任務意外終止,那麼這些任務可能在它最後一次記錄偏移量之後但在記錄最後一個偏移量之前已經處理并生成了事件;重新啟動時,連接配接器從最後記錄的偏移量開始,可能會生成一些之前在崩潰之前生成的相同僚件。

當一切都在名義上運作時,Kafka 消費者實際上隻會看到每條消息一次*。但是,當出現問題時,Kafka 隻能保證消費者至少會看到每條消息一次*。是以,您的消費者需要預期會多次看到消息。

如上所述,連接配接器任務總是使用副本集的主節點從 oplog 流式傳輸更改,確定連接配接器盡可能看到最新的操作,并且可以以比輔助節點更低的延遲捕獲更改改為使用。當副本集選擇一個新的主節點時,連接配接器立即停止流式傳輸更改,連接配接到新的主節點,并開始從同一位置的新主節點流式傳輸更改。同樣,如果連接配接器在與副本內建員通信時遇到任何問題,它會嘗試通過使用指數退避來重新連接配接,以免淹沒副本集,并且一旦連接配接,它就會從上次中斷的地方繼續流式傳輸更改。這樣,

總而言之,MongoDB 連接配接器在大多數情況下都會繼續運作。通信問題可能會導緻連接配接器等待問題解決。

主題名稱

MongoDB 連接配接器将針對每個集合中的文檔的所有插入、更新和删除操作的事件寫入單個 Kafka 主題。Kafka 主題的名稱始終采用logicalName形式。資料庫名稱。collectionName,其中logicalName是使用配置屬性指定的連接配接器的邏輯名稱, databaseName是發生操作的資料庫的名稱,collectionName是受影響文檔所在的MongoDB 集合的名稱。

mongodb.name

例如,考慮一個 MongoDB 副本集,其

inventory

資料庫包含四個集合:

products

products_on_hand

customers

orders

。如果監控此資料庫的連接配接器的邏輯名稱為

fulfillment

,則連接配接器将在以下四個 Kafka 主題上産生事件:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

請注意,主題名稱不包含副本集名稱或分片名稱。是以,對分片集合(每個分片包含集合文檔的一個子集)的所有更改都轉到同一個 Kafka 主題。

您可以設定 Kafka 以根據需要自動建立主題。如果沒有,那麼您必須在啟動連接配接器之前使用 Kafka 管理工具來建立主題。

分區

MongoDB 連接配接器沒有明确決定如何為事件劃分主題。相反,它允許 Kafka 确定如何根據事件鍵對主題進行分區。

Partitioner

您可以通過在 Kafka Connect 工作程式配置中定義實作的名稱來更改 Kafka 的分區邏輯。

Kafka 僅維護寫入單個主題分區的事件的總順序。按鍵對事件進行分區确實意味着具有相同鍵的所有事件總是進入同一個分區。這確定了特定文檔的所有事件總是完全有序的。

交易中繼資料

Debezium 可以生成表示事務中繼資料邊界的事件并豐富更改資料事件消息。

Debezium 接收交易中繼資料的時間限制Debezium 僅注冊和接收部署連接配接器後發生的事務的中繼資料。部署連接配接器之前發生的事務的中繼資料不可用。

對于每筆交易

BEGIN

END

,Debezium 都會生成一個包含以下字段的事件:

  • status

    BEGIN

    或者

    END

  • id

    唯一交易辨別符的字元串表示。
  • event_count

    (用于

    END

    活動)

    事務發出的事件總數。

  • data_collections

    (用于

    END

    活動)

    一對

    data_collection

    和的數組

    event_count

    ,提供源自給定資料集合的更改所發出的事件數。

以下示例顯示了一條典型消息:

{
  "status": "BEGIN",
  "id": "1462833718356672513",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "1462833718356672513",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "rs0.testDB.collectiona",
      "event_count": 1
    },
    {
      "data_collection": "rs0.testDB.collectionb",
      "event_count": 1
    }
  ]
}
           

除非通過

transaction.topic

選項覆寫,否則事務事件将寫入名為

*database.server.name*.transaction

.

更改資料事件豐富

啟用事務中繼資料後,資料消息

Envelope

會增加一個新

transaction

字段。此字段以字段組合的形式提供有關每個事件的資訊:

  • id

    唯一交易辨別符的字元串表示。
  • total_order

    該事件在事務生成的所有事件中的絕對位置。
  • data_collection_order

    事務發出的所有事件中事件的每個資料收集位置。

以下是消息外觀的示例:

{
  "patch": null,
  "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"[email protected]\"}",
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "1462833718356672513",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
           

資料更改事件

Debezium MongoDB 連接配接器為每個插入、更新或删除資料的文檔級操作生成資料更改事件。每個事件都包含一個鍵和一個值。鍵和值的結構取決于更改的集合。

Debezium 和 Kafka Connect 是圍繞連續的事件消息流設計的。但是,這些事件的結構可能會随着時間的推移而發生變化,這對于消費者來說可能難以處理。為了解決這個問題,每個事件都包含其内容的模式,或者,如果您使用的是模式系統資料庫,消費者可以使用它從系統資料庫中擷取模式的模式 ID。這使得每個事件都是獨立的。

以下骨架 JSON 顯示了更改事件的基本四個部分。但是,如何配置您選擇在應用程式中使用的 Kafka Connect 轉換器決定了這四個部分在更改事件中的表示。僅當您将轉換器配置為生成該字段時,該

schema

字段才處于更改事件中。同樣,僅當您将轉換器配置為生成它時,事件鍵和事件有效負載才在更改事件中。如果您使用 JSON 轉換器并将其配置為生成所有四個基本更改事件部分,則更改事件具有以下結構:

{
 "schema": { 
   ...
  },
 "payload": { 
   ...
 },
 "schema": { 
   ...
 },
 "payload": { 
   ...
 },
}
           
物品 字段名稱 描述
1

schema

第一個

schema

字段是事件鍵的一部分。它指定了一個 Kafka Connect 模式,該模式描述了事件鍵

payload

部分中的内容。換句話說,第一個

schema

字段描述了已更改文檔的鍵的結構。
2

payload

第一個

payload

字段是事件鍵的一部分。它具有前一個字段描述的結構,

schema

并且包含已更改文檔的鍵。
3

schema

第二個

schema

字段是事件值的一部分。它指定了描述事件值

payload

部分内容的 Kafka Connect 模式。換句話說,第二個

schema

描述了被改變的文檔的結構。通常,此模式包含嵌套模式。
4

payload

第二個

payload

字段是事件值的一部分。它具有前一個字段描述的結構,

schema

并且包含已更改文檔的實際資料。

預設情況下,連接配接器流将事件記錄更改為名稱與事件的原始集合相同的主題。請參閱主題名稱。

MongoDB 連接配接器確定所有 Kafka Connect 模式名稱都遵循Avro 模式名稱格式。這意味着邏輯伺服器名稱必須以拉丁字母或下劃線開頭,即 az、AZ 或 _。邏輯伺服器名稱中的每個剩餘字元以及資料庫和集合名稱中的每個字元都必須是拉丁字母、數字或下劃線,即 az、AZ、0-9 或 _。如果存在無效字元,則将其替換為下劃線字元。如果邏輯伺服器名稱、資料庫名稱或集合名稱包含無效字元,并且将名稱彼此區分開來的唯一字元無效并是以替換為下劃線,這可能會導緻意外沖突。

更改事件鍵

更改事件的鍵包含更改文檔鍵的模式和更改文檔的實際鍵。對于給定的集合,模式及其相應的有效負載都包含一個

id

字段。此字段的值是文檔的辨別符,表示為從MongoDB 擴充 JSON 序列化嚴格模式派生的字元串。

考慮一個邏輯名稱為 的連接配接器、一個包含資料庫

fulfillment

的副本集和一個包含如下文檔的集合。

inventory``customers

示例文檔

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "[email protected]"
}
           

示例更改事件鍵

捕獲對集合的更改的每個更改事件

customers

都具有相同的事件鍵架構。隻要

customers

集合具有前面的定義,捕獲

customers

集合更改的每個更改事件都具有以下關鍵結構。在 JSON 中,它看起來像這樣:

{
  "schema": { 
    "type": "struct",
    "name": "fulfillment.inventory.customers.Key", 
    "optional": false, 
    "fields": [ 
      {
        "field": "id",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": { 
    "id": "1004"
  }
}
           
物品 字段名稱 描述
1

schema

鍵的模式部分指定了描述鍵

payload

部分内容的 Kafka Connect 模式。
2

fulfillment.inventory.customers.Key

定義密鑰有效負載結構的架構名稱。此架構描述了已更改文檔的鍵的結構。鍵架構名稱的格式為connector-name。資料庫名稱。集合名稱。

Key

. 在這個例子中:

fulfillment

是生成此事件的連接配接器的名稱。

inventory

是包含已更改集合的資料庫。

customers

是包含已更新文檔的集合。
3

optional

payload

訓示事件鍵是否必須在其字段中包含值。在此示例中,密鑰的有效負載中的值是必需的。當文檔沒有鍵時,鍵的有效負載字段中的值是可選的。
4

fields

指定 中預期的

payload

每個字段,包括每個字段的名稱、類型以及是否需要。
5

payload

包含為其生成此更改事件的文檔的鍵。在此示例中,鍵包含一個

id

類型

string

為 的字段

1004

此示例使用具有整數辨別符的文檔,但任何有效的 MongoDB 文檔辨別符都以相同的方式工作,包括文檔辨別符。對于文檔辨別符,事件鍵的

payload.id

值是一個字元串,它将更新文檔的原始

_id

字段表示為使用嚴格模式的 MongoDB 擴充 JSON 序列化。下表提供了如何

_id

表示不同類型字段的示例。

類型 MongoDB

_id

價值
密鑰的有效載荷
整數 1234

{ "id" : "1234" }

漂浮 12.34

{ "id" : "12.34" }

細繩 “1234”

{ "id" : "\"1234\"" }

文檔

{ "hi" : "kafka", "nums" : [10.0, 100.0, 1000.0] }

{ "id" : "{\"hi\" : \"kafka\", \"nums\" : [10.0, 100.0, 1000.0]}" }

對象辨別

ObjectId("596e275826f08b2730779e1f")

{ "id" : "{\"$oid\" : \"596e275826f08b2730779e1f\"}" }

二進制

BinData("a2Fma2E=",0)

{ "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" }

更改事件值

更改事件中的值比鍵複雜一些。與鍵一樣,值也有一個

schema

部分和一個

payload

部分。該

schema

部分包含描述該部分

Envelope

結構的架構

payload

,包括其嵌套字段。建立、更新或删除資料的操作的更改事件都有一個帶有信封結構的值負載。

考慮用于顯示更改事件鍵示例的相同示例文檔:

示例文檔

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "[email protected]"
}
           

針對每個事件類型描述了更改此文檔的更改事件的值部分:

  • 建立事件
  • 更新事件
  • 删除事件
  • 墓碑事件

建立事件

以下示例顯示了連接配接器為在

customers

集合中建立資料的操作生成的更改事件的值部分:

{
    "schema": { 
      "type": "struct",
      "fields": [
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json", 
          "version": 1,
          "field": "after"
        },
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json",
          "version": 1,
          "field": "patch"
        },
        {
          "type": "struct",
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "version"
            },
            {
              "type": "string",
              "optional": false,
              "field": "connector"
            },
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_ms"
            },
            {
              "type": "boolean",
              "optional": true,
              "default": false,
              "field": "snapshot"
            },
            {
              "type": "string",
              "optional": false,
              "field": "db"
            },
            {
              "type": "string",
              "optional": false,
              "field": "rs"
            },
            {
              "type": "string",
              "optional": false,
              "field": "collection"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "ord"
            },
            {
              "type": "int64",
              "optional": true,
              "field": "h"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.mongo.Source", 
          "field": "source"
        },
        {
          "type": "string",
          "optional": true,
          "field": "op"
        },
        {
          "type": "int64",
          "optional": true,
          "field": "ts_ms"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Envelope" 
      },
    "payload": { 
      "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"[email protected]\"}", 
      "patch": null,
      "source": { 
        "version": "1.9.5.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 31,
        "h": 1546547425148721999
      },
      "op": "c", 
      "ts_ms": 1558965515240 
    }
  }
           
物品 字段名稱 描述
1

schema

值的架構,它描述了值的有效負載的結構。在連接配接器為特定集合生成的每個更改事件中,更改事件的值架構都是相同的。
2

name

在該

schema

部分中,每個

name

字段都為值的有效負載中的字段指定架構。

io.debezium.data.Json

是有效負載的

after

patch

filter

字段的架構。此架構特定于

customers

集合。建立事件是唯一一種包含

after

字段的事件。一個更新事件包含一個

filter

字段和一個

patch

字段。删除事件包含一個字段

filter

,但不包含

after

字段或

patch

字段。
3

name

io.debezium.connector.mongo.Source

是有效負載

source

字段的架構。此模式特定于 MongoDB 連接配接器。連接配接器将它用于它生成的所有事件。
4

name

dbserver1.inventory.customers.Envelope

是有效負載整體結構的架構,其中

dbserver1

是連接配接器名稱,

inventory

是資料庫,

customers

是集合。此架構特定于集合。
5

payload

該值的實際資料。這是更改事件提供的資訊。 看起來事件的 JSON 表示比它們描述的文檔大得多。這是因為 JSON 表示必須包括消息的模式和有效負載部分。但是,通過使用Avro 轉換器,您可以顯着減小連接配接器流式傳輸到 Kafka 主題的消息的大小。
6

after

一個可選字段,指定事件發生後文檔的狀态。在此示例中,該

after

字段包含新文檔的

_id

first_name

last_name

email

字段的值。該

after

值始終是一個字元串。按照慣例,它包含文檔的 JSON 表示。MongoDB的OPLOG條目僅包含用于_Create_事件的文檔的完整狀态,以及設定為選項

update

時的事件;in other words, a create event is the only kind of event that contains an after field, when the option is set either to or .

capture.mode``change_streams_update_full``capture.mode``oplog``change_streams

7

source

描述事件源中繼資料的必填字段。此字段包含可用于将此事件與其他事件進行比較的資訊,包括事件的來源、事件發生的順序以及事件是否屬于同一事務的一部分。源中繼資料包括:Debezium 版本。生成事件的連接配接器的名稱。MongoDB 副本集的邏輯名稱,形成生成事件的命名空間,并用于連接配接器寫入的 Kafka 主題名稱。包含新文檔的集合和資料庫的名稱。如果事件是快照的一部分。在資料庫中進行更改時的時間戳以及時間戳内事件的序号。MongoDB操作的唯一辨別,取決于MongoDB的版本。它要麼是

h

oplog 事件中的字段,要麼是一個名為 的字段

stxnid

,它表示來自 oplog 事件的

lsid

txnNumber

字段(僅限 oplog 捕獲模式)。MongoDB 會話的唯一辨別符

lsid

和事務編号

txnNumber

,以防更改在事務内部執行(僅限更改流捕獲模式)。
8

op

強制字元串,描述導緻連接配接器生成事件的操作類型。在本例中,

c

表示該操作建立了一個文檔。有效值為:

c

= 建立

u

= 更新

d

= 删除

r

= 讀取(僅适用于快照)
9

ts_ms

顯示連接配接器處理事件的時間的可選字段。該時間基于運作 Kafka Connect 任務的 JVM 中的系統時鐘。 在

source

對象中,

ts_ms

訓示在資料庫中進行更改的時間。通過比較 for

payload.source.ts_ms

的值

payload.ts_ms

,您可以确定源資料庫更新和 Debezium 之間的延遲。

更新事件

Oplog 捕獲模式(舊版)

示例集合中更新的更改事件的值與該集合的建立

customers

事件具有相同的模式。同樣,事件值的有效負載具有相同的結構。但是,事件值有效負載在更新事件中包含不同的值。更新事件沒有值。相反,它具有以下兩個字段:

after

  • patch

    是一個字元串字段,包含幂等更新操作的 JSON 表示
  • filter

    是一個字元串字段,其中包含更新選擇條件的 JSON 表示形式。該

    filter

    字元串可以包含多個分片集合的分片鍵字段。

customers

以下是連接配接器為集合中的更新生成的事件中的更改事件值示例:

{
    "schema": { ... },
    "payload": {
      "op": "u", 
      "ts_ms": 1465491461815, 
      "patch": "{\"$set\":{\"first_name\":\"Anne Marie\"}}", 
      "filter": "{\"_id\" : {\"$numberLong\" : \"1004\"}}", 
      "source": { 
        "version": "1.9.5.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }
           
物品 字段名稱 描述
1

op

強制字元串,描述導緻連接配接器生成事件的操作類型。在此示例中,

u

表示操作更新了文檔。
2

ts_ms

顯示連接配接器處理事件的時間的可選字段。該時間基于運作 Kafka Connect 任務的 JVM 中的系統時鐘。 在

source

對象中,

ts_ms

訓示在資料庫中進行更改的時間。通過比較 for

payload.source.ts_ms

的值

payload.ts_ms

,您可以确定源資料庫更新和 Debezium 之間的延遲。
3

patch

包含對文檔的實際 MongoDB 幂等更改的 JSON 字元串表示。在此示例中,更新将

first_name

字段更改為新值。 更新事件值不包含字段

after

4

filter

包含用于辨別要更新的文檔的 MongoDB 選擇标準的 JSON 字元串表示。
5

source

描述事件源中繼資料的必填字段。此字段包含與同一集合的建立事件相同的資訊,但值不同,因為此事件來自 oplog 中的不同位置。源中繼資料包括:Debezium 版本。生成事件的連接配接器的名稱。MongoDB 副本集的邏輯名稱,形成生成事件的命名空間,并用于連接配接器寫入的 Kafka 主題名稱。包含更新文檔的集合和資料庫的名稱。如果事件是快照的一部分。在資料庫中進行更改時的時間戳以及時間戳内事件的序号。MongoDB操作的唯一辨別,取決于MongoDB的版本。它要麼是

h

oplog 事件中的字段,要麼是一個名為 的字段

stxnid

,它表示來自 oplog 事件的

lsid

txnNumber

字段。
在 Debezium 更改事件中,MongoDB 提供

patch

字段的内容。該字段的格式取決于 MongoDB 資料庫的版本。是以,當您更新到較新的 MongoDB 資料庫版本時,請為格式可能發生的變化做好準備。本文檔中的示例來自 MongoDB 3.4,在您的應用程式中,事件格式可能會有所不同。
在 MongoDB 的 oplog 中,更新事件不包含更改文檔的前後狀态**。是以,Debezium 連接配接器無法提供此資訊。但是,Debezium 連接配接器在建立和讀取事件中提供文檔的起始狀态。流的下遊消費者可以通過保持每個文檔的最新狀态并将新事件中的狀态與儲存的狀态進行比較來重建文檔狀态。Debezium 連接配接器無法保持此狀态。

更改流捕獲模式

示例集合中更新的更改事件的值與該集合的建立

customers

事件具有相同的模式。同樣,事件值的有效負載具有相同的結構。但是,事件值有效負載在更新事件中包含不同的值。更新事件僅在設定為選項時才具有值。在這種情況下,有一個帶有一些附加字段的新結構化字段:

after``capture.mode``change_streams_update_full``updateDescription

  • updatedFields

    是一個字元串字段,其中包含更新的文檔字段的 JSON 表示及其值
  • removedFields

    是從文檔中删除的字段名稱清單
  • truncatedArrays

    是文檔中被截斷的數組清單

customers

以下是連接配接器為集合中的更新生成的事件中的更改事件值示例:

{
    "schema": { ... },
    "payload": {
      "op": "u", 
      "ts_ms": 1465491461815, 
      "after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"[email protected]\"}", 
      "updateDescription": {
        "removedFields": null,
        "updatedFields": "{\"first_name\": \"Anne Marie\"}", 
        "truncatedArrays": null
      },
      "source": { 
        "version": "1.9.5.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 1,
        "h": null,
        "tord": null,
        "stxnid": null,
        "lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
        "txnNumber":1
      }
    }
  }
           
物品 字段名稱 描述
1

op

強制字元串,描述導緻連接配接器生成事件的操作類型。在此示例中,

u

表示操作更新了文檔。
2

ts_ms

顯示連接配接器處理事件的時間的可選字段。該時間基于運作 Kafka Connect 任務的 JVM 中的系統時鐘。 在

source

對象中,

ts_ms

訓示在資料庫中進行更改的時間。通過比較 for

payload.source.ts_ms

的值

payload.ts_ms

,您可以确定源資料庫更新和 Debezium 之間的延遲。
3

after

包含實際 MongoDB 文檔的 JSON 字元串表示。 如果捕獲模式未設定為,則更新事件值不包含字段

after``change_streams_update_full

4

updatedFields

包含文檔的更新字段值的 JSON 字元串表示形式。在此示例中,更新将

first_name

字段更改為新值。
5

source

描述事件源中繼資料的必填字段。此字段包含與同一集合的建立事件相同的資訊,但值不同,因為此事件來自 oplog 中的不同位置。源中繼資料包括:Debezium 版本。生成事件的連接配接器的名稱。MongoDB 副本集的邏輯名稱,形成生成事件的命名空間,并用于連接配接器寫入的 Kafka 主題名稱。包含更新文檔的集合和資料庫的名稱。如果事件是快照的一部分。在資料庫中進行更改時的時間戳以及時間戳内事件的序号。

lsid

MongoDB 會話和事務号的唯一辨別符,

txnNumber

以防更改在事務内執行。
事件中的

after

值應作為文檔的時間點值處理。該值不是動态計算的,而是從集合中擷取的。是以,如果多個更新一個接一個地緊随其後,則所有更新更新事件将包含相同的

after

值,該值将表示存儲在文檔中的最後一個值。如果您的應用程式依賴于漸變進化,那麼您應該

updateDescription

隻依賴。

删除事件

删除更改事件中的值與同一集合的建立和更新

schema

事件具有相同的部分。删除事件中的部分包含與同一集合的建立和更新事件不同的值。特别是,删除事件既不包含值也不包含一個或多個值。以下是集合中文檔的删除事件示例:

payload``after``patch``updateDescription``customers

{
    "schema": { ... },
    "payload": {
      "op": "d", 
      "ts_ms": 1465495462115, 
      "filter": "{\"_id\" : {\"$numberLong\" : \"1004\"}}", 
      "source": { 
        "version": "1.9.5.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }
           
物品 字段名稱 描述
1

op

描述操作類型的強制字元串。

op

字段值為

d

,表示該文檔已被删除。
2

ts_ms

顯示連接配接器處理事件的時間的可選字段。該時間基于運作 Kafka Connect 任務的 JVM 中的系統時鐘。 在

source

對象中,

ts_ms

訓示在資料庫中進行更改的時間。通過比較 for

payload.source.ts_ms

的值

payload.ts_ms

,您可以确定源資料庫更新和 Debezium 之間的延遲。
3

filter

包含用于辨別要删除的文檔的 MongoDB 選擇标準的 JSON 字元串表示形式(僅限 oplog 捕獲模式)。
4

source

描述事件源中繼資料的必填字段。此字段包含與同一集合的建立或更新事件相同的資訊,但值不同,因為此事件來自 oplog 中的不同位置。源中繼資料包括:Debezium 版本。生成事件的連接配接器的名稱。MongoDB 副本集的邏輯名稱,形成生成事件的命名空間,并用于連接配接器寫入的 Kafka 主題名稱。包含已删除文檔的集合和資料庫的名稱。如果事件是快照的一部分。在資料庫中進行更改時的時間戳以及時間戳内事件的序号。MongoDB操作的唯一辨別,取決于MongoDB的版本。它要麼是

h

oplog 事件中的字段,要麼是一個名為 的字段

stxnid

,它表示來自 oplog 事件的

lsid

txnNumber

字段(僅限 oplog 捕獲模式)。MongoDB 會話的唯一辨別符

lsid

和事務編号

txnNumber

,以防更改在事務内部執行(僅限更改流捕獲模式)。

MongoDB 連接配接器事件旨在與Kafka 日志壓縮一起使用。隻要至少保留每個鍵的最新消息,日志壓縮就可以删除一些較舊的消息。這讓 Kafka 可以回收存儲空間,同時確定主題包含完整的資料集并可用于重新加載基于鍵的狀态。

墓碑事件

唯一辨別的文檔的所有 MongoDB 連接配接器事件都具有完全相同的鍵。删除文檔時,删除事件值仍然适用于日志壓縮,因為 Kafka 可以删除所有具有相同鍵的早期消息。但是,要讓 Kafka 删除所有具有該鍵的消息,消息值必須是

null

. 為了實作這一點,在 Debezium 的 MongoDB 連接配接器發出删除事件後,連接配接器會發出一個特殊的墓碑事件,該事件具有相同的鍵但有一個

null

值。墓碑事件通知 Kafka 可以删除具有相同密鑰的所有消息。

設定 MongoDB

MongoDB 連接配接器使用 MongoDB 的 oplog/change 流來捕獲更改,是以連接配接器僅适用于 MongoDB 副本集或分片叢集,其中每個分片都是一個單獨的副本集。有關設定副本集或分片叢集的資訊,請參閱 MongoDB 文檔。此外,請務必了解如何使用副本集啟用通路控制和身份驗證。

您還必須有一個具有适當角色的 MongoDB 使用者來讀取

admin

可以讀取 oplog 的資料庫。此外,使用者還必須能夠讀取分片

config

叢集的配置伺服器中的資料庫,并且必須具有

listDatabases

特權操作。當使用更改流(預設)時,使用者還必須具有叢集範圍的權限操作

find

changeStream

.

雲中的 MongoDB

您可以将 MongoDB 的 Debezium 連接配接器與MongoDB Atlas一起使用。将 Debezium 連接配接到 MongoDB Atlas 時,啟用其中之一

capture modes

基于更改流,而不是 oplog。請注意,MongoDB Atlas 僅支援通過 SSL 的安全連接配接,即

+mongodb.ssl.enabled

連接配接器選項必須設定為

true

.

部署

要部署 Debezium MongoDB 連接配接器,請安裝 Debezium MongoDB 連接配接器存檔,配置連接配接器,然後通過将其配置添加到 Kafka Connect 來啟動連接配接器。

先決條件

  • 已安裝Apache Zookeeper、Apache Kafka和Kafka Connect。
  • MongoDB 已安裝并設定為與 Debezium 連接配接器一起使用。

程式

  1. 下載下傳 連接配接器的插件存檔,
  2. 将 JAR 檔案提取到您的 Kafka Connect 環境中。
  3. 将包含 JAR 檔案的目錄添加到Kafka Connect 的

    plugin.path

    .
  4. 重新啟動 Kafka Connect 程序以擷取新的 JAR 檔案。

如果您正在使用不可變容器,請參閱Debezium 的Apache Zookeeper、Apache Kafka 和 Kafka Connect 容器映像,其中 MongoDB 連接配接器已安裝并準備好運作。

您還可以在 Kubernetes 和 OpenShift 上運作 Debezium。

Debezium教程将引導您使用這些圖像,這是了解 Debezium 的好方法。

MongoDB 連接配接器配置示例

以下是連接配接器執行個體的配置示例,該執行個體從位于

rs0

192.168.99.100 的端口 27017 的 MongoDB 副本集中捕獲資料,我們在邏輯上将其命名為

fullfillment

. 通常,您通過設定可用于連接配接器的配置屬性,在 JSON 檔案中配置 Debezium MongoDB 連接配接器。

您可以選擇為特定的 MongoDB 副本集或分片叢集生成事件。或者,您可以過濾掉不需要的集合。

{
  "name": "inventory-connector", 
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 
    "mongodb.hosts": "rs0/192.168.99.100:27017", 
    "mongodb.name": "fullfillment", 
    "collection.include.list": "inventory[.]*" 
  }
}
           
當我們向 Kafka Connect 服務注冊連接配接器時的名稱。
MongoDB 連接配接器類的名稱。
用于連接配接到 MongoDB 副本集的主機位址。
MongoDB 副本集的邏輯名稱,形成生成事件的命名空間,并用于連接配接器寫入的所有 Kafka 主題的名稱、Kafka Connect 模式名稱以及 Avro 時相應 Avro 模式的命名空間使用轉換器。
與要監視的所有集合的集合命名空間(例如,.)比對的正規表達式清單。這是可選的。

有關可以為 Debezium MongoDB 連接配接器設定的配置屬性的完整清單,請參閱MongoDB 連接配接器配置屬性。

您可以使用指令将此配置發送

POST

到正在運作的 Kafka Connect 服務。該服務記錄配置并啟動一個執行以下操作的連接配接器任務:

  • 連接配接到 MongoDB 副本集或分片叢集。
  • 為每個副本集配置設定任務。
  • 如有必要,執行快照。
  • 讀取 oplog/change 流。
  • 流将事件記錄更改為 Kafka 主題。

添加連接配接器配置

要開始運作 Debezium MongoDB 連接配接器,請建立一個連接配接器配置,并将該配置添加到您的 Kafka Connect 叢集。

先決條件

  • MongoDB 設定為使用 Debezium 連接配接器。
  • Debezium MongoDB 連接配接器已安裝。

程式

  1. 為 MongoDB 連接配接器建立配置。
  2. 使用Kafka Connect REST API将該連接配接器配置添加到您的 Kafka Connect 叢集。

結果

連接配接器啟動後,它會完成以下操作:

  • 對 MongoDB 副本集中的集合執行一緻的快照。
  • 讀取副本集的 oplogs/change 流。
  • 為每個插入、更新和删除的文檔生成更改事件。
  • 流将事件記錄更改為 Kafka 主題。

連接配接器屬性

Debezium MongoDB 連接配接器具有許多配置屬性,您可以使用這些屬性為您的應用程式實作正确的連接配接器行為。許多屬性都有預設值。有關屬性的資訊組織如下:

  • 必需的 Debezium MongoDB 連接配接器配置屬性
  • 進階 Debezium MongoDB 連接配接器配置屬性

除非預設值可用,否則需要以下配置屬性。

财産 預設 描述

name

連接配接器的唯一名稱。嘗試使用相同名稱再次注冊将失敗。(所有 Kafka Connect 連接配接器都需要此屬性。)

connector.class

連接配接器的 Java 類的名稱。始終

io.debezium.connector.mongodb.MongoDbConnector

為 MongoDB 連接配接器使用值。

mongodb.hosts

副本集中 MongoDB 伺服器的主機名和端口對的逗号分隔清單(以“主機”或“主機:端口”的形式)。該清單可以包含單個主機名和端口對。如果

mongodb.members.auto.discover

設定為

false

,則主機和端口對應以副本集名稱為字首(例如,

rs0/localhost:27017

)。+必須提供目前的主位址。此限制将在下一個 Debezium 版本中删除。

mongodb.name

辨別此連接配接器監控的連接配接器和/或 MongoDB 副本集或分片叢集的唯一名稱。每台伺服器最多應該由一個 Debezium 連接配接器監控,因為該伺服器名稱作為所有持久 Kafka 主題的字首,這些主題來自 MongoDB 副本集或叢集。僅使用字母數字字元、連字元、點和下劃線來構成名稱。邏輯名稱在所有其他連接配接器中應該是唯一的,因為該名稱用作命名從該連接配接器接收記錄的 Kafka 主題的字首。+不要更改此屬性的值。如果您更改名稱值,在重新啟動後,連接配接器不會繼續向原始主題發出事件,而是向名稱基于新值的主題發出後續事件。

mongodb.user

連接配接到 MongoDB 時要使用的資料庫使用者的名稱。僅當 MongoDB 配置為使用身份驗證時才需要這樣做。

mongodb.password

連接配接 MongoDB 時使用的密碼。僅當 MongoDB 配置為使用身份驗證時才需要這樣做。

mongodb.authsource

admin

包含 MongoDB 憑據的資料庫(身份驗證源)。僅當 MongoDB 配置為對另一個身份驗證資料庫使用身份驗證而不是

admin

.

mongodb.ssl.enabled

false

連接配接器将使用 SSL 連接配接到 MongoDB 執行個體。

mongodb.ssl.invalid.hostname.allowed

false

啟用 SSL 時,此設定控制在連接配接階段是否禁用嚴格的主機名檢查。如果

true

連接配接不會阻止中間人攻擊。

database.include.list

空字元串 一個可選的逗号分隔的正規表達式清單,比對要監控的資料庫名稱;任何未包含在其中的資料庫名稱都将

database.include.list

被排除在監控之外。預設情況下,所有資料庫都受到監視。不得與

database.exclude.list

.

database.exclude.list

空字元串 一個可選的以逗号分隔的正規表達式清單,比對要從監控中排除的資料庫名稱;任何未包含在其中的資料庫名稱

database.exclude.list

都會受到監控。不得與

database.include.list

.

collection.include.list

空字元串 一個可選的以逗号分隔的正規表達式清單,比對要監控的 MongoDB 集合的完全限定命名空間;任何未包含在其中的集合都将

collection.include.list

被排除在監控之外。每個辨別符的格式為databaseName。集合名稱。預設情況下,連接配接器将監視除

local

admin

資料庫中的所有集合之外的所有集合。不得與

collection.exclude.list

.

collection.exclude.list

空字元串 一個可選的以逗号分隔的正規表達式清單,它與要從監控中排除的 MongoDB 集合的完全限定名稱空間比對;任何未包含在其中的集合

collection.exclude.list

都會受到監控。每個辨別符的格式為databaseName。集合名稱。不得與

collection.include.list

.

snapshot.mode

initial

指定在連接配接器啟動時運作快照的條件。預設值為initial,并指定連接配接器在未找到偏移量或 oplog/change 流不再包含先前偏移量時讀取快照。never選項指定連接配接器不應該使用快照,而是連接配接器應該繼續跟蹤日志。

capture.mode

change_streams_update_full

指定用于從 MongoDB 伺服器捕獲更改的方法。預設值為change_streams_update_full,并指定連接配接器通過 MongoDB 更改流機制捕獲更改,并且更新事件應包含完整文檔。change_streams模式将使用相同的捕獲方法,但更新事件不會包含完整的文檔。oplog模式指定直接通路MongoDB oplog ;這是傳統方法,不應用于新的連接配接器執行個體。

snapshot.include.collection.list

中指定的所有集合

collection.include.list

一個可選的、以逗号分隔的正規表達式清單,與

collection.include.list

您要為其拍攝快照的指定模式的名稱相比對。

field.exclude.list

空字元串 應從更改事件消息值中排除的字段的完全限定名稱的可選逗号分隔清單。字段的完全限定名稱的格式為databaseName。集合名稱。字段名。nestedFieldName,其中databaseName和collectionName可能包含比對任何字元的通配符 (*)。

field.renames

空字元串 應用于重命名更改事件消息值中的字段的字段的完全限定替換的可選逗号分隔清單。字段的完全限定替換形式為databaseName。集合名稱。字段名。nestedFieldName : newNestedFieldName,其中databaseName和collectionName可以包含比對任何字元的通配符 (*),冒号 (😃 用于确定字段的重命名映射。下一個字段替換應用于清單中上一個字段替換的結果,是以在重命名同一路徑中的多個字段時請記住這一點。

tasks.max

1

應為此連接配接器建立的最大任務數。MongoDB 連接配接器将嘗試為每個副本集使用單獨的任務,是以在将連接配接器與單個 MongoDB 副本集一起使用時,預設值是可以接受的。将連接配接器與 MongoDB 分片叢集一起使用時,我們建議指定一個等于或大于叢集中分片數量的值,以便每個副本集的工作可以由 Kafka Connect 配置設定。

snapshot.max.threads

1

正整數值,指定用于執行副本集中集合的初始同步的最大線程數。預設為 1。

tombstones.on.delete

true

控制删除事件後是否有墓碑事件。

true

- 删除操作由删除事件和随後的墓碑事件表示。

false

- 隻發出一個删除事件。 删除源記錄後,發出 tombstone 事件(預設行為)允許 Kafka 完全删除與已删除行的鍵相關的所有事件,以防主題啟用日志壓縮。

snapshot.delay.ms

連接配接器在啟動後拍攝快照之前應等待的時間間隔(以毫秒為機關); 可用于在叢集中啟動多個連接配接器時避免快照中斷,這可能導緻連接配接器重新平衡。

snapshot.fetch.size

指定在拍攝快照時應從每個集合中一次性讀取的最大文檔數。連接配接器将分批讀取該大小的集合内容。 預設為 0,表示伺服器選擇了合适的 fetch 大小。

schema.name.adjustment.mode

歐元 指定應如何調整架構名稱以與連接配接器使用的消息轉換器相容。可能的設定:

avro

用下劃線替換不能在 Avro 類型名稱中使用的字元。

none

不應用任何調整。

以下進階配置屬性具有良好的預設值,适用于大多數情況,是以很少需要在連接配接器的配置中指定。

财産 預設 描述

max.batch.size

2048

正整數值,指定在此連接配接器的每次疊代期間應處理的每批事件的最大大小。預設為 2048。

max.queue.size

8192

正整數值,指定阻塞隊列可以儲存的最大記錄數。當 Debezium 讀取從資料庫流式傳輸的事件時,它會将事件放入阻塞隊列中,然後再将它們寫入 Kafka。在連接配接器接收消息的速度快于将消息寫入 Kafka 的速度或 Kafka 不可用時,阻塞隊列可以為從資料庫讀取更改事件提供背壓。當連接配接器定期記錄偏移量時,将忽略隊列中儲存的事件。始終将 的值設定

max.queue.size

為大于 的值

max.batch.size

max.queue.size.in.bytes

一個長整數值,指定阻塞隊列的最大容量(以位元組為機關)。預設情況下,沒有為阻塞隊列指定卷限制。要指定隊列可以使用的位元組數,請将此屬性設定為正長值。 如果

max.queue.size

也設定了,當隊列的大小達到任一屬性指定的限制時,寫入隊列将被阻止。例如,如果設定

max.queue.size=1000

, 和

max.queue.size.in.bytes=5000

,則在隊列包含 1000 條記錄或隊列中的記錄量達到 5000 位元組後阻止寫入隊列。

poll.interval.ms

1000

正整數值,指定連接配接器在每次疊代期間應等待新更改事件出現的毫秒數。預設為 1000 毫秒或 1 秒。

connect.backoff.initial.delay.ms

1000

正整數值,指定在第一次連接配接嘗試失敗或沒有可用的主節點後嘗試重新連接配接主節點時的初始延遲。預設為 1 秒(1000 毫秒)。

connect.backoff.max.delay.ms

1000

正整數值,指定在重複連接配接嘗試失敗或沒有可用的主節點後嘗試重新連接配接主節點時的最大延遲。預設為 120 秒(120,000 毫秒)。

connect.max.attempts

16

正整數值,指定在發生異常和任務被中止之前與副本集主節點的最大失敗連接配接嘗試次數。預設為 16,預設為 16,

connect.backoff.initial.delay.ms

導緻

connect.backoff.max.delay.ms

在失敗前嘗試超過 20 分鐘。

mongodb.members.auto.discover

true

布爾值,指定 ‘mongodb.hosts’ 中的位址是否是應用于發現叢集或副本集的所有成員的種子(

true

),或者是否

mongodb.hosts

應按原樣使用(

false

)中的位址。預設值是

true

并且應該在所有情況下使用,除了 MongoDB前面有代理的情況。

source.struct.version

v2

source

CDC 事件中塊的架構版本。Debezium 0.10 對塊的結構進行了一些重大更改,

source

以統一所有連接配接器的暴露結構。 通過将此選項設定為

v1

可以生成早期版本中使用的結構。請注意,不建議使用此設定,并計劃在未來的 Debezium 版本中删除此設定。

heartbeat.interval.ms

控制發送心跳消息的頻率。 此屬性包含以毫秒為機關的時間間隔,該時間間隔定義連接配接器将消息發送到心跳主題的頻率。這可用于監視連接配接器是否仍在從資料庫接收更改事件。如果在較長時間内僅更改未捕獲集合中的記錄,您還應該利用心跳消息。在這種情況下,連接配接器将繼續從資料庫中讀取 oplog/change 流,但永遠不會向 Kafka 發出任何更改消息,這反過來意味着沒有偏移量更新送出給 Kafka。這将導緻 oplog 檔案被輪換,但連接配接器不會注意到它,是以在重新啟動時某些事件不再可用,這導緻需要重新執行初始快照。将此參數設定 為根本不發送心跳消息。 預設禁用。

heartbeat.topics.prefix

__debezium-heartbeat

控制向其發送心跳消息的主題的命名。 主題是根據模式命名的

<heartbeat.topics.prefix>.<server.name>

sanitize.field.names

true

當連接配接器配置明确指定使用 Avro 的

key.converter

value.converter

參數時,預設為

false

.
字段名稱是否經過清理以符合 Avro 命名要求。有關詳細資訊,請參閱Avro 命名。

skipped.operations

流式傳輸期間将跳過的操作類型的逗号分隔清單。操作包括:

c

插入/建立、

u

更新和

d

删除。預設情況下,不會跳過任何操作。

snapshot.collection.filter.overrides

控制哪些集合項包含在快照中。此屬性僅影響快照。以databaseName.collectionName形式指定以逗号分隔的集合名稱清單。對于您指定的每個集合,還要指定另一個配置屬性:. 例如,其他配置屬性的名稱可能是:. 将此屬性設定為僅檢索快照中所需的項目的有效過濾器表達式。當連接配接器執行快照時,它隻檢索與過濾器表達式比對的項目。

snapshot.collection.filter.overrides.*databaseName*.*collectionName*``snapshot.collection.filter.overrides.customers.orders

provide.transaction.metadata

false

當設定為

true

Debezium 時,會生成帶有事務邊界的事件,并使用事務中繼資料豐富資料事件信封。有關其他詳細資訊,請參閱事務中繼資料。

transaction.topic

${database.server.name}.transaction

控制連接配接器向其發送事務中繼資料消息的主題的名稱。占位符

${database.server.name}

可用于引用連接配接器的邏輯名稱(請參閱邏輯連接配接器名稱);預設為

${database.server.name}.transaction

,例如

dbserver1.transaction

retriable.restart.connector.wait.ms

10000(10 秒) 發生可重試錯誤後重新啟動連接配接器之前等待的毫秒數。

mongodb.poll.interval.ms

30000

連接配接器輪詢新的、删除的或更改的副本集的時間間隔。

mongodb.connect.timeout.ms

10000(10 秒) 在中止新的連接配接嘗試之前驅動程式将等待的毫秒數。

mongodb.socket.timeout.ms

在發生逾時之前,套接字上的發送/接收可能需要的毫秒數。值 禁用此行為。

mongodb.server.selection.timeout.ms

30000(30 秒) 驅動程式在逾時并引發錯誤之前等待選擇伺服器的毫秒數。

cursor.max.await.time.ms

指定 oplog/change 流遊标在導緻執行逾時異常之前等待伺服器産生結果的最大毫秒數。值 訓示使用伺服器/驅動程式預設等待逾時。

signal.data.collection

無預設值 用于向連接配接器發送信号的資料集合的完全限定名稱。使用以下格式指定集合名稱:

*<databaseName>*.*<collectionName>*

incremental.snapshot.chunk.size

1024

連接配接器在增量快照塊期間擷取并讀入記憶體的最大文檔數。增加塊大小提供了更高的效率,因為快照運作的快照查詢更少,但更大的大小。然而,更大的塊大小也需要更多的記憶體來緩沖快照資料。将塊大小調整為在您的環境中提供最佳性能的值。

監控

除了 Zookeeper、Kafka 和 Kafka Connect 擁有的對 JMX 名額的内置支援之外,Debezium MongoDB 連接配接器還有兩種名額類型。

  • 快照名額在執行快照時提供有關連接配接器操作的資訊。
  • 當連接配接器捕獲更改和流式處理更改事件記錄時,流式處理名額提供有關連接配接器操作的資訊。

Debezium監控文檔提供了有關如何使用 JMX 公開這些名額的詳細資訊。

快照名額

MBean是. _

debezium.mongodb:type=connector-metrics,context=snapshot,server=*<mongodb.server.name>*,task=*<task.id>*

除非快照操作處于活動狀态,或者自上次連接配接器啟動以來已發生快照,否則不會公開快照名額。

下表列出了可用的快照名額。

屬性 類型 描述

LastEvent

string

連接配接器讀取的最後一個快照事件。

MilliSecondsSinceLastEvent

long

自連接配接器讀取并處理最新事件以來的毫秒數。

TotalNumberOfEventsSeen

long

自上次啟動或重置以來,此連接配接器已看到的事件總數。

NumberOfEventsFiltered

long

已被連接配接器上配置的包含/排除清單過濾規則過濾的事件數。

CapturedTables

string[]

連接配接器捕獲的表清單。

QueueTotalCapacity

int

用于在快照程式和主 Kafka Connect 循環之間傳遞事件的隊列長度。

QueueRemainingCapacity

int

用于在快照程式和主 Kafka Connect 循環之間傳遞事件的隊列的可用容量。

TotalTableCount

int

快照中包含的表總數。

RemainingTableCount

int

快照尚未複制的表數。

SnapshotRunning

boolean

快照是否已啟動。

SnapshotAborted

boolean

快照是否已中止。

SnapshotCompleted

boolean

快照是否完成。

SnapshotDurationInSeconds

long

到目前為止,快照已拍攝的總秒數,即使未完成也是如此。

RowsScanned

Map<String, Long>

包含為快照中的每個表掃描的行數的映射。在處理過程中,表格會逐漸添加到地圖中。每掃描 10,000 行并在完成表時更新。

MaxQueueSizeInBytes

long

隊列的最大緩沖區(以位元組為機關)。

max.queue.size.in.bytes

如果設定為正長值,則此名額可用。

CurrentQueueSizeInBytes

long

隊列中的目前記錄量(以位元組為機關)。

Debezium MongoDB 連接配接器還提供以下自定義快照名額:

屬性 類型 描述

NumberOfDisconnects

long

資料庫斷開連接配接的次數。

流媒體名額

MBean是. _

debezium.mongodb:type=connector-metrics,context=streaming,server=*<mongodb.server.name>*,task=*<task.id>*

下表列出了可用的流式名額。

屬性 類型 描述

LastEvent

string

連接配接器讀取的最後一個流事件。

MilliSecondsSinceLastEvent

long

自連接配接器讀取并處理最新事件以來的毫秒數。

TotalNumberOfEventsSeen

long

自上次啟動或名額重置以來,此連接配接器已看到的事件總數。

TotalNumberOfCreateEventsSeen

long

自上次啟動或名額重置以來,此連接配接器已看到的建立事件總數。

TotalNumberOfUpdateEventsSeen

long

自上次啟動或名額重置以來,此連接配接器已看到的更新事件總數。

TotalNumberOfDeleteEventsSeen

long

自上次啟動或名額重置以來,此連接配接器已看到的删除事件總數。

NumberOfEventsFiltered

long

已被連接配接器上配置的包含/排除清單過濾規則過濾的事件數。

CapturedTables

string[]

連接配接器捕獲的表清單。

QueueTotalCapacity

int

用于在流媒體和主 Kafka Connect 循環之間傳遞事件的隊列長度。

QueueRemainingCapacity

int

隊列的空閑容量,用于在流媒體和 Kafka Connect 主循環之間傳遞事件。

Connected

boolean

表示連接配接器目前是否連接配接到資料庫伺服器的标志。

MilliSecondsBehindSource

long

上次更改事件的時間戳與處理它的連接配接器之間的毫秒數。這些值将包含運作資料庫伺服器和連接配接器的機器上的時鐘之間的任何差異。

NumberOfCommittedTransactions

long

已送出的已處理事務數。

SourceEventPosition

Map<String, String>

最後接收到的事件的坐标。

LastTransactionId

string

最後處理的事務的事務辨別符。

MaxQueueSizeInBytes

long

隊列的最大緩沖區(以位元組為機關)。

max.queue.size.in.bytes

如果設定為正長值,則此名額可用。

CurrentQueueSizeInBytes

long

隊列中的目前記錄量(以位元組為機關)。

Debezium MongoDB 連接配接器還提供以下自定義流名額:

屬性 類型 描述

NumberOfDisconnects

long

資料庫斷開連接配接的次數。

NumberOfPrimaryElections

long

主節點選舉次數。

MongoDB 連接配接器常見問題

Debezium 是一個分布式系統,可以捕獲多個上遊資料庫中的所有更改,并且永遠不會錯過或丢失任何事件。當系統正常運作并得到仔細管理時,Debezium 會為每個更改事件提供一次傳遞。

如果發生故障,系統不會丢失任何事件。但是,當它從故障中恢複時,它可能會重複一些更改事件。在這種情況下,像 Kafka 一樣,Debezium至少提供一次更改事件的傳遞。

本節的其餘部分描述了 Debezium 如何處理各種故障和問題。

配置和啟動錯誤

在以下情況下,連接配接器嘗試啟動失敗,在日志中報告錯誤或異常,并停止運作:

  • 連接配接器的配置無效。
  • 連接配接器無法使用指定的連接配接參數成功連接配接到 MongoDB。

失敗後,連接配接器會嘗試使用指數退避重新連接配接。您可以配置重新連接配接嘗試的最大次數。

在這些情況下,錯誤将包含有關問題的更多詳細資訊以及可能的建議解決方法。更正配置或解決 MongoDB 問題後,可以重新啟動連接配接器。

MongoDB 變得不可用

一旦連接配接器運作,如果任何 MongoDB 副本集的主節點變得不可用或無法通路,連接配接器将反複嘗試重新連接配接到主節點,使用指數退避來防止網絡或伺服器飽和。如果在可配置的連接配接嘗試次數後主節點仍然不可用,則連接配接器将失敗。

重新連接配接的嘗試由三個屬性控制:

  • connect.backoff.initial.delay.ms

    - 第一次嘗試重新連接配接之前的延遲,預設為 1 秒(1000 毫秒)。
  • connect.backoff.max.delay.ms

    - 嘗試重新連接配接之前的最大延遲,預設為 120 秒(120,000 毫秒)。
  • connect.max.attempts

    - 産生錯誤前的最大嘗試次數,預設為 16。

每個延遲是先前延遲的兩倍,直到最大延遲。在給定預設值的情況下,下表顯示了每次失敗的連接配接嘗試的延遲以及失敗前的總累積時間。

重新連接配接嘗試次數 嘗試前的延遲,以秒為機關 嘗試前的總延遲,以分鐘和秒為機關
1 1 00:01
2 2 00:03
3 4 00:07
4 8 00:15
5 16 00:31
6 32 01:03
7 64 02:07
8 120 04:07
9 120 06:07
10 120 08:07
11 120 10:07
12 120 12:07
13 120 14:07
14 120 16:07
15 120 18:07
16 120 20:07

Kafka Connect 程序正常停止

如果 Kafka Connect 正在分布式模式下運作,并且 Kafka Connect 程序正常停止,那麼在關閉該程序之前,Kafka Connect 會将所有程序的連接配接器任務遷移到該組中的另一個 Kafka Connect 程序,并且新的連接配接器任務将準确地從先前任務停止的地方開始。在連接配接器任務正常停止并在新程序上重新啟動時,處理會有短暫的延遲。

如果組僅包含一個程序并且該程序正常停止,則 Kafka Connect 将停止連接配接器并記錄每個副本集的最後偏移量。重新啟動後,副本集任務将完全從中斷的地方繼續。

Kafka Connect 程序崩潰

如果 Kafka 連接配接器程序意外停止,那麼它正在運作的任何連接配接器任務都将終止,而不會記錄它們最近處理的偏移量。當 Kafka Connect 在分布式模式下運作時,它将在其他程序上重新啟動這些連接配接器任務。但是,MongoDB 連接配接器将從早期程序記錄的最後一個偏移量恢複,這意味着新的替換任務可能會生成一些在崩潰之前處理的相同更改事件。重複事件的數量取決于偏移重新整理周期和崩潰前的資料量變化。

因為在故障恢複期間某些事件可能會重複,是以消費者應該始終預期某些事件可能會重複。Debezium 的變化是幂等的,是以一系列事件總是導緻相同的狀态。Debezium 還在每個更改事件消息中包含有關事件起源的源特定資訊,包括 MongoDB 事件的唯一事務辨別符 (

h

) 和時間戳 (

sec

ord

)。消費者可以跟蹤這些值中的其他值,以了解它是否已經看到特定事件。

卡夫卡變得不可用

當連接配接器生成更改事件時,Kafka Connect 架構使用 Kafka 生産者 API 在 Kafka 中記錄這些事件。Kafka Connect 還将按照您在 Kafka Connect 工作程式配置中指定的頻率定期記錄這些更改事件中出現的最新偏移量。如果 Kafka 代理變得不可用,運作連接配接器的 Kafka Connect 工作程序将簡單地重複嘗試重新連接配接到 Kafka 代理。換句話說,連接配接器任務将簡單地暫停,直到可以重建立立連接配接,此時連接配接器将準确地從中斷的地方恢複。

連接配接器長時間停止

如果連接配接器正常停止,則可以繼續使用副本集,并且任何新的更改都會記錄在 MongoDB 的 oplog 中。當連接配接器重新啟動時,它将為上次停止的每個副本集恢複流式更改,記錄連接配接器停止時所做的所有更改的更改事件。如果連接配接器停止的時間足夠長,以至于 MongoDB 從其 oplog 中清除了一些連接配接器尚未讀取的操作,那麼在啟動時連接配接器将執行快照。

正确配置的 Kafka 叢集能夠提供巨大的吞吐量。Kafka Connect 是使用 Kafka 最佳實踐編寫的,如果有足夠的資源,也将能夠處理非常大量的資料庫更改事件。正因為如此,當連接配接器在一段時間後重新啟動時,它很可能會趕上資料庫,盡管速度将取決于 Kafka 的功能和性能以及對 MongoDB 中的資料所做的更改量。

如果連接配接器保持停止的時間足夠長,MongoDB 可能會清除舊的 oplog 檔案,并且連接配接器的最後位置可能會丢失。在這種情況下,當配置了初始快照模式(預設)的連接配接器最終重新啟動時,MongoDB 伺服器将不再具有起始點,連接配接器将失敗并報錯。

MongoDB 丢失寫入

在某些故障情況下,MongoDB 可能會丢失送出,進而導緻 MongoDB 連接配接器無法捕獲丢失的更改。例如,如果主節點在應用更改并将更改記錄到其 oplog 後突然崩潰,則 oplog 可能在輔助節點讀取其内容之前變得不可用。是以,被選為新主節點的輔助節點可能會丢失其 oplog 中的最新更改。

目前,在 MongoDB 中沒有辦法防止這種副作用。

繼續閱讀