天天看點

Apache Flink 在汽車之家的應用與實踐

本文整理自汽車之家實時計算平台負責人邸星星在 Flink Forward Asia 2020 分享的議題《Apache Flink 在汽車之家的應用及實踐》。主要内容包括:
  1. 背景及現狀
  2. AutoStream 平台
  3. 基于 Flink 的實時生态建設
  4. 後續規劃

GitHub 位址

https://github.com/apache/flink

歡迎大家給 Flink 點贊送 star~

一、背景及現狀

1. 第一階段

在 2019 年之前,汽車之家的大部分實時業務都是運作在 Storm 之上的。Storm 作為早期主流的實時計算引擎,憑借簡單的 Spout 和 Bolt 程式設計模型以及叢集本身的穩定性,俘獲了大批使用者,我們在 2016 年搭建了 Storm 平台。

Apache Flink 在汽車之家的應用與實踐

随着實時計算的需求日漸增多,資料規模逐漸增大,Storm 在開發及維護成本上都凸顯了不足,這裡列舉幾個痛點:

  1. 開發成本高

    我們一直是用的 Lambda 架構,會用 T+1 的離線資料修正實時資料,即最終以離線資料為準,是以計算口徑實時要和離線完全保持一緻,實時資料開發的需求文檔就是離線的 SQL,實時開發人員的核心工作就是把離線的 SQL 翻譯成 Storm 代碼,期間雖然封裝了一些通用的 Bolt 來簡化開發,但把離線動辄幾百行的 SQL 精準地翻譯成代碼還是很有挑戰的,并且每次運作都要經過打包、上傳、重新開機的一系列的繁瑣操作,調試成本很高。

  2. 計算低效

    Storm 對狀态支援的不好,通常需要借助 Redis、HBase 這類 kv 存儲維護中間狀态,我們之前是強依賴 Redis。比如常見的計算 UV 的場景,最簡單的辦法是使用 Redis 的 sadd 指令判斷 uid 是否為已經存在,但這種方法會帶來很高的網絡 IO,同時如果沒有提前報備的大促或搞活動導緻流量翻倍的情況,很容易把 Redis 記憶體搞滿,運維同學也會被殺個措手不及。同時 Redis 的吞吐能力也限制了整個作業的吞吐量。

  3. 難以維護、管理

    由于采用編寫 Storm 代碼方式開發,難以分析中繼資料及血緣關系,同時可讀性差,計算口徑不透明,業務交接成本很高。

  4. 對數倉不友好

    資料倉庫團隊是直接對接業務需求的團隊,他們更熟悉基于 Hive 的 SQL 開發模式,通常都不擅長 Storm 作業的開發,這導緻一些原本是實時的需求,隻能退而求其次選擇 T+1 的方式給出資料。

在這個階段,我們支援了最基本的實時計算需求,因為開發門檻比較高,很多實時業務都是由我們平台開發來完成,既做平台,又做資料開發,精力分散很嚴重。

2. 第二階段

Apache Flink 在汽車之家的應用與實踐

我們從 2018 年開始調研 Flink 引擎,其相對完備的 SQL 支援,天生對狀态的支援吸引了我們,在經過學習調研後,2019 年初開始設計開發 Flink SQL 平台,并于 2019 年中上線了 AutoStream 1.0 平台。平台上線之初就在倉庫團隊、監控團隊和運維團隊得以應用,能夠快速被使用者主要得益于以下幾點:

  1. 開發、維護成本低:汽車之家大部分的實時任務可以用 Flink SQL + UDF 實作。平台提供常用的 Source 和 Sink,以及業務開發常用的 UDF,同時使用者可以自己編寫 UDF。基于 "SQL + 配置" 的方式完成開發,可以滿足大部分需求。對于自定義任務,我們提供友善開發使用的 SDK,助力使用者快速開發自定義 Flink 任務。平台面向的使用者已經不隻是專業的資料開發人員了,普通開發、 測試、運維人員經過基本的學習都可以在平台上完成日常的實時資料開發工作,實作平台賦能化。資料資産可管理,SQL 語句本身是結構化的,我們通過解析一個作業的 SQL,結合 source、 sink 的 DDL,可以很容易的知道這個作業的上下遊,天然保留血緣關系。
  2. 高性能:Flink 可以完全基于狀态 (記憶體,磁盤) 做計算,對比之前依賴外部存儲做計算的場景,性能提升巨。在 818 活動壓測期間,改造後的程式可以輕松支援原來幾十倍流量的實時計算,且橫向擴充性能十分良好。
  3. 全面的監控報警:使用者将任務托管在平台上,任務的存續由平台負責,使用者專注于任務本身的邏輯開發本身即可。對于 SQL 任務,SQL 的可讀性極高,便于維護;對于自定義任務,基于我們 SDK 開發,使用者可以更專注于梳理業務邏輯上。不論是 SQL 任務還是 SDK,我們都内嵌了大量監控,并與報警平台關聯,友善使用者快速發現分析定位并修複任務,提高穩定性。
  4. 賦能業務:支援數倉分層模型,平台提供了良好的 SQL 支援,數倉人員可以借助 SQL,将離線數倉的建設經驗應用于實時數倉的建設上,自平台上線後,數倉逐漸開始對接實時計算需求。
Apache Flink 在汽車之家的應用與實踐

痛點:

  1. 易用性有待提高,比如使用者無法自助管理 UDF,隻能使用平台内置的 UDF 或者把打好的 jar 包發給平台管理者,通過人工的方式處理上傳問題。
  2. 随着平台作業量的高速增長,平台 on-call 成本非常高。首先我們經常面對一些新使用者的基礎問題:
    1. 平台的使用問題;
    2. 開發過程中遇到的問題,比如為什麼打包報錯;
    3. Flink UI 的使用問題;
    4. 監控圖形的含義,如何配置報警。
    還有一些不太容易快速給出答案的問題:
    1. Jar 包沖突;
    2. 為什麼消費 Kafka 延遲;
    3. 任務為什麼報錯。

    尤其是延遲問題,我們常見的資料傾斜,GC,反壓問題可以直接引導使用者去 Flink UI 和監控圖表上去檢視,但有時候還是需要手動去伺服器上檢視 jmap、jstack 等資訊,有時候還需要生成火焰圖來幫助使用者定位性能問題。

    初期我們沒有和營運團隊合作,完全是我們開發人員直接對接處理這些問題,雖然期間補充了大量的文檔,但是整體上 on-call 成本還是很高。

  3. 在 Kafka 或 Yarn 出現故障時,沒有快速恢複的方案,當面對一些重保業務時,有些捉襟見肘。衆所周知,沒有永遠穩定,不出故障的環境或元件,當有重大故障出現時,需要有快速恢複業務的應對方案。
  4. 資源沒有合理管控,存在比較嚴重的資源浪費的情況。随着使用平台開發任務的使用者不斷增加,平台的作業數也不斷增加。有些使用者不能很好的把控叢集資源的使用,經常出現過多申請資源的問題,導緻作業運作效率低下甚至空閑,造成了資源的浪費。

在 AutoStream1.0 平台這個階段,基于 SQL 開發的方式極大地降低了實時開發的門檻,各業務方可以自己實作實時業務的開發,同時數倉同學經過簡單的學習後,就開始對接實時業務,将我們平台方從大量的業務需求中釋放出來,讓我們可以專心做平台方面的工作。

3. 目前階段

Apache Flink 在汽車之家的應用與實踐

針對上面的幾個方面,我們有針對性行的做了以下幾點更新:

  1. 引入 Jar Service:支援使用者自助上傳 UDF jar 包,并在 SQL 片段中自助引用,實作自助管理 UDF。同時自定義作業也可以配置 Jar Service 中的 Jar,面對多個作業共用同一個 Jar 的場景,使用者隻需要在作業中配置 Jar Service 中的 jar 包路徑就可以,避免每次上線都重複上傳 Jar 的繁瑣操作;
  2. 自助診斷:我們開發了動态調整日志級别、自助檢視火焰圖等功能,友善使用者自己定位問題,減輕我們日常 on-call 成本;
  3. 作業健康檢查功能:從多個次元分析,為每個 Flink 作業打分,每個低分項都相應的給出建議;
  4. Flink 作業級别的快速容災恢複:我們建設了兩套 YARN 環境,每一個 YARN 對應一個單獨的 HDFS,兩個 HDFS 之前通過 SNAPSHOT 方式進行 Checkpoint 資料的雙向複制,同時在平台上增加了切換叢集的功能,在一個 YARN 叢集不可用的情況下,使用者可以自助在平台上,選擇備用叢集的 Checkpoint;
  5. Kafka 多叢集架構支援:使用我們自研的 Kafka SDK,支援快速切換 Kafka 叢集;
  6. 對接預算系統:每個作業占用的資源都直接對應到預算團隊,這樣一定程度上保證資源不會被其他團隊占用,同時每個團隊的預算管理者可以檢視預算使用明細,了解自己的預算支援了團隊内的哪些業務。

目前使用者對平台的使用已經趨于熟悉,同時自助健康檢查和自助診斷等功能的上線,我們平台方的日常 on-call 頻率在逐漸降低,開始逐漸進入平台建設的良性循環階段。

4. 應用場景

Apache Flink 在汽車之家的應用與實踐

汽車之家用于做實時計算的資料主要分為三類:

  1. 用戶端日志,也就是我們内部說的點選流日志,包括使用者端上報的啟動日志、時長日志、PV 日志、點選日志以及各類事件日志,這類主要是使用者行為日志,是我們建設實時數倉中流量寬表、UAS 系統、實時畫像的基礎,在這之上還支援了智能搜尋、智能推薦等線上業務;同時基礎的流量資料也用于支援各業務線的流量分析、實時效果統計,支援日常營運決策。
  2. 服務端日志,包括 nginx 日志、各類後端應用産生的日志、各種中間件的日志。這些日志資料主要用于後端服務的健康監測、性能監控等場景。
  3. 業務庫的實時變更記錄,主要有三種:MySQL 的 binlog,SQLServer 的 CDC,TiDB 的 TiCDC 資料,基于這些實時的資料變更記錄,我們通過對各種内容資料的抽象與規範,建設了内容中台、資源池等基礎服務;也有一些做簡單邏輯的業務資料實時統計場景,結果資料用于實時大屏、羅盤等,做資料展現。

以上這三類資料都會實時寫入 Kafka 叢集,在 Flink 叢集中針對不同場景進行計算,結果資料寫入到 Redis、MySQL、Elasticsearch、HBase、Kafka、Kylin 等引擎中,用于支援上層應用。

下面列舉了一些應用場景:

Apache Flink 在汽車之家的應用與實踐

5. 叢集規模

目前 Flink 叢集伺服器 400+,部署模式為 YARN (80%) 和 Kubernetes,運作作業數 800+,日計算量 1 萬億,峰值每秒處理資料 2000 萬條。

Apache Flink 在汽車之家的應用與實踐

二、AutoStream 平台

1. 平台架構

Apache Flink 在汽車之家的應用與實踐

上面是 AutoStream 平台目前的整體架構,主要是以下幾部分内容:

  1. AutoStream core System

    這是我們平台的核心服務,負責對中繼資料服務、Flink 用戶端服務、Jar 管理服務及互動結果查詢服務進行整合,通過前端頁面把平台功能暴露給使用者。

    主要包括 SQL 和 Jar 作業的管理、庫表資訊的管理、UDF 管理、操作記錄及曆史版本的管理、健康檢查、自助診斷、報警管理等子產品,同時提供對接外部系統的能力,支援其他系統通過接口方式管理庫表資訊、SQL 作業資訊及作業啟停操作等。基于 Akka 任務的生命周期管理和排程系統提供了高效,簡單,低延遲的操作保障,提升了使用者使用的效率和易用性。

  2. 中繼資料服務 (Catalog-like Unified Metastore)

    主要對應 Flink Catalog 的後端實作,除了支援基本的庫表資訊管理外,還支援庫表粒度的權限控制,結合我們自身的特點,支援使用者組級别的授權。

    底層我們提供了 Plugin Catalog 機制,既可以用于和 Flink 已有的 Catalog 實作做內建,也可以友善我們嵌入自定義的 Catalogs,通過 Plugin 機制可以很容易的重用 HiveCatalog,JdbcCatalog 等,進而保證了庫表的周期的一緻性。

    同時中繼資料服務還負責對使用者送出的 DML 語句進行解析,識别目前作業的依賴的表資訊,用于作業的分析及送出過程,同時可以記錄血緣關系。

  3. Jar Service

    平台提供的各類 SDK 在 Jar Service 上進行統一管理,同時使用者也可以在平台上把自定義 Jar、UDF jar 等送出到 Jar Service 上統一管理,然後在作業中通過配置或 DDL 引用。

  4. Flink 用戶端服務 (Customed Flink Job Client)

    負責把平台上的作業轉化成 Flink Job 送出到 Yarn 或 Kubernetes 上,我們在這一層針對 Yarn 和 Kubernetes 做了抽象,統一兩種排程架構的行為,對外暴露統一接口及規範化的參數,弱化 Yarn 和 Kubernetes 的差異,為 Flink 作業在兩種架構上無縫切換打下了良好的基礎。

    每個作業的依賴不盡相同,我們除了對基礎依賴的管理以外,還需要支援個性化的依賴。比如不同版本的 SQL SDK,使用者自助上傳的 Jar、UDF 等,是以不同作業的送出階段需要做隔離。

    我們采用的是 Jar service + 程序隔離的方式,通過和 Jar Service 對接,根據作業的類型和配置,選用相應的 Jar,并且送出單獨的程序中執行,實作實體隔離。

  5. 結果緩存服務 (Result Cache Serivce)

    是一個簡易的緩存服務,用于 SQL 作業開發階段的線上調試場景。當我們分析出使用者的 SQL 語句,将 Select 語句的結果集存入緩存服務中;然後使用者可以在平台上通過選擇 SQL 序号 (每個完整的 SELECT 語句對應一個序号),實時檢視 SQL 對應的結果資料,友善使用者開發與分析問題。

  6. 内置Connectors (Source & Sink)

    最右側的部分主要是各種 Source、Sink 的實作,有一些是重用 Flink 提供的 connector,有一些是我們自己開發的 connector。

    針對每一種 connector 我們都添加了必要 Metric,并配置成單獨的監控圖表,友善使用者了解作業運作情況,同時也為定位問題提供資料依據。

2. 基于 SQL 的開發流程

在平台提供以上功能的基礎上,使用者可以快速的實作 SQL 作業的開發:

  1. 建立一個 SQL 任務;
  2. 編寫 DDL 聲明 Source 和 Sink;
  3. 編寫 DML,完成主要業務邏輯的實作;
  4. 線上檢視結果,若資料符合預期,添加 INSERT INTO 語句,寫入到指定 Sink 中即可。
Apache Flink 在汽車之家的應用與實踐

平台預設會儲存 SQL 每一次的變更記錄,使用者可以線上檢視曆史版本,同時我們會記錄針對作業的各種操作,在作業維護階段可以幫助使用者追溯變更曆史,定位問題。

下面是一個 Demo,用于統計當天的 PV、UV 資料:

Apache Flink 在汽車之家的應用與實踐

3. 基于 Catalog 的中繼資料管理

Apache Flink 在汽車之家的應用與實踐

中繼資料管理的主要内容:

  1. 支援權限控制:除了支援基本的庫表資訊管理外,還支援表粒度的權限控制,結合我們自身的特點,支援使用者組級别的授權;
  2. Plugin Catalog 機制:可以組合多種其他 Catalog 實作,複用已有的 Catalog;
  3. 庫表生命周期行為統一:使用者可以選擇平台上的表和底層存儲的生命周期統一,避免兩邊分别維護,重複建表;
  4. 新老版本完全相容:由于在 AutoStream 1.0 的時候,我們沒有單獨引入 Metastore 服務,此外 1.0 時期的 DDL SQL 解析子產品是自研的元件。是以在建設 MetaStore 服務時,需要考慮曆史作業和曆史庫表資訊相容的問題。
    1. 對于庫表資訊,新的 MetaStore 在底層将新版和舊版的庫表資訊轉換成統一的存儲格式,進而保證了庫表資訊的相容性。
    2. 對于作業,這裡我們通過抽象接口,并分别提供 V1Service 和 V2Service 兩種實作路徑,保證了新老作業在使用者層面的相容。

下面是幾個子產品和 Metastore 互動的示意圖:

Apache Flink 在汽車之家的應用與實踐

4. UDXF 管理

我們引入了 Jar Service 服務用來管理各種 Jar,包括使用者自定義作業、平台内部 SDK 元件、UDXF 等,在 Jar Service 基礎上我們可以很容易的實作 UDXF 的自助管理,在 On k8s 的場景下,我們提供了統一的鏡像,Pod 啟動後會從 Jar Service 下載下傳對應的 Jar 到容器内部,用于支援作業的啟動。

使用者送出的 SQL 中如果包含 Function DDL,我們會在 Job Client Service 中會解析 DDL,下載下傳對應的 Jar 到本地。

為了避免和其他作業有依賴沖突,我們每次都會單獨啟動一個子程序來完成作業送出的操作。UDXF Jar 會被并加入到 classpath 中,我們對 Flink 做了一些修改,作業送出時會把這個 Jar 一并上傳到 HDFS 中;同時 AutoSQL SDK 會根據函數名稱和類名為目前作業注冊 UDF。

Apache Flink 在汽車之家的應用與實踐

5. 監控報警及日志收集

得益于 Flink 完善的 Metric 機制,我們可以友善的添加 Metric,針對 Connector,我們内嵌了豐富的 Metric,并配置了預設的監控看闆,通過看闆可以檢視 CPU、記憶體、JVM、網絡傳輸、Checkpoint、各種 Connector 的監控圖表。同時平台和公司的雲監控系統對接,自動生成預設的報警政策,監控存活狀态、消費延遲等關鍵名額。同時使用者可以在雲監控系統修改預設的報警政策,添加新的報警項實作個性化監控報警。

日志通過雲 Filebeat 元件寫入到 Elasticsearch 叢集,同時開放 Kibana 供使用者查詢。

Apache Flink 在汽車之家的應用與實踐

整體的監控報警及日志收集架構如下:

Apache Flink 在汽車之家的應用與實踐

6. 健康檢查機制

随着作業數的高速增長,出現了很多資源使用不合理的情況,比如前面提到的資源浪費的情況。使用者大多時候都是在對接新需求,支援新業務,很少回過頭來評估作業的資源配置是否合理,優化資源使用。是以平台規劃了一版成本評估的模型,也就是現在說的健康檢查機制,平台每天會針對作業做多元度的健康評分,使用者可以随時在平台上檢視單個作業的得分情況及最近 30 天的得分變化曲線。

低分作業會在使用者登入平台時進行提示,并且定期發郵件提醒使用者進行優化、整改,在優化作業後使用者可以主動觸發重新評分,檢視優化效果。

Apache Flink 在汽車之家的應用與實踐

我們引入了多元度,基于權重的評分政策,針對 CPU、記憶體使用率、是否存在空閑 Slot、GC 情況、Kafka 消費延遲、單核每秒處理資料量等多個次元的名額結合計算拓補圖進行分析評估,最終産生一個綜合分。

每個低分項都會顯示低分的原因及參考範圍,并顯示一些指導建議,輔助使用者進行優化。

我們新增了一個 Metric,用一個 0%~100% 的數字展現 TaskManagner CPU 使用率。這樣使用者可以直覺的評估 CPU 是否存在浪費的情況。

Apache Flink 在汽車之家的應用與實踐

下面是作業評分的大緻流程:首先我們會收集和整理運作作業的基本資訊和 Metrics 資訊。然後應用我們設定好的規則,得到基本評分和基礎建議資訊。最後将得分資訊和建議整合,綜合評判,得出綜合得分和最終的報告。使用者可以通過平台檢視報告。對于得分較低的作業,我們會發送報警給作業的歸屬使用者。

Apache Flink 在汽車之家的應用與實踐

7. 自助診斷

如之前提到的痛點,使用者定位線上問題時,隻能求助于我們平台方,造成我們 on-call 工作量很大,同時使用者體驗也不好,鑒于此,是以我們上線了以下功能:

  1. 動态修改日志級别:我們借鑒了 Storm 的修改日志級别的方式,在 Flink 上實作了類似功能,通過擴充 REST API 和 RPC 接口的方法,支援修改指定 Logger 的到某一日志級别,并支援設定一個過期時間,當過期後,改 Logger 的日志會重新恢複為 INFO 級别;
  2. 支援自助檢視線程棧和堆記憶體資訊:Flink UI 中已經支援線上檢視線程棧 (jstack),我們直接複用了這個接口;還額外增加了檢視堆記憶體 (jmap) 的接口,友善使用者線上檢視;
  3. 支援線上生成、檢視火焰圖:火焰圖是定位程式性能問題的一大利器,我們利用了阿裡的 arthas 元件,為 Flink 增加了線上檢視火焰圖的能力,使用者遇到性能問題時,可以快速評估性能瓶頸。
Apache Flink 在汽車之家的應用與實踐

8. 基于 Checkpoint 複制的快速容災

Apache Flink 在汽車之家的應用與實踐

當實時計算應用在重要業務場景時,單個 Yarn 叢集一旦出現故障且短期内不可恢複,那麼可能會對業務造成較大影響。

在此背景下,我們建設了 Yarn 多叢集架構,兩個獨立的 Yarn 各自對應一套獨立的 HDFS 環境,checkpoint 資料定期在兩個 HDFS 間互相複制。目前 checkpoint 複制的延遲穩定在 20 分鐘内。

同時,在平台層面,我們把切換叢集的功能直接開放給使用者,使用者可以線上檢視 checkpoint 的複制情況,選擇合适的 checkpoint 後 (當然也可以選擇不從 checkpoint 恢複) 進行叢集切換,然後重新開機作業,實作作業在叢集間的相對平滑的遷移。

三、基于 Flink 的實時生态建設

AutoStream 平台的核心場景是支援實時計算開發人員的使用,使實時計算開發變得簡單高效、可監控、易運維。同時随着平台的逐漸完善,我們開始摸索如何對 AutoStream 平台進行重用,如何讓 Flink 應用在更多場景下。重用 AutoStream 有以下幾點優勢:

  1. Flink 本身是優秀的分布式計算架構,有着較高的計算性能,良好的容錯能力和成熟的狀态管理機制,社群蓬勃發展,功能及穩定性有保障;
  2. AutoStream 有着完善的監控和報警機制,作業運作在平台上,無需單獨對接監控系統,同時 Flink 對 Metric 支援很友好,可以友善的添加新的 Metric;
  3. 大量的技術沉澱和營運經驗,通過兩年多的平台建設,我們在 AutoStream 上已經實作了較為完善的 Flink 作業全生命周期的管理,并建設了 Jar Service 等基礎元件,通過簡單的上層接口包裝,就可以對接其他系統,讓其他系統具備實時計算的能力;
  4. 支援 Yarn 和 Kubernetes 部署。
Apache Flink 在汽車之家的應用與實踐

基于以上幾點,我們在建設其他系統時,優先重用 AutoStream 平台,以接口調用的方式進行對接,将 Flink 作業全流程的生命周期,完全托管給 AutoStream 平台,各系統優先考慮實作自身的業務邏輯即可。

我們團隊内的 AutoDTS (接入及分發任務) 和 AutoKafka (Kafka 叢集複制) 系統目前就是依托于 AutoStream 建設的。簡單介紹一下內建的方式,以 AutoDTS 為例:

  1. 把任務 Flink 化,AutoDTS 上的接入、分發任務,都是以 Flink 作業的形式存在;
  2. 和 AutoStream 平台對接,調用接口實作 Flink 作業的建立、修改、啟動、停止等操作。這裡 Flink 作業既可以是 Jar,也可以是 SQL 作業;
  3. AutoDTS 平台根據業務場景,建設個性化的前端頁面,個性化的表單資料,表單送出後,可以将表單資料存儲到 MySQL 中;同時需要把作業資訊以及 Jar 包位址等資訊組裝成 AutoStream 接口定義的格式,通過接口調用在 AutoStream 平台自動生成一個 Flink 任務,同時儲存這個 Flink 任務的 ID;
  4. 啟動 AutoDTS 的一個接入任務,直接調用 AutoStream 接口就實作了作業的啟動。

1. AutoDTS 資料接入分發平台

AutoDTS 系統主要包含兩部分功能:

  1. 資料接入:将資料庫中的變更資料 (Change log) 實時寫入到 Kafka;
  2. 資料分發:将接入到 Kafka 的資料,實時寫入到其他存儲引擎。

1.1 AutoDTS 資料接入

下面是資料接入的架構圖:

Apache Flink 在汽車之家的應用與實踐

我們維護了基于 Flink 的資料接入 SDK 并定義了統一的 JSON 資料格式,也就是說 MySQL Binlog,SQL Server、 TiDB 的變更資料接入到 Kafka 後,資料格式是一緻的,下遊業務使用時,基于統一格式做開發,無需關注原始業務庫的類型。

資料接入到 Kafka Topic 的同時,Topic 會自動注冊為一張 AutoStream 平台上的流表,友善使用者使用。

資料接入基于 Flink 建設還有一個額外的好處,就是可以基于 Flink 的精确一次語義,低成本的實作精确一次資料接入,這對支援資料準确性要求很高的業務來說,是一個必要條件。

目前我們在做把業務表中的全量資料接入 Kafka Topic 中,基于 Kafka 的 compact 模式,可以實作 Topic 中同時包含存量資料和增量資料。這對于資料分發場景來說是十分友好的,目前如果想把資料實時同步到其他存儲引擎中,需要先基于排程系統,接入一次全量資料,然後再開啟實時分發任務,進行變更資料的實時分發。有了 Compact Topic 後,可以省去全量接入的操作。Flink1.12 版本已經對 Compact Topic 做支援,引入 upsert-kafka Connector [1]

[1]

https://cwiki.apache.org/confluence/display/Flink/FLIP-149%3A+Introduce+the+upsert-kafka+Connector

下面是一條樣例資料:

Apache Flink 在汽車之家的應用與實踐

預設注冊到平台上的流表是 Schemaless 的,使用者可以用 JSON 相關的 UDF 擷取其中的字段資料。

Apache Flink 在汽車之家的應用與實踐

下面是使用流表的示例:

Apache Flink 在汽車之家的應用與實踐

1.2 AutoDTS 資料分發

Apache Flink 在汽車之家的應用與實踐

我們已經知道,接入到 Kafka 中的資料是可以當做一張流表來使用的,而資料分發任務本質上是把這個流表的資料寫入到其他存儲引擎,鑒于 AutoStream 平台已經支援多種 Table Sink (Connector),我們隻需要根據使用者填寫的下遊存儲的類型和位址等資訊,就可以通過拼裝 SQL 來實作資料的分發。

通過直接重用 Connector 的方式,最大化的避免了重複開發的工作。

下面是一個分發任務對應的 SQL 示例:

Apache Flink 在汽車之家的應用與實踐

2. Kaka 多叢集架構

Kafka 在實際應用中,有些場景是需要做 Kafka 多叢集架構支援的,下面列舉幾個常見的場景:

  • 資料備援災備,實時複制資料到另一個備用叢集,當一個 Kafka 叢集不可用時,可以讓應用切換到備用叢集,快速恢複業務;
  • 叢集遷移,當機房合同到期,或者上雲時,都需要做叢集的遷移,此時需要把叢集資料整體複制到新機房的叢集,讓業務相對平滑遷移;
  • 讀寫分離場景,使用 Kafka 時,大多數情況都是讀多寫少,為保證資料寫入的穩定性,可以選擇建設 Kafka 讀寫分離叢集。

我們目前建設了 Kafka 多叢集架構,和 Flink 相關的主要有兩塊内容:

  1. Kafka 叢集間資料複制的程式運作在 Flink 叢集中;
  2. 改造了 Flink Kafka Connector,支援快速切換 Kafka 叢集。

2.1 整體架構

Apache Flink 在汽車之家的應用與實踐

先來看一下 Kafka 叢集間的資料複制,這是建設多叢集架構的基礎。我們是使用 MirrorMaker2 來實作資料複制的,我們把 MirrorMaker2 改造成普通的 Flink 作業,運作在 Flink 叢集中。

我們引入了 Route Service 和 Kafka SDK,實作用戶端快速切換通路的 Kafka 叢集。

用戶端需要依賴我們自己釋出的 Kafka SDK,并且配置中不再指定 bootstrap.servers 參數,而是通過設定 cluster.code 參數來聲明自己要通路的叢集。 SDK 會根據 cluster.code 參數,通路 Route Service 擷取叢集真正的位址,然後建立 Producer/Consumer 開始生産/消費資料。

SDK 會監聽路由規則的變化,當需要切換叢集時,隻需要在 Route Service 背景切換路由規則,SDK 發現路由叢集發生變化時,會重新開機 Producer/Consumer 執行個體,切換到新叢集。

如果是消費者發生了叢集切換,由于 Cluster1 和 Cluster2 中 Topic 的 offset 是不同的,需要通過 Offset Mapping Service 來擷取目前 Consumer Group 在 Cluster2 中的 offset,然後從這些 Offset 開始消費,實作相對平滑的叢集切換。

2.2 Kafka 叢集間的資料複制

我們使用 MirrorMaker2 來實作叢集間的資料複制,MirrorMaker2 是 Kafka 2.4 版本引入的,具體以下特性:

  • 自動識别新的 Topic 和 Partition;
  • 自動同步 Topic 配置:Topic 的配置會自動同步到目标叢集;
  • 自動同步 ACL;
  • 提供 Offset 的轉換工具:支援根據源叢集、目标叢集及 Group 資訊,擷取到該 Group 在目标叢集的中對應的 Offset 資訊;
  • 支援擴充黑白名單政策:可以靈活定制,動态生效。

clusters = primary, backup

primary.bootstrap.servers = vip1:9091

backup.bootstrap.servers = vip2:9092

primary->backup.enabled = true

backup->primary.enabled = true

這段配置完成 primary 到 backup 叢集的雙向資料複制,primary 叢集中的 topic1 中的資料會複制到 backup 叢集中的 primary.topic1 這個 Topic 中,目标叢集的Topic 命名規則是 sourceCluster.sourceTopicName,可以通過實作 ReplicationPolicy 接口來自定義命名政策。

Apache Flink 在汽車之家的應用與實踐

2.3 MirrorMaker2 相關的 Topic 介紹

  • 源叢集中的 Topic

    heartbeats:存儲心跳資料;

    mm2-offset-syncs.targetCluster.internal:存儲源叢集 (upstreamOffset) 和目标叢集的 offset(downstreamOffset) 對應關系。

  • 目标叢集中的 Topic

    mm2-configs.sourceCluster.internal:connect 架構自帶,用來存儲配置;

    mm2-offsets.sourceCluster.internal:connect 架構自帶,用來存儲 WorkerSourceTask 目前處理的 offset,mm2 場景下是為了目前資料同步到源叢集 topic partition 的哪一個 offset,這個更像是 Flink 的 checkpoint 概念;

    mm2-status.sourceCluster.internal:connect 架構自帶,用來存儲 connector 狀态。

上面三個用的都是 connect runtime 子產品中的 KafkaBasedLog 工具類,這個工具類可以讀寫一個 compact 模式的 topic 資料,此時 MirrorMaker2 把 topic 當作 KV 存儲使用。

sourceCluster.checkpoints.internal:記錄 sourceCluster consumer group 在目前叢集對應的 offset,mm2 會定期從源 kafka 叢集讀取 topic 對應的 consumer group 送出的 offset, 并寫到目标叢集的 sourceCluster.checkpoints.internal topic 中。

Apache Flink 在汽車之家的應用與實踐

2.4 MirrorMaker2 的部署

下面是 MirrorMaker2 作業運作的流程,在 AutoKafka 平台上建立一個資料複制作業,會調用 AutoStream 平台接口,相應的建立一個 MM2 類型的作業。啟動作業時,會調用 AutoStream 平台的接口把 MM2 作業送出到 Flink 叢集中運作。

Apache Flink 在汽車之家的應用與實踐

2.5 路由服務

Route Service 負責處理用戶端的路由請求,根據用戶端的資訊比對合适的路由規則,将最終路由結果,也就是叢集資訊傳回給用戶端。

支援基于叢集名稱、Topic、Group、ClientID 以及用戶端自定義的參數靈活配置路由規則。

下面的例子就是将 Flink 作業 ID 為 1234 的消費者,路由到 cluster_a1 叢集。

Apache Flink 在汽車之家的應用與實踐

2.6 Kafka SDK

使用原生的 kafka-clients 是無法和 Route Service 進行通信的,用戶端需要依賴我們提供的 Kafka SDK (汽車之家内部開發的 SDK) 能和 Route Service 通信,實作動态路由的效果。

Kafka SDK 實作了 Producer、Consumer 接口,本質是 kafka-clients 的代理,業務做較少的改動就可以引入 Kafka SDK。

業務依賴 Kafka SDK 後,Kafka SDK 會負責和 Route Service 通信,監聽路由變化,當發現路由的叢集發生變化時,會 close 目前的 Producer/Consumer,建立新的 Producer/Consumer,通路新的叢集。

此外 Kafka SDK 還負責将 Producer、Consumer 的 metric 統一上報到雲監控系統的 prometheus,通過檢視平台預先配置好的儀表盤,可以清晰的看到業務的生産、消費情況。

同時 SDK 會收集一些資訊,比如應用名稱、IP 端口、程序号等,這些資訊可以在 AutoKafka 平台上查到,友善我們和使用者共同定位問題。

Apache Flink 在汽車之家的應用與實踐

2.7 Offset Mapping Service

當 Consumer 的路由發生變化并切換叢集時,情況有一些複雜,因為目前 MirrorMaker2 是先把資料從源叢集消費出來,再寫入到目标叢集的,同一條資料可以確定寫入到目标 topic 的相同分區,但是 offset 和源叢集是不同的。

針對這種 offset 不一緻的情況,MirrorMaker2 會消費源叢集的 __consumer_offsets 資料,加上目标叢集對應的 offset,寫入到目标叢集的 sourceCluster.checkpoints.internal topic 中。

同時,源叢集的 mm2-offset-syncs.targetCluster.internal topic 記錄了源叢集和目标叢集 offset 的映射關系,結合這兩個 topic,我們建設了 Offset Mapping Service 來完成目标叢集的 offset 的轉換工作。

是以當 Consumer 需要切換叢集時,會調用 Offset Mapping Service 的接口,擷取到目标叢集的 offsets,然後主動 seek 到這些位置開始消費,這樣實作相對平滑的叢集切換工作。

Apache Flink 在汽車之家的應用與實踐

2.8 Flink 與 Kafka 多叢集架構的內建

由于 Kafka SDK 相容 kafka-clients 的用法,使用者隻需要更換依賴,然後設定 cluster.code、Flink.id 等參數即可。

當 Producer/Consumer 發生叢集切換後,由于建立了新的 Producer/Consumer 執行個體,Kafka 的 metric 資料沒有重新注冊,導緻 metric 資料無法正常上報。我們在 AbstractMetricGroup 類中增加了 unregister 方法,在監聽 Producer/Consumer 的切換事件時,重新注冊 kafka metrics 就可以了。

至此我們完成了 Flink 對 Kafka 多叢集架構的支援。

Apache Flink 在汽車之家的應用與實踐

四、後續規劃

Apache Flink 在汽車之家的應用與實踐
  1. 目前我們支援的資料統計類場景大多是基于流量資料或使用者行為資料的,這些場景對精确一次的語義要求不高,随着目前社群對 Change Log 支援的逐漸完善,同時我們的資料接入體系是支援精确一次語義的,并且正在做業務表全量接入到 Kafka 的功能,是以後續可以實作精确一次的資料統計,支援交易、線索、金融類的統計需求。
  2. 一些公司已經提出湖倉一體的理念,資料湖技術确實可以解決一些原有數倉架構的痛點,比如資料不支援更新操作,無法做到準實時的資料查詢。目前我們在做一些 Flink 和 Iceberg、Hudi 內建的一些嘗試,後續會在公司尋找場景并落地。

熱點推薦

Flink Forward Asia 2021 正式啟動!議題火熱征集中! Flink 1.14 新特性預覽 37 手遊基于 Flink CDC + Hudi 湖倉一體方案實踐

更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群

第一時間擷取最新技術文章和社群動态,請關注公衆号~

Apache Flink 在汽車之家的應用與實踐

活動推薦

阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:

99 元試用

實時計算Flink版

(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包 3 個月及以上還有 85 折優惠!

了解活動詳情:

https://www.aliyun.com/product/bigdata/sc
Apache Flink 在汽車之家的應用與實踐