導讀:今天主要和大家交流的是網易在資料湖 Iceberg 的一些思考與實踐。從網易在資料倉庫建設中遇到的痛點出發,介紹對資料湖 Iceberg 的探索以及實踐之路。
主要内容包括:
- 資料倉庫平台建設的痛點
- 資料湖 Iceberg 的核心原理
- 資料湖 Iceberg 社群現狀
- 網易資料湖 Iceberg 實踐之路
01 資料倉庫平台建設的痛點
痛點一:

我們淩晨一些大的離線任務經常會因為一些原因出現延遲,這種延遲會導緻核心報表的産出時間不穩定,有些時候會産出比較早,但是有時候就可能會産出比較晚,業務很難接受。
為什麼會出現這種現象的發生呢?目前來看大緻有這麼幾點要素:
- 任務本身要請求的資料量會特别大。通常來說一天原始的資料量可能在幾十TB。幾百個分區,甚至上千個分區,五萬+的檔案數這樣子。如果說全量讀取這些檔案的話,幾百個分區就會向 NameNode 發送幾百次請求,我們知道離線任務在淩晨運作的時候,NameNode 的壓力是非常大的。是以就很有可能出現 Namenode 響應很慢的情況,如果請求響應很慢就會導緻任務初始化時間很長。
- 任務本身的 ETL 效率是相對低效的,這個低效并不是說 Spark 引擎低效,而是說我們的存儲在這塊支援的不是特别的好。比如目前我們查一個分區的話是需要将所有檔案都掃描一遍然後進行分析,而實際上我可能隻對某些檔案感興趣。是以相對而言這個方案本身來說就是相對低效的。
- 這種大的離線任務一旦遇到磁盤壞盤或者機器當機,就需要重試,重試一次需要耗費很長的時間比如幾十分鐘。如果說重試一兩次的話這個延遲就會比較大了。
痛點二:
針對一些細瑣的一些問題而言的。這裡簡單列舉了三個場景來分析:
- 不可靠的更新操作。我們經常在 ETL 過程中執行一些 insert overwrite 之類的操作,這類操作會先把相應分區的資料删除,再把生成的檔案加載到分區中去。在我們移除檔案的時候,很多正在讀取這些檔案的任務就會發生異常,這就是不可靠的更新操作。
- 表 Schema 變更低效。目前我們在對表做一些加字段、更改分區的操作其實是非常低效的操作,我們需要把所有的原始資料讀出來,然後在重新寫回去。這樣就會非常耗時,并且低效。
- 資料可靠性缺乏保障。主要是我們對于分區的操作,我們會把分區的資訊分為兩個地方,HDFS 和 Metastore,分别存儲一份。在這種情況下,如果進行更新操作,就可能會出現一個更新成功而另一個更新失敗,會導緻資料不可靠。
痛點三:
基于 Lambda 架建構設的實時數倉存在較多的問題。如上圖的這個架構圖,第一條鍊路是基于 kafka 中轉的一條實時鍊路(延遲要求小于5分鐘),另一條是離線鍊路(延遲大于1小時),甚至有些公司會有第三條準實時鍊路(延遲要求5分鐘~一小時),甚至更複雜的場景。
- 兩條鍊路對應兩份資料,很多時候實時鍊路的處理結果和離線鍊路的處理結果對不上。
- Kafka 無法存儲海量資料, 無法基于目前的 OLAP 分析引擎高效查詢 Kafka 中的資料。
- Lambda 維護成本高。代碼、資料血緣、Schema 等都需要兩套。運維、監控等成本都非常高。
痛點四:
不能友好地支援高效更新場景。大資料的更新場景一般有兩種,一種是 CDC ( Change Data Capture) 的更新,尤其在電商的場景下,将 binlog 中的更新删除同步到 HDFS 上。另一種是延遲資料帶來的聚合後結果的更新。目前 HDFS 隻支援追加寫,不支援更新。是以業界很多公司引入了 Kudu。但是 Kudu 本身是有一些局限的,比如計算存儲沒有做到分離。這樣整個數倉系統中引入了 HDFS、Kafka 以及 Kudu,運維成本不可謂不大。
上面就是針對目前數倉所涉及到的四個痛點的大緻介紹,是以我們也是通過對資料湖的調研和實踐,希望能在這四個方面對數倉建設有所幫助。接下來重點講解下對資料湖的一些思考。
02 資料湖 Iceberg 核心原理
1. 資料湖開源産品調研
資料湖大緻是從19年開始慢慢火起來的,目前市面上核心的資料湖開源産品大緻有這麼幾個:
- DELTA LAKE,在17年的時候 DataBricks 就做了 DELTA LAKE 的商業版。主要想解決的也是基于 Lambda 架構帶來的存儲問題,它的初衷是希望通過一種存儲來把 Lambda 架構做成 kappa 架構。
- Hudi ( Uber 開源 ) 可以支援快速的更新以及增量的拉取操作。這是它最大的賣點之一。
- Iceberg 的初衷是想做标準的 Table Format 以及高效的 ETL。
上圖是來自 Flink 團體針對資料湖方案的一些調研對比,總體來看這些方案的基礎功能相對都還是比較完善的。我說的基礎功能主要包括:
- 高效 Table Schema 的變更,比如針對增減分區,增減字段等功能
- ACID 語義保證
- 同時支援流批讀寫,不會出現髒讀等現象
- 支援 OSS 這類廉價存儲
2. 當然還有一些不同點:
Hudi 的特性主要是支援快速的更新删除和增量拉取。
Iceberg 的特性主要是代碼抽象程度高,不綁定任何的 Engine。它暴露出來了非常核心的表層面的接口,可以非常友善的與 Spark/Flink 對接。然而 Delta 和 Hudi 基本上和 Spark 的耦合很重。如果想接入 Flink,相對比較難。
3. 我們選擇 Iceberg 的原因:
現在國内的實時數倉建設圍繞 Flink 的情況會多一點。是以能夠基于 Flink 擴充生态,是我們選擇 Iceberg 一個比較重要的點。
國内也有很多基于 Iceberg 開發的重要力量,比如騰訊團隊、Flink 官方團隊,他們的資料湖選型也是 Iceberg。目前他們在社群分别主導 update 以及 Flink 的生态對接。
4. 接下來我們重點介紹一下 Iceberg:
這是來自官方對于 Iceberg 的一段介紹,大緻就是 Iceberg 是一個開源的基于表格式的資料湖。關于 table format 再給大家詳細介紹下:
左側圖是一個抽象的資料處理系統,分别由 SQL 引擎、table format、檔案集合以及分布式檔案系統構成。右側是對應的現實中的元件,SQL 引擎比如 HiveServer、Impala、Spark 等等,table format 比如 Metastore 或者 Iceberg,檔案集合主要有 Parquet 檔案等,而分布式檔案系統就是 HDFS。
對于 table format,我認為主要包含4個層面的含義,分别是表 schema 定義(是否支援複雜資料類型),表中檔案的組織形式,表相關統計資訊、表索引資訊以及表的讀寫 API 實作。詳述如下:
- 表 schema 定義了一個表支援字段類型,比如 int、string、long 以及複雜資料類型等。
- 表中檔案組織形式最典型的是 Partition 模式,是 Range Partition 還是 Hash Partition。
- Metadata 資料統計資訊。
- 封裝了表的讀寫 API。上層引擎通過對應的API讀取或者寫入表中的資料。
和 Iceberg 差不多相當的一個元件是 Metastore。不過 Metastore 是一個服務,而 Iceberg 就是一個 jar 包。這裡就 Metastore 和 Iceberg 在表格式的4個方面分别進行一下對比介紹:
① 在 schema 層面上沒有任何差別:
都支援 int、string、bigint 等類型。
② partition 實作完全不同:
兩者在 partition 上有很大的不同:
metastore 中 partition 字段不能是表字段,因為 partition 字段本質上是一個目錄結構,不是使用者表中的一列資料。基于 metastore,使用者想定位到一個 partition 下的所有資料,首先需要在 metastore 中定位出該 partition 對應的所在目錄位置資訊,然後再到 HDFS 上執行list指令擷取到這個分區下的所有檔案,對這些檔案進行掃描得到這個 partition 下的所有資料。
Iceberg 中 partition 字段就是表中的一個字段。Iceberg 中每一張表都有一個對應的檔案中繼資料表,檔案中繼資料表中每條記錄表示一個檔案的相關資訊,這些資訊中有一個字段是 partition 字段,表示這個檔案所在的 partition。
很明顯,Iceberg 表根據 partition 定位檔案相比 metastore 少了一個步驟,就是根據目錄資訊去 HDFS 上執行 list 指令擷取分區下的檔案。
試想,對于一個二級分區的大表來說,一級分區是小時時間分區,二級分區是一個枚舉字段分區,假如每個一級分區下有30個二級分區,那麼這個表每天就會有24 * 30 = 720個分區。基于 Metastore 的 partition 方案,如果一個 SQL 想基于這個表掃描昨天一天的資料的話,就需要向 Namenode 下發720次 list 請求,如果掃描一周資料或者一個月資料,請求數就更是相當誇張。這樣,一方面會導緻 Namenode 壓力很大,一方面也會導緻 SQL 請求響應延遲很大。而基于 Iceberg 的 partition 方案,就完全沒有這個問題。
③ 表統計資訊實作粒度不同:
Metastore 中一張表的統計資訊是表/分區級别粒度的統計資訊,比如記錄一張表中某一列的記錄數量、平均長度、為 null 的記錄數量、最大值最小值等。
Iceberg 中統計資訊精确到檔案粒度,即每個資料檔案都會記錄所有列的記錄數量、平均長度、最大值最小值等。
很明顯,檔案粒度的統計資訊對于查詢中謂詞(即 where 條件)的過濾會更有效果。
④ 讀寫 API 實作不同:
metastore 模式下上層引擎寫好一批檔案,調用 metastore 的 add partition 接口将這些檔案添加到某個分區下。
Iceberg 模式下上層業務寫好一批檔案,調用 iceberg 的 commit 接口送出本次寫入形成一個新的 snapshot 快照。這種送出方式保證了表的 ACID 語義。同時基于 snapshot 快照送出可以實作增量拉取實作。
總結下 Iceberg 相對于 Metastore 的優勢:
- 新 partition 模式:避免了查詢時n次調用 namenode 的 list 方法,降低 namenode 壓力,提升查詢性能
- 新 metadata 模式:檔案級别列統計資訊可以用來根據 where 字段進行檔案過濾,很多場景下可以大大減少掃描檔案數,提升查詢性能
- 新 API 模式:存儲批流一體
-
- 流式寫入-增量拉取(基于 Iceberg 統一存儲模式可以同時滿足業務批量讀取以及增量訂閱需求)
-
- 支援批流同時讀寫同一張表,統一表schema,任務執行過程中不會出現 FileNotFoundException
Iceberg 的提升展現在:
03 資料湖 Iceberg 社群現狀
目前 Iceberg 主要支援的計算引擎包括 Spark 2.4.5、Spark 3.x、Flink 1.11 以及 Presto。同時,一些運維工作比如 snapshot 過期、小檔案合并、增量訂閱消費等功能都可以實作。
對于 Apache Flink 來說,Apache Iceberg 是 delta、iceberg、hudi 三個開源項目中最先完成 Flink 接入的開源項目。通過 Flink 來完成實時導入資料到 Iceberg 資料湖、通過 Flink batch 作業來讀取 Iceberg 資料,這兩個核心功能将在 Apache Iceberg 0.10.0 版本釋出(預計将在10月底釋出)。對 Flink+iceberg 內建工作感興趣的同學,可以參考 Apache Iceberg 社群的使用文檔。
https://github.com/apache/iceberg/blob/master/site/docs/flink.md按照目前的研發進度,我們預計實時寫入和讀取 CDC 資料這個功能,将在 Iceberg 的0.11.0版本釋出。
04 網易資料湖 Iceberg 實踐之路
Iceberg 針對目前的大數量的情況下,可以大大提升 ETL 任務執行的效率,這主要得益于新 Partition 模式下不再需要請求 NameNode 分區資訊,同時得益于檔案級别統計資訊模式下可以過濾很多不滿足條件的資料檔案。
目前 Iceberg 社群僅支援 Spark 2.4.5,我們在這個基礎上做了更多計算引擎的适配工作。主要包括如下:
- 內建 Hive。可以通過 Hive 建立和删除 iceberg 表,通過 HiveSQL 查詢 Iceberg 表中的資料。
- 內建 Impala。使用者可以通過 Impala 建立 iceberg 内表外表,并通過 Impala 查詢 Iceberg 表中的資料。目前該功能已經貢獻給 Impala 社群。
- 內建 Flink。已經實作了 Flink 到 Iceberg 的 sink 實作,業務可以消費 kafka 中的資料将結果寫入到 Iceberg 中。同時我們基于 Flink 引擎實作了小檔案異步合并的功能,這樣可以實作 Flink 一邊寫資料檔案,一邊執行小檔案的合并。基于 Iceberg 的小檔案合并通過 commit 的方式送出,不需要删除合并前的小檔案,也就不會引起讀取任務的任何異常。
作者介紹:
範欣欣,網易大資料技術專家。他與 Apache HBase PMC 成員、阿裡巴巴技術專家胡争合著的新書《HBase 原理與實踐》,這也是業界第一本專門闡述 HBase 原理的書。
更多 Flink 技術交流可加入 Apache Flink 社群釘釘交流群: