在資料中台建設過程中,一個典型的資料內建場景是将 MQ (Message Queue,例如 Kafka、RocketMQ 等)的資料導入到 Hive 中,以供下遊數倉建設以及名額統計。由于 MQ-Hive 是數倉建設第一層,是以對資料的準确性以及實時性要求比較高。
本文主要圍繞 MQ-Hive 場景,針對目前位元組跳動内已有解決方案的痛點,提出基于 Flink 的實時解決方案,并介紹新方案在位元組跳動内部的使用現狀。
已有方案及痛點
位元組跳動内已有解決方案如下圖所示,主要分了兩個步驟:
- 通過 Dump 服務将 MQ 的資料寫入到 HDFS 檔案
- 再通過 Batch ETL 将 HDFS 資料導入到 Hive 中,并添加 Hive 分區

痛點
- 任務鍊較長,原始資料需要經過多次轉換最終才能進入 Hive
- 實時性比較差,Dump Service、Batch ETL 延遲都會導緻最終資料産出延遲
- 存儲、計算開銷大,MQ 資料重複存儲和計算
- 基于原生 Java 打造,資料流量持續增長後,存在單點故障和機器負載不均衡等問題
- 運維成本較高,架構上無法複用公司内 Hadoop/Flink/Yarn 等現有基礎設施
- 不支援異地容災
基于 Flink 實時解決方案
優勢
針對目前公司傳統解決方案的痛點,我們提出基于 Flink 的實時解決方案,将 MQ 的資料實時寫入到 Hive,并支援事件時間以及 Exactly Once 語義。相比老方案,新方案優勢如下所示:
- 基于流式引擎 Flink 開發,支援 Exactly Once 語義
- 實時性更高,MQ 資料直接進入 Hive,無中間計算環節
- 減少中間存儲,整個流程資料隻會落地一次
- 支撐 Yarn 部署模式,友善使用者遷移
- 資源管理彈性,友善擴容以及運維
- 支援雙機房容災
整體架構
整體架構如下圖所示,主要包括 DTS(Data Transmission Service) Source、DTS Core、DTS Sink 三大子產品,具體功能如下:
- DTS Source 接入不同 MQ 資料源,支援 Kafka、RocketMQ 等
- DTS Sink 将資料輸出到目标資料源,支援 HDFS、Hive 等
- DTS Core 貫穿整個資料同步流程,通過 Source 讀取源端資料,經過 DTS Framework 處理,最後通過 Sink 将資料輸出到目标端。
- DTS Framework 內建類型系統、檔案切分、Exactly Once、任務資訊采集、事件時間、髒資料收集等核心功能
- 支援 Yarn 部署模式,資源排程、管理比較彈性
(DTS Dump架構圖)
Exactly Once
Flink 架構通過 Checkpoint 機制,能夠提供 Exactly Once 或者 At Least Once 語義。為了實作 MQ-Hive 全鍊路支援 Exactly-once 語義,還需要 MQ Source、Hive Sink 端支援 Exactly Once 語義。本文通過 Checkpoint + 2PC 協定實作,具體過程如下:
- 資料寫入時,Source 端從上遊 MQ 拉取資料并發送到 Sink 端;Sink 端将資料寫入到臨時目錄中
- Checkpoint Snapshot 階段,Source 端将 MQ Offset 儲存到 State 中;Sink 端關閉寫入的檔案句柄,并儲存目前 Checkpoint ID 到 State 中;
- Checkpoint Complete 階段,Source 端 Commit MQ Offset;Sink 端将臨時目錄中的資料移動到正式目錄下
- Checkpoint Recover 階段,加載最新一次成功的 Checkpoint 目錄并恢複 State 資訊,其中 Source 端将 State 中儲存的 MQ Offset 作為起始位置;Sink 端恢複最新一次成功的 Checkpoint ID,并将臨時目錄的資料移動到正式目錄下
■ 實作優化
在實際使用場景中,特别是大并發場景下,HDFS 寫入延遲容易有毛刺,因為個别 Task Snapshot 逾時或者失敗,導緻整個 Checkpoint 失敗的問題會比較明顯。是以針對 Checkpoint 失敗,提高系統的容錯性以及穩定性就比較重要。
這裡充分利用 Checkpoint ID 嚴格單調遞增的特性,每一次做 Checkpoint 時,目前 Checkpoint ID 一定比以前大,是以在 Checkpoint Complete 階段,可以送出小于等于目前 Checkpoint ID 的臨時資料。具體優化政策如下:
- Sink 端臨時目錄為{dump_path}/{next_cp_id},這裡 next_cp_id 的定義是目前最新的 cp_id + 1
- Checkpoint Snapshot 階段,Sink 端儲存目前最新 cp_id 到 State,同時更新 next_cp_id 為 cp_id + 1
- Checkpoint Complete 階段,Sink 端将臨時目錄中所有小于等于目前 cp_id 的資料移動到正式目錄下
- Checkpoint Recover 階段,Sink 端恢複最新一次成功的 cp_id,并将臨時目錄中小于等于目前 cp_id 的資料移動到正式目錄下
類型系統
由于不同資料源支援的資料類型不一樣,為了解決不同資料源間的資料同步以及不同類型轉換相容的問題,我們支援了 DTS 類型系統,DTS 類型可細化為基礎類型和複合類型,其中複合類型支援類型嵌套,具體轉換流程如下:
- 在 Source 端,将源資料類型,統一轉成系統内部的 DTS 類型
- 在 Sink 端,将系統内部的 DTS 類型轉換成目标資料源類型
- 其中 DTS 類型系統支援不同類型間的互相轉換,比如 String 類型與 Date 類型的互相轉換
(DTS Dump架構圖)
Rolling Policy
Sink 端是并發寫入,每個 Task 處理的流量不一樣,為了避免生成太多的小檔案或者生成的檔案過大,需要支援自定義檔案切分政策,以控制單個檔案的大小。目前支援三種檔案切分政策:檔案大小、檔案最長未更新時間、Checkpoint。
■ 優化政策
Hive 支援 Parquet、Orc、Text 等多種存儲格式,不同的存儲格式資料寫入過程不太一樣,具體可以分為兩大類:
- RowFormat:基于單條寫入,支援按照 Offset 進行 HDFS Truncate 操作,例如 Text 格式
- BulkFormat:基于 Block 寫入,不支援 HDFS Truncate 操作,例如 Parquet、ORC 格式
為了保障 Exactly Once 語義,并同時支援 Parquet、Orc、Text 等多種格式,在每次 Checkpoint 時,強制做檔案切分,保證所有寫入的檔案都是完整的,Checkpoint 恢複時不用做 Truncate 操作。
容錯處理
理想情況下流式任務會一直運作不需要重新開機,但實際不可避免會遇到以下幾個場景:
- Flink 計算引擎更新,需要重新開機任務
- 上遊資料增加,需要調整任務并發度
- Task Failover
■ 并發度調整
目前 Flink 原生支援 State Rescale。具體實作中,在 Task 做 Checkpoint Snapshot 時,将 MQ Offset 儲存到 ListState 中;Job 重新開機後,Job Master 會根據 Operator 并發度,将 ListState 平均配置設定到各個 Task 上。
■ Task Failover
由于網絡抖動、寫入逾時等外部因素的影響,Task 不可避免會出現寫入失敗,如何快速、準确的做 Task Failover 就顯得比較重要。目前 Flink 原生支援多種 Task Failover 政策,本文使用 Region Failover 政策,将失敗 Task 所在 Region 的所有 Task 都重新開機。
異地容災
■ 背景
大資料時代,資料的準确性和實時性顯得尤為重要。本文提供多機房部署及異地容災解決方案,當主機房因為斷網、斷電、地震、火災等原因暫時無法對外提供服務時,能快速将服務切換到備災機房,并同時保障 Exactly Once 語義。
■ 容災元件
整體解決方案需要多個容災元件一起配合實作,容災元件如下圖所示,主要包括 MQ、YARN、HDFS,具體如下:
- MQ 需要支援多機房部署,當主機房故障時,能将 Leader 切換到備機房,以供下遊消費
- Yarn 叢集在主機房、備機房都有部署,以便 Flink Job 遷移
- 下遊 HDFS 需要支援多機房部署,當主機房故障時,能将 Master 切換到備機房
- Flink Job 運作在 Yarn 上,同時任務 State Backend 儲存到 HDFS,通過 HDFS 的多機房支援保障 State Backend 的多機房
■ 容災過程
整體容災過程如下所示:
- 正常情況下,MQ Leader 以及 HDFS Master 部署在主機房,并将資料同步到備機房。同時 Flink Job 運作在主機房,并将任務 State 寫入到 HDFS 中,注意 State 也是多機房部署模式
- 災難情況下,MQ Leader 以及 HDFS Master 從主機房遷移到備災機房,同時 Flink Job 也遷移到備災機房,并通過 State 恢複災難前的 Offset 資訊,以提供 Exactly Once 語義
事件時間歸檔
在數倉建設中,處理時間(Process Time)和事件時間(Event Time)的處理邏輯不太一樣,對于處理時間會将資料寫到目前系統時間所對應的時間分區下;對于事件時間,則是根據資料的生産時間将資料寫到對應時間分區下,本文也簡稱為歸檔。
在實際場景中,不可避免會遇到各種上下遊故障,并在持續一段時間後恢複,如果采用 Process Time 的處理政策,則事故期間的資料會寫入到恢複後的時間分區下,最終導緻分區空洞或者資料漂移的問題;如果采用歸檔的政策,會按照事件時間寫入,則沒有此類問題。
由于上遊資料事件時間會存在亂序,同時 Hive 分區生成後就不應該再繼續寫入,是以實際寫入過程中不可能做到無限歸檔,隻能在一定時間範圍内歸檔。歸檔的難點在于如何确定全局最小歸檔時間以及如何容忍一定的亂序。
■ 全局最小歸檔時間
Source 端是并發讀取,并且一個 Task 可能同時讀取多個 MQ Partition 的資料,對于 MQ 的每一個 Parititon 會儲存目前分區歸檔時間,取分區中最小值作為 Task 的最小歸檔時間,最終取 Task 中最小值,作為全局最小歸檔時間。
■ 亂序處理
為了支援亂序的場景,會支援一個歸檔區間的設定,其中 Global Min Watermark 為全局最小歸檔時間,Partition Watermark 為分區目前歸檔時間,Partition Min Watermark 為分區最小歸檔時間,隻有當事件時間滿足以下條件時,才會進行歸檔:
- 事件時間大于全局最小歸檔時間
- 事件時間大于分區最小歸檔時間
Hive 分區生成
■ 原理
Hive 分區生成的難點在于如何确定分區的資料是否就緒以及如何添加分區。由于 Sink 端是并發寫入,同時會有多個 Task 寫同一個分區資料,是以隻有當所有 Task 分區資料寫入完成,才能認為分區資料是就緒,本文解決思路如下:
- 在 Sink 端,對于每個 Task 儲存目前最小處理時間,需要滿足單調遞增的特性
- 在 Checkpoint Complete 時,Task 上報最小處理時間到 JM 端
- JM 拿到所有 Task 的最小處理時間後,可以得到全局最小處理時間,并以此作為 Hive 分區的最小就緒時間
- 當最小就緒時間更新時,可判斷是否添加 Hive 分區
■ 動态分區
動态分區是根據上遊輸入資料的值,确定資料寫到哪個分區目錄,而不是寫到固定分區目錄,例如 date={date}/hour={hour}/app={app}的場景,根據分區時間以及 app 字段的值确定最終的分區目錄,以實作每個小時内,相同的 app 資料在同一個分區下。
在靜态分區場景下,每個 Task 每次隻會寫入一個分區檔案,但在動态分區場景下,每個 Task 可能同時寫入多個分區檔案。對于 Parque 格式的寫入,會先将資料寫到做本地緩存,然後批次寫入到 Hive,當 Task 同時處理的檔案句柄過多時,容易出現 OOM。為了防止單 Task OOM,會周期性對檔案句柄做探活檢測,及時釋放長時間沒有寫入的檔案句柄。
Messenger
Messenger 子產品用于采集 Job 運作狀态資訊,以便衡量 Job 健康度以及大盤名額建設。
■ 元資訊采集
元資訊采集的原理如下所示,在 Sink 端通過 Messenger 采集 Task 的核心名額,例如流量、QPS、髒資料、寫入 Latency、事件時間寫入效果等,并通過 Messenger Collector 彙總。其中髒資料需要輸出到外部存儲中,任務運作名額輸出到 Grafana,用于大盤名額展示。
■ 髒資料收集
資料內建場景下,不可避免會遇到髒資料,例如類型配置錯誤、字段溢出、類型轉換不相容等場景。對于流式任務來說,由于任務會一直運作,是以需要能夠實時統計髒資料流量,并且将髒資料儲存到外部存儲中以供排查,同時在運作日志中采樣輸出。
■ 大盤監控
大盤名額覆寫全局名額以及單個 Job 名額,包括寫入成功流量和 QPS、寫入 Latency、寫入失敗流量和 QPS、歸檔效果統計等,具體如下圖所示:
未來規劃
基于 Flink 實時解決方案目前已在公司上線和推廣,未來主要關注以下幾個方面:
- 資料內建功能增強,支援更多資料源的接入,支援使用者自定義資料轉換邏輯等
- Data Lake 打通,支援 CDC 資料實時導入
- 流批架構統一,支援全量、增量場景資料內建
- 架構更新,支援更多部署環境,比如 K8S
- 服務化完善,降低使用者接入成本
總結
随着位元組跳動業務産品逐漸多元化快速發展,位元組跳動内部一站式大資料開發平台功能也越來越豐富,并提供離線、實時、全量、增量場景下全域資料內建解決方案,從最初的幾百個任務規模增長到數萬級規模,日處理資料達到 PB 級,其中基于 Flink 實時解決方案目前已在公司内部大力推廣和使用,并逐漸替換老的 MQ-Hive 鍊路。
參考文獻:
- Real-time Exactly-once ETL with Apache Flink http://shzhangji.com/blog/2018/12/23/real-time-exactly-once-etl-with-apache-flink/
- Implementing the Two-Phase Commit Operator in Flink https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
- A Deep Dive into Rescalable State in Apache Flink https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
- Data Streaming Fault Tolerance https://ci.apache.org/projects/flink/flink-docs-release-1.9/internals/stream_checkpointing.html