天天看點

基于 Kafka 與 Debezium 建構實時資料同步

基于 Kafka 與 Debezium 建構實時資料同步

在進行架構轉型與分庫分表之前,我們一直采用非常典型的單體應用架構:主服務是一個 Java WebApp,使用 Nginx 并選擇 Session Sticky 分發政策做負載均衡和會話保持;背後是一個 MySQL 主執行個體,接了若幹 Slave 做讀寫分離。在整個轉型開始之前,我們就知道這會是一塊難啃的硬骨頭:我們要在全線業務飛速地擴張疊代的同時完成架構轉型,因為這是實實在在的”給高速行駛的汽車換輪胎”。

為了最大限度地減少服務拆分與分庫分表給業務帶來的影響(不影響業務開發也是架構轉型的前提),我們采用了一種溫和的漸進式拆分方案:

對于每塊需要拆分的領域,首先拆分出子服務,并将所有該領域的資料庫操作封裝為 RPC 接口;

将其它所有服務中對該領域資料表的操作替換為 RPC 調用;

拆分該領域的資料表,使用資料同步保證舊庫中的表與新表資料一緻;

将該子服務中的資料庫操作逐漸遷移到新表,分批上線;

全部遷移完成後,切斷同步,該服務拆分結束。

這種方案能夠做到平滑遷移,但其中卻有幾個棘手的問題:

舊表新表的資料一緻性如何保證?

如何支援異構遷移?(由于舊表的設計往往非常範式化,是以拆分後的新表會增加很多來自其它表的備援列)

如何保證資料同步的實時性?(往往會先遷移讀操作到新表,這時就要求舊表的寫操作必須準實時地同步到新表)

典型的解決方案有兩種:

雙寫(dual write): 即所有寫入操作同時寫入舊表和新表,這種方式可以完全控制應用代碼如何寫資料庫,聽上去簡單明了。但它會引入複雜的分布式一緻性問題:要保證新舊庫中兩張表資料一緻,雙寫操作就必須在一個分布式事務中完成,而分布式事務的代價太高了。

資料變更抓取(change data capture, CDC): 通過資料源的事務日志抓取資料源變更,這能解決一緻性問題(隻要下遊能保證變更應用到新庫上)。它的問題在于各種資料源的變更抓取沒有統一的協定,如 MySQL 用 Binlog,PostgreSQL 用 Logical decoding 機制,MongoDB 裡則是 oplog。

最終我們選擇使用資料變更抓取實作資料同步與遷移,一是因為資料一緻性的優先級更高,二是因為開源社群的多種元件能夠幫助我們解決沒有統一協定帶來的 CDC 子產品開發困難的問題。在明确要解決的問題和解決方向後,我們就可以着手設計整套架構了。

隻有一個 CDC 子產品當然是不夠的,因為下遊的消費者不可能随時就位等待 CDC 子產品的推送。是以我們還需要引入一個變更分發平台,它的作用是:

提供變更資料的堆積能力;

支援多個下遊消費者按不同速度消費;

解耦 CDC 子產品與消費者;

另外,我們還需要确定一套統一的資料格式,讓整個架構中的所有元件能夠高效而安全地通信。

現在我們可以正式介紹 Vimur [ˈviːmər] 了,它是一套實時資料管道,設計目标是通過 CDC 子產品抓取業務資料源變更,并以統一的格式釋出到變更分發平台,所有消費者通過用戶端庫接入變更分發平台擷取實時資料變更。

我們先看一看這套模型要如何才解決上面的三個問題:

一緻性:資料變更分發給下遊應用後,下遊應用可以不斷重試保證變更成功應用到目标資料源——這個過程要真正實作一緻性還要滿足兩個前提,一是從資料變更抓取子產品投遞到下遊應用并消費這個過程不能丢資料,也就是要保證至少一次傳遞;二是下遊應用的消費必須是幂等的。

異構遷移:異構包含多種含義:表的 Schema 不同、表的實體結構不同(單表到分片表)、資料庫不同(如 MySQL -> EleasticSearch) ,後兩者隻要下遊消費端實作對應的寫入接口就能解決;而 Schema 不同,尤其是當新庫的表聚合了多張舊庫的表資訊時,就要用反查源資料庫或 Stream Join 等手段實作。

實時性:隻要保證各子產品的資料傳輸與寫入的效率,該模型便能保證明時性。

可以看到,這套模型本身對各個元件是有一些要求的,我們下面的設計選型也會參照這些要求。

在設計階段,我們調研對比了多個開源解決方案:

databus: Linkedin 的分布式資料變更抓取系統;

Yelp’s data pipeline: Yelp 的資料管道;

Otter: 阿裡開源的分布式資料庫同步系統;

Debezium: Redhat 開源的資料變更抓取元件;

這些解決方案關注的重點各有不同,但基本思想是一緻的:使用變更抓取子產品實時訂閱資料庫變更,并分發到一個中間存儲供下遊應用消費。下面是四個解決方案的對比矩陣:

基于 Kafka 與 Debezium 建構實時資料同步
基于 Kafka 與 Debezium 建構實時資料同步

Linkedin databus 的架構圖

Linkedin databus 的論文有很強的指導性,但它的 MySQL 變更抓取子產品很不成熟,官方支援的是 Oracle,MySQL 隻是使用另一個開源元件 OpenReplicator 做了一個 demo。另一個不利因素 databus 使用了自己實作的一個 Relay 作為變更分發平台,相比于使用開源消息隊列的方案,這對維護和外部內建都不友好。

基于 Kafka 與 Debezium 建構實時資料同步

otter 的架構圖

Otter 和 Canal 在國内相當知名,Canal 還支援了阿裡雲 DRDS 的二級索引建構和小表同步,工程穩定性上有保障。但 Otter 本身無法很好地支援多表聚合到新表,開源版本也不支援同步到分片表當中,能夠采取的一個折衷方案是直接将 Canal 訂閱的變更寫入消息隊列,自己寫下遊程式實作聚合同步等邏輯。該方案也是我們的候選方案。

Yelp’s data pipeline 是一個大而全的解決方案。它使用 Mysql-Streamer(一個通過 binlog 實作的 MySQL CDC 子產品)将所有的資料庫變更寫入 Kafka,并提供了 Schematizer 這樣的 Schema 注冊中心和定制化的 Python 用戶端庫解決通信問題。遺憾的是該方案是 Python 建構的,與我們的 Java 技術棧相性不佳。

最後是 Debezium , 不同于上面的解決方案,它隻專注于 CDC,它的亮點有:

支援 MySQL、MongoDB、PostgreSQL 三種資料源的變更抓取,并且社群正在開發 Oracle 與 Cassandra 支援;

Snapshot Mode 可以将表中的現有資料全部導入 Kafka,并且全量資料與增量資料形式一緻,可以統一處理;

利用了 Kafka 的 Log Compaction 特性,變更資料可以實作”不過期”永久儲存;

利用了 Kafka Connect,自動擁有高可用與開箱即用的排程接口;

社群活躍:Debezium 很年輕,面世不到1年,但它的 Gitter上每天都有百餘條技術讨論,并且有兩位 Redhat 全職工程師進行維護;

最終我們選擇了 Debezium + Kafka 作為整套架構的基礎元件,并以 Apache Avro 作為統一資料格式,下面我們将結合各個子產品的目标與設計闡釋選型動機。

變更資料抓取通常需要針對不同資料源訂制實作,而針對特定資料源,實作方式一般有兩種:

基于自增列或上次修改時間做增量查詢;

利用資料源本身的事務日志或 Slave 同步等機制實時訂閱變更;

第一種方式實作簡單,以 SQL 為例:相信大家都寫過類似的 SQL, 每次查詢時,查詢 [last_query_time, now) 區間内的增量資料,lastmodified 列也可以用自增主鍵來替代。這種方式的缺點是實時性差,對資料庫帶來了額外壓力,并且侵入了表設計 —— 所有要實作變更抓取的表都必須有用于增量查詢的列并且在該列上建構索引。另外,這種方式無法感覺實體删除(Delete), 删除邏輯隻能用一個 delete 列作為 flag 來實作。

第二種方式實作起來相對困難,但它很好地解決了第一種方式的問題,是以前文提到的開源方案也都采用了這種方式。下面我們着重分析在 MySQL 中如何實作基于事務日志的實時變更抓取。

MySQL 的事務日志稱為 binlog,常見的 MySQL 主從同步就是使用 Binlog 實作的:

我們把 Slave 替換成 CDC 子產品,CDC 子產品模拟 MySQL Slave 的互動協定,便能收到 Master 的 binlog 推送:

基于 Kafka 與 Debezium 建構實時資料同步

CDC 子產品解析 binlog,産生特定格式的變更消息,也就完成了一次變更抓取。但這還不夠,CDC 子產品本身也可能挂掉,那麼恢複之後如何保證不丢資料又是一個問題。這個問題的解決方案也是要針對不同資料源進行設計的,就 MySQL 而言,通常會持久化已經消費的 binlog 位點或 Gtid(MySQL 5.6之後引入)來标記上次消費位置。其中更好的選擇是 Gtid,因為該位點對于一套 MySQL 體系(主從或多主)是全局的,而 binlog 位點是單機的,無法支援主備或多主架構。

那為什麼最後選擇了 Debezium 呢?

MySQL CDC 子產品的一個挑戰是如何在 binlog 變更事件中加入表的 Schema 資訊(如标記哪些字段為主鍵,哪些字段可為 null)。Debezium 在這點上處理得很漂亮,它在記憶體中維護了遊戲賣号平台地圖資料庫每張表的 Schema,并且全部寫入一個 backup 的 Kafka Topic 中,每當 binlog 中出現 DDL 語句,便應用這條 DDL 來更新 Schema。而在節點當機,Debezium 執行個體被排程到另一個節點上後,又會通過 backup topic 恢複 Schema 資訊,并從上次消費位點繼續解析 Binlog。

在我們的場景下,另一個挑戰是,我們資料庫已經有大量的現存資料,資料遷移時的現存資料要如何處理。這時,Debezium 獨特的 Snapshot 功能就能幫上忙,它可以實作将現有資料作為一次”插入變更”捕捉到 Kafka 中,是以隻要編寫一次用戶端就能一并處理全量資料與後續的增量資料。

變更分發平台可以有很多種形式,本質上它隻是一個存儲變更的中間件,那麼如何進行選型呢?首先由于變更資料資料量級大,且操作時沒有事務需求,是以先排除了關系型資料庫, 剩下的 NoSQL 如 Cassandra,mq 如 Kafka、RabbitMQ 都可以勝任。其差別在于,消費端到分發平台拉取變更時,假如是 NoSQL 的實作,那麼就能很容易地實作條件過濾等操作(比如某個用戶端隻對特定字段為 true 的消息感興趣); 但 NoSQL 的實作往往會在吞吐量和一緻性上輸給 mq。這裡就是一個設計抉擇的問題,最終我們選擇了 mq,主要考慮的點是:消費端往往是無狀态應用,很容易進行水準擴充,是以假如有條件過濾這樣的需求,我們更希望把這樣的計算壓力放在消費端上。

而在 mq 裡,Kafka 則顯得具有壓倒性優勢。Kafka 本身就有大資料的基因,通常被認為是目前吞吐量最大的消息隊列,同時,使用 Kafka 有一項很适合該場景的特性:Log Compaction。Kafka 預設的過期清理政策(log.cleanup.policy)是delete,也就是删除過期消息,配置為compact則可以啟用 Log Compaction 特性,這時 Kafka 不再删除過期消息,而是對所有過期消息進行”折疊” —— 對于 key 相同的所有消息會,保留最新的一條。

舉個例子,我們對一張表執行下面這樣的操作:對應的在 mq 中的流總共會産生 4 條變更消息,而最下面兩條分别是 id:1 id:2 下的最新記錄,在它們之前的兩條 INSERT 引起的變更就會被 Kafka 删除,最終我們在 Kafka 中看到的就是兩行記錄的最新狀态,而一個持續訂閱該流的消費者則能收到全部4條記錄。

這種行為有一個有趣的名字,流表二相性(Stream Table Durability):Topic 中有無盡的變更消息不斷被寫入,這是流的特質;而 Topic 某一時刻的狀态,恰恰是該時刻對應的資料表的一個快照(參見上面的例子),每條新消息的到來相當于一次 Upsert,這又是表的特性。落到實踐中來講,Log Compaction 對于我們的場景有一個重要應用:全量資料遷移與資料補償,我們可以直接編寫針對每條變更資料的處理程式,就能兼顧全量遷移與之後的增量同步兩個過程;而在資料異常時,我們可以重新回放整個 Kafka Topic —— 該 Topic 就是對應表的快照,針對上面的例子,我們回放時隻會讀到最新的兩條消息,不需要讀全部四條消息也能保證資料正确。

關于 Kafka 作為變更分發平台,最後要說的就是消費順序的問題。大家都知道 Kafka 隻能保證單個 Partition 内消息有序,而對于整個 Topic,消息是無序的。一般的認知是,資料變更的消費為了邏輯的正确性,必須按序消費。按着這個邏輯,我們的 Topic 隻能有單個 Partition,這就大大犧牲了 Kafka 的擴充性與吞吐量。其實這裡有一個誤區,對于資料庫變更抓取,我們隻要保證 同一行記錄的變更有序 就足夠了。還是上面的例子,我們隻需要保證對id:2 這行的 insert 消息先于 update 消息,該行資料最後就是正确的。而實作”同一行記錄變更有序”就簡單多了,Kafka Producer 對帶 key 的消息預設使用 key 的 hash 決定分片,是以隻要用資料行的主鍵作為消息的 key,所有該行的變更都會落到同一個 Parition 上,自然也就有序了。這有一個要求就是 CDC 子產品必須解析出變更資料的主鍵 —— 而這點 Debezium 已經幫助我們解決了。

資料格式的選擇同樣十分重要。首先想到的當然是 json, 目前最常見的消息格式,不僅易讀,開發也都對它十分熟悉。但 json 本身有一個很大的不足,那就是契約性太弱,它的結構可以随意更改:試想假如有一個接口傳回 String,注釋上說這是個json,那我們該怎麼編寫對應的調用代碼呢?是不是需要翻接口文檔,提前獲知這段 json 的 schema,然後才能開始編寫代碼,并且這段代碼随時可能會因為這段 json 的格式改變而 break。

在規模不大的系統中,這個問題并不顯著。但假如在一個擁有上千種資料格式的資料管道上工作,這個問題就會很麻煩,首先當你訂閱一個變更 topic 時,你完全處于懵逼狀态——不知道這個 topic 會給你什麼,當你經過文檔的洗禮與不斷地調試終于寫完了用戶端代碼,它又随時會因為 topic 中的消息格式變更而挂掉。

參考 Yelp 和 Linkedin 的選擇,我們決定使用 Apache Avro 作為統一的資料格式。Avro 依賴模式 Schema 來實作資料結構定義,而 Schema 通常使用 json 格式進行定義,一個典型的 Schema 如下:這裡要介紹一點背景知識,Avro 的一個重要特性就是支援 Schema 演化,它定義了一系列的演化規則,隻要符合該規則,使用不同的 Schema 也能夠正常通信。也就是說,使用 Avro 作為資料格式進行通信的雙方是有自由更疊 Schema 的空間的。

在我們的場景中,資料庫表的 Schema 變更會引起對應的變更資料 Schema 變更,而每次進行資料庫表 Schema 變更就更新下遊消費端顯然是不可能的。是以這時候 Avro 的 Schema 演化機制就很重要了。我們做出約定,同一個 Topic 上傳輸的消息,其 Avro Schema 的變化必須符合演化規則,這麼一來,消費者一旦開始正常消費之後就不會因為消息的 Schema 變化而挂掉。

基于 Kafka 與 Debezium 建構實時資料同步

上圖展現了以變更分發平台(Kafka) 為中心的系統拓撲。其中有一些上面沒有涉及的點:我們使用 Kafka 的 MirrorMaker 解決了跨資料中心問題,使用 Kafka Connect 叢集運作 Debezium 任務實作了高可用與排程能力。

我們再看看 Vimur 是如何解決資料遷移與同步問題的,下圖展示了一次典型的資料同步過程:

下圖是一次典型的資料遷移過程,資料遷移通常伴随着服務拆分與分庫分表:

基于 Kafka 與 Debezium 建構實時資料同步

這裡其實同步任務的編寫是頗有講究的,因為我們一般需要備援很多新的列到新表上,是以單個流中的資料是不夠的,這時有兩種方案:

反查資料庫:邏輯簡單,隻要查詢所需要的備援列即可,但所有相關的列變動都要執行一次反查會對源庫造成額外壓力;Stream Join:Stream Join 通常需要額外存儲的支援,無論用什麼架構實作,最終效果是把反查壓力放到了架構依賴的額外存儲上;這兩種方案見仁見智,Stream Join 邏輯雖然更複雜,但架構本身如 Flink、Kafka Stream 都提供了 DSL 簡化編寫。最終的選型實際上取決于需不需要把反查的壓力分散出去。

Vimur 的另一個深度應用是解決跨庫查詢,分庫分表後資料表 JOIN 操作将很難實作,通常我們都會查詢多個資料庫,然後在代碼中進行 JOIN。這種辦法雖然麻煩,但卻不是不采取的妥協政策(架構來做跨庫 JOIN ,可行但有害,因為有很多性能陷阱必須手動編碼去避免)。然而有些場景這種辦法也很難解決,比如多表 INNER JOIN 後的分頁。這時我們采取的解決方案就是利用 Vimur 的變更資料,将需要 JOIN 的表聚合到搜尋引擎或 NoSQL 中,以文檔的形式提供查詢。

除了上面的應用外,Vimur 還被我們應用于搜尋索引的實時建構、業務事件通知等場景,并計劃服務于緩存重新整理、響應式架構等場景。回顧當初的探索曆程,很多選擇可能不是最好的,但一定是反複實踐後我們認為最适合我們的。假如你也面臨複雜資料層中的資料同步、資料遷移、緩存重新整理、二級索引建構等問題,不妨嘗試一下基于 CDC 的實時資料管道方案。