天天看點

如何建構批流一體資料融合平台的一緻性語義保證?

作者:陳肅

整理:周奇,Apache Flink 社群志願者

本文根據陳肅老師在 Apache Kafka x Flink Meetup 深圳站的分享整理而成,文章首先将從資料融合角度,談一下 DataPipeline 對批流一體架構的看法,以及如何設計和使用一個基礎架構。其次,資料的一緻性是進行資料融合時最基礎的問題。如果資料無法實作一緻,即使同步再快,支援的功能再豐富,都沒有意義。

另外,DataPipeline 目前使用的基礎架構為 Kafka Connect。為實作一緻性的語義保證,我們做了一些額外工作,希望對大家有一定的參考意義。

最後,會提一些我們在應用 Kafka Connect 架構時,遇到的一些現實的工程問題,以及應對方法。盡管大家的場景、環境和資料量級不同,但也有可能會遇到這些問題。希望對大家的工作有所幫助。

如何建構批流一體資料融合平台的一緻性語義保證?

一、批流一體架構

批和流是資料融合的兩種應用形态

下圖來自 Flink 官網。傳統的資料融合通常基于批模式。在批的模式下,我們會通過一些周期性運作的 ETL JOB,将資料從關系型資料庫、檔案存儲向下遊的目标資料庫進行同步,中間可能有各種類型的轉換。

如何建構批流一體資料融合平台的一緻性語義保證?

另一種是 Data Pipeline 模式。與批模式相比相比, 其最核心的差別是将批量變為實時:輸入的資料不再是周期性的去擷取,而是源源不斷的來自于資料庫的日志、消息隊列的消息。進而通過一個實時計算引擎,進行各種聚合運算,産生輸出結果,并且寫入下遊。

現代的一些處理架構,包括 Flink、Kafka Streams、Spark,或多或少都能夠支援批和流兩種概念。隻不過像 Kafka,其原生就是為流而生,是以如果基于 Kafka Connect 做批流一體,你可能需要對批量的資料處理做一些額外工作,這是我今天重點要介紹的。

資料融合的基本問題

如果問題簡化到你隻有一張表,可能是一張 MySQL 的表,裡面隻有幾百萬行資料,你可能想将其同步到一張 Hive 表中。基于這種情況,大部分問題都不會遇到。因為結構是确定的,資料量很小,且沒有所謂的并行化問題。

如何建構批流一體資料融合平台的一緻性語義保證?

但在一個實際的企業場景下,如果做一個資料融合系統,就不可避免要面臨幾方面的挑戰:

第一,“動态性”

資料源會不斷地發生變化,主要歸因于:表結構的變化,表的增減。針對這些情況,你需要有一些相應的政策進行處理。

第二,“可伸縮性”

任何一個分布式系統,必須要提供可伸縮性。因為你不是隻同步一張表,通常會有大量資料同步任務在進行着。如何在一個叢集或多個叢集中進行統一的排程,保證任務并行執行的效率,這是一個要解決的基本問題。

第三,“容錯性”

在任何環境裡你都不能假定伺服器是永遠在正常運作的,網絡、磁盤、記憶體都有可能發生故障。這種情況下一個 Job 可能會失敗,之後如何進行恢複?狀态能否延續?是否會産生資料的丢失和重複?這都是要考慮的問題。

第四,“異構性”

當我們做一個資料融合項目時,由于源和目的地是不一樣的,比如,源是 MySQL,目的地是 Oracle,可能它們對于一個字段類型定義的标準是有差别的。在同步時,如果忽略這些差異,就會造成一系列的問題。

第五,“一緻性”

一緻性是資料融合中最基本的問題,即使不考慮資料同步的速度,也要保證資料一緻。資料一緻性的底線為:資料先不丢,如果丢了一部分,通常會導緻業務無法使用;在此基礎上更好的情況是:源和目的地的資料要完全一緻,即所謂的端到端一緻性,如何做到呢?

Lambda 架構是批流一體化的必然要求

目前在做這樣的平台時,業界比較公認的有兩種架構:一種是 Lambda 架構,Lambda 架構的核心是按需使用批量和流式的處理架構,分别針對批式和流式資料提供相應的處理邏輯。最終通過一個服務層進行對外服務的輸出。

為什麼我們認為 Lambda 架構是批流一體化的必然要求?這好像看起來是沖突的(與之相對,還有一種架構叫 Kappa 架構,即用一個流式處理引擎解決所有問題)。

如何建構批流一體資料融合平台的一緻性語義保證?

實際上,這在很大程度來自于現實中使用者的需求。DataPipeline 在剛剛成立時隻有一種模式,隻支援實時流同步,在我們看來這是未來的一種趨勢。

但後來發現,很多客戶實際上有批量同步的需求。比如,銀行在每天晚上可能會有一些月結、日結,證券公司也有類似的結算服務。基于一些曆史原因,或出于對性能、資料庫配置的考慮,可能有的資料庫本身不能開 change log。是以實際上并不是所有情況下都能從源端擷取實時的流資料。

考慮到上述問題,我們認為一個産品在支撐資料融合過程中,必須能同時支撐批量和流式兩種處理模式,且在産品裡面出于性能和穩定性考慮提供不同的處理政策,這才是一個相對來說比較合理的基礎架構。

資料融合的 Ad-Hoc 模式

具體到做這件事,還可以有兩種基礎的應用模式。假如我需要将資料從 MySQL 同步到 Hive,可以直接建立一個 ETL 的 JOB(例如基于 Flink),其中封裝所有的處理邏輯,包括從源端讀取資料,然後進行變換寫入目的地。在将代碼編譯好以後,就可以放到 Flink 叢集上運作,得到想要的結果。這個叢集環境可以提供所需要的基礎能力,剛才提到的包括分布式,容錯等。

如何建構批流一體資料融合平台的一緻性語義保證?

資料融合的 MQ 模式

另一種模式是 ETL JOB 本身輸入輸出實際上都是面對消息隊列的,實際上這是現在最常使用的一種模式。在這種模式下,需要通過一些獨立的資料源和目的地連接配接器,來完成資料到消息隊列的輸入和輸出。ETL JOB 可以用多種架構實作,包括 Flink、Kafka Streams 等,ETL JOB 隻和消息隊列發生資料交換。

如何建構批流一體資料融合平台的一緻性語義保證?

DP 選擇 MQ 模式的理由

DataPipeline 選擇 MQ 模式,主要有幾點考慮:

第一,在我們産品應用中有一個非常常見的場景:要做資料的一對多分發。資料要進行一次讀取,然後分發到各種不同的目的地,這是一個非常适合消息隊列使用的分發模型。

第二,有時會對一次讀取的資料加不同的處理邏輯,我們希望這種處理不要重新對源端産生一次讀取。是以在多數情況下,都需将資料先讀到消息隊列,然後再配置相應的處理邏輯。

第三,Kafka Connect 就是基于 MQ 模式的,它有大量的開源連接配接器。基于 Kafka Connect 架構,我們可以重用這些連接配接器,節省研發的投入。

第四,當你把資料抽取跟寫入目的地,從處理邏輯中獨立出來之後,便可以提供更強大的內建能力。因為你可以在消息隊列上內建更多的處理邏輯,而無需考慮重新寫整個 Job。

如何建構批流一體資料融合平台的一緻性語義保證?

相應而言,如果你選擇将 MQ 作為所有 JOB 的傳輸通道,就必須要克服幾個缺點:

第一,所有資料的吞吐都經過 MQ,是以 MQ 會成為一個吞吐瓶頸。

第二,因為是一個完全的流式架構,是以針對批量同步,你需要引入一些邊界消息來實作一些批量控制。

第三,Kafka 是一個有持久化能力的消息隊列,這意味着資料留存是有極限的。比如,你将源端的讀到 Kafka Topic 裡面,Topic 不會無限的大,有可能會造成資料容量超限,導緻一些資料丢失。

第四,當批量同步在中間因為某種原因被打斷,無法做續傳時,你需要進行重傳。在重傳過程中,首先要将資料進行清理,如果基于消息隊列模式,清理過程就會帶來額外的工作。你會面臨兩個困境:要麼清空原有的消息隊列,要麼你創造新的消息隊列。這肯定不如像直接使用一些批量同步架構那樣來的直接。

二、一緻性語義保證

使用者需求

先簡單介紹一下使用者對于資料同步方面的一些基本要求:

第一種需求,批量同步需要以一種事務性的方式完成同步

無論是同步一整塊的曆史資料,還是同步某一天的增量,該部分資料到目的地,必須是以事務性的方式出現的。而不是在同步一半時,資料就已經在目的地出現了,這可能會影響下遊的一些計算邏輯。

第二種需求,流式資料盡可能快的完成同步

大家都希望越快越好,但相應的,同步的越快,吞吐量有可能因為你的參數設定出現相應的下降,這可能需要有一個權衡。

第三種需求,批量和流式可能共存于一個 JOB

作為一個資料融合産品,當使用者在使用DataPipeline時,通常需要将存量資料同步完,後面緊接着去接增量。然後存量與增量之間需要進行一個無縫切換,中間的資料不要丢、也不要多。

**第四種需求,按需靈活選擇一緻性語義保證

**

DataPipeline 作為一個産品,在客戶的環境中,我們無法對客戶資料本身的特性提出強制要求。我們不能要求客戶資料一定要有主鍵或者有唯一性的索引。是以在不同場景下,對于一緻性語義保證,使用者的要求也不一樣的:

比如在有主鍵的場景下,一般我們做到至少有一次就夠了,因為在下遊如果對方也是一個類似于關系型資料庫這樣的目的地,其本身就有去重能力,不需要在過程中間做一個強一緻的保證。但是,如果其本身沒有主鍵,或者其下遊是一個檔案系統,如果不在過程中間做額外的一緻性保證,就有可能在目的地産生多餘的資料,這部分資料對于下遊可能會造成非常嚴重的影響。

資料一緻性的鍊路視角

如果要解決端到端的資料一緻性,我們要處理好幾個基本環節:

**第一,在源端做一個一緻性抽取

一緻性抽取是什麼含義?即當資料從通過資料連接配接器寫入到 MQ 時,和與其對應的 offset 必須是以事務方式進入 MQ 的。

第二,一緻性處理

如果大家用過 Flink,Flink 提供了一個端到端一緻性處理的能力,它是内部通過 checkpoint 機制,并結合 Sink 端的二階段送出協定,實作從資料讀取處理到寫入的一個端到端事務一緻性。其它架構,例如 Spark Streaming 和 Kafka Streams 也有各自的機制來實作一緻性處理。

第三,一緻性寫入

在 MQ 模式下,一緻性寫入,即 consumer offset 跟實際的資料寫入目的時,必須是同時持久化的,要麼全都成功,要麼全部失敗。

如何建構批流一體資料融合平台的一緻性語義保證?

第四,一緻性銜接

在 DataPipeline 的産品應用中,曆史資料與實時資料的傳輸有時需要在一個任務中共同完成。是以産品本身需要有這種一緻性銜接的能力,即曆史資料和流式資料,必須能夠在一個任務中,由程式自動完成它們之間的切換。

Kafka Connect 的一緻性保證

Kafka Connect 如何保證資料同步的一緻性?就目前版本,Kafka Connect 隻能支援端到端的 at least once,核心原因在于,在 Kafka Connect 裡面,其 offset 的持久化與資料發送本身是異步完成的。這在很大程度上是為了提高其吞吐量考慮,但相應産生的問題是,如果使用 Kafka Connect,架構本身隻能為你提供 at least once 的語義保證。

在該模式下,如果沒有通過主鍵或下遊應用進行額外地去重,同步過程當中的資料會在極端情況下出現重複,比如源端發送出一批資料已經成功,但 offset 持久化失敗了,這樣在任務恢複之後,之前已經發送成功的資料會再次重新發送一批,而下遊對這種現象完全是不知情的。目的端也是如此,因為 consumer 的 offset 也是異步持久化,就會到導緻有可能資料已經持久化到 Sink,但實際上 consumer offset 還沒有推進。這是我們在應用原生的 Kafka Connect 架構裡遇到最大的兩個問題。

如何建構批流一體資料融合平台的一緻性語義保證?

三、DP 的解決之道

二階段送出協定

DataPipeline 如何解決上述問題?首先,需要用協定的方式保證每一步都做成事務。一旦做成事務,由于每個環節都是解耦的,其最終資料就可以保證一緻性。下圖為二階段送出協定的最基礎版本,接下來為大家簡單介紹一下。

如何建構批流一體資料融合平台的一緻性語義保證?

首先,在二階段送出協定中,對于分布式事務的參與方,在 DataPipeline 的場景下為資料寫入與 offset 寫入,這是兩個獨立元件。兩者之間的寫入操作由 Coordinator 進行協調。第一步是一個 prepare 階段,每一個參與方會将資料寫入到自己的目的地,具體持久化的位置取決于具體應用的實作。

第二步,當 prepare 階段完成之後,Coordinator 會向所有參與者發出 commit 指令,所有參與者在完成 commit 之後,會發出一個 ack,Coordinator 收到 ack 之後,事務就完成了。如果出現失敗,再進行相應的復原操作。其實在分布式資料庫的設計領域中,單純應用一個二階段送出協定會出現非常多的問題,例如 Coordinator 本身如果不是高可用的,在過程當中就有可能出現事務不一緻的問題。

是以應用二階段送出協定,最核心的問題是如何保證 Coordinator 高可用。所幸在大家耳熟能詳的各種架構裡,包括 Kafka 和 Flink,都能夠通過分布式一緻協定實作 Coordinator 高可用,這也是為什麼我們能夠使用二階段送出來保證事務性。

Kafka 事務消息原理

關于 Kafka 事務消息原理,網上有很多資料,在此簡單說一下能夠達到的效果。Kafka 通過二階段送出協定,最終實作了兩個最核心的功能。

第一,一緻性抽取

上文提到資料要被發送進 Kafka,同時 offset 要被持久化到 Kafka,這是對兩個不同 Topic 的寫入。通過利用 Kafka 事務性消息,我們能夠保證 offset 的寫入和資料的發送是一個事務。如果 offset 沒有持久化成功,下遊是看不到這批資料的,這批資料實際上最終會被丢棄掉。

如何建構批流一體資料融合平台的一緻性語義保證?

是以對于源端的發送,我們對 Kafka Connect 的 Source Worker 做了一些改造,讓其能夠提供兩種模式,如果使用者的資料本身是具備主鍵去重能力的,就可以繼續使用 Kafka Connect 原生的模式。

如果使用者需要強一緻時,首先要開啟一個源端的事務發送功能,這就實作了源端的一緻性抽取。其可以保證資料進 Kafka 一端不會出現資料重複。這裡有一個限制,即一旦要開啟一緻性抽取,根據 Kafka 必須要将 ack 設定成 all,這意味着一批資料有多少個副本,其必須能夠在所有的副本所在的 broker 都已經應答的情況下,才可以開始下一批資料的寫入。盡管會造成一些性能上的損失,但為了實作強一緻,你必須要接受這一事實。

**第二,一緻性處理

事務性消息最早就是為 Kafka Streams 設計和準備的。可以寫一段 Kafka Streams 應用,從 Kafka 裡讀取資料,然後完成轉化邏輯,進而将結果再輸出回 Kafka。Sink 端再從 Kafka 中消費資料,寫入目的地。

資料一緻性寫入

之前簡要談了一下二階段送出協定的原理,DataPipeline 實作的方式不算很深奧,基本是業界的一種統一方式。其中最核心的點是,我們将 consumer offset 管理從 Kafka Connect 架構中獨立出來,實作事務一緻性送出。另外,在 Sink 端封裝了一個類似于 Flink 的 TwoPhaseCommitSinkFunction 方式,其定義了 Sink 若要實作一個二階段送出所必須要實作的一些功能。

如何建構批流一體資料融合平台的一緻性語義保證?

DataPipeline 将 Sink Connector 分為兩類,一類是 Connector 本身具備了事務能力,比如絕大部分的關系型資料庫,隻需将 offset 跟資料同時持久化到目的地即可。額外的可能需要有一張 offset 表來記錄送出的 offset。還有一類 Sink 不具備事務性能力,類似像 FTP、OSS 這些對象存儲,我們需要去實作一個二階段送出協定,最終才能保證 Sink 端的資料能夠達到一緻性寫入。

資料一緻性銜接

關于批量資料與實時資料如何銜接的問題,主要有兩個關鍵點:

第一,當開始進行一個批量資料同步時,以關系型資料庫為例,你應該拿到當時一個整體資料的 Snapshot,并在一個事務中同時記錄當時對應的日志起始值。以 MySQL 為例,當要擷取一個 Binlog 起始偏移量時,需要開啟一個 START TRANSACTION WITH CONSISTENT SNAPSHOT,這樣才能保證完成全量之後,後期的讀取增量日志同步不會産生重複資料。

第二,如果采用增量同步模式,則必須根據實際的資料業務領域,采用一種比較靈活的增量表達式,才能避免讀到寫到一半的資料。比如在你的資料中,其 ID 是一個完全自增,沒有任何重複的可能,此時隻需每次單純的大于上一次同步的最後一條記錄即可。

但如果是一個時間戳,無論精度多高,都有可能在資料庫産生相同的時間戳,是以安全的做法是每次疊代時,取比目前時間稍微少一點,保證留出一個安全時間,比如五秒甚至一分鐘,這樣你永遠不會讀到一些時間戳可能會産生沖突的這部分資料,避免遺漏資料。這是一個小技巧,但如果沒有注意,在使用過程中就會産生各種各樣的問題。

還有一點是上面提及的,如何能夠在一個流式架構實作批量同步的一緻性,對于所有的流式架構,需要引入一些邊界條件來标志着一次批量同步的開始和結束。DataPipeline 在每次批量發送開始和結束後,會引入一些控制量信号,然後在 Sink端進行相應處理。同樣為了保證事務一緻性,在 Sink 端處理這種批量同步時,依然要做一些類似于二階段送出這樣的方式,避免在一些極端情況下出現資料不一緻的問題。

四、問題和思考

上文介紹的是 DataPipeline 如何基于 Kafka Connect 做事務同步一緻性的方案。

DataPipeline 在使用 Kafka Connect 過程中遇到過一些問題,目前大部分已經有一些解決方案,還有少量問題,可能需要未來采用新的方法/架構才能夠更好的解決。

第一,反壓的問題

Kafka Connect 設計的邏輯是希望實作源端和目的端完全解耦,這種解偶本身是一個很好的特性。但也帶來一些問題,源和目的地的 task 完全不知道彼此的存在。剛才我提到 Kafka 有容量限制,不能假定在一個客戶環境裡面,會給你無限的磁盤來做緩沖。通常我們在客戶那邊預設 Topic 為 100G 的容量。如果源端讀的過快,大量資料會在 Kafka 裡堆積,目的端沒有及時消費,就有可能出現資料丢失,這是一個非常容易出現的問題。

怎麼解決?DataPipeline 作為一個産品,在 Kafka Connect 之上,做了控制層,控制層中有像 Manager 這樣的邏輯元件,會監控每一個 Topic 消費的 lag,當達到一定門檻值時,會對源端進行限速,保證源和目的地盡可能比對。

第二,資源隔離

Connect Worker 叢集無法對 task 進行資源預留,多個 task 并行運作會互相影響。Worker 的 rest 接口是隊列式的,單個叢集任務過多會導緻啟停緩慢。

我們正在考慮利用外部的資源排程架構,例如 K8s 進行 worker 節點管理;以及通過路由規則将不同優先級任務運作在不同的 worker 叢集上,實作預配置設定和共享資源池的靈活配置。

第三,Rebalance

在 2.3 版本以前,Kafka Connect 的 task rebalance 采用 stop-the-world 模式,牽一發動全身。在 2.3 版本之後,已經做了非常大優化,改為了具有粘性的 rebalance。是以如果使用 Kafka Connect,強烈推薦一定要更新到 2.3 以上的版本,也就是目前的最新版本。

五、未來演進路線

基于 MQ 模式的架構,針對大批量資料的同步,實際上還是容易出現性能瓶頸。主要瓶頸是在 MQ 的叢集,我們并不能在客戶環境裡無限優化 Kafka 叢集的性能,因為客戶提供的硬體資源有限。是以一旦客戶給定了硬體資源,Kafka 吞吐的上限就變為一個固定值。是以針對批量資料的同步,可能未來會考慮用記憶體隊列替代 MQ。

同時,會采用更加靈活的 Runtime,主要是為了解決剛才提到的預配置設定資源池和共享資源池的統一管理問題。

另外,關于資料品質管理,實際上金融類客戶對資料品質的一緻性要求非常高。是以對于一些對資料品質要求非常高的客戶,我們考慮提供一些後校驗功能,尤其是針對批量同步。

▼ Apache Flink 社群推薦 ▼

Apache Flink 及大資料領域頂級盛會 Flink Forward Asia 2019 重磅開啟,大會議程精彩上線,了解 Flink Forward Asia 2019 的更多資訊,請檢視:

https://developer.aliyun.com/special/ffa2019

首屆 Apache Flink 極客挑戰賽重磅開啟,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點選:

https://tianchi.aliyun.com/markets/tianchi/flink2019