本文整理自 Dell 科技集團進階軟體研發經理孫偉在 4 月 17 日 上海站 Flink Meetup 分享的《Iceberg 和對象存儲建構資料湖方案》,文章内容為:
- 資料湖和 Iceberg 簡介
- 對象存儲支撐 Iceberg 資料湖
- 示範方案
- 存儲優化的一些思考
GitHub 位址
https://github.com/apache/flink歡迎大家給 Flink 點贊送 star~
一、資料湖和 Iceberg 簡介
1. 資料湖生态

如上圖所示,對于一個成熟的資料湖生态而言:
- 首先我們認為它底下應具備海量存儲的能力,常見的有對象存儲,公有雲存儲以及 HDFS;
- 在這之上,也需要支援豐富的資料類型,包括非結構化的圖像視訊,半結構化的 CSV、XML、Log,以及結構化的資料庫表;
- 除此之外,需要高效統一的中繼資料管理,使得計算引擎可以友善地索引到各種類型資料來做分析。
- 最後,我們需要支援豐富的計算引擎,包括 Flink、Spark、Hive、Presto 等,進而友善對接企業中已有的一些應用架構。
2. 結構化資料在資料湖上的應用場景
上圖為一個典型的資料湖上的應用場景。
資料源上可能會有各種資料,不同的資料源和不同格式。比如說事物資料,日志,埋點資訊,IOT 等。這些資料經過一些流然後進入計算平台,這個時候它需要一個結構化的方案,把資料組織放到一個存儲平台上,然後供後端的資料應用進行實時或者定時的查詢。
這樣的資料庫方案它需要具備哪些特征呢?
- 首先,可以看到資料源的類型很多,是以需要支援比較豐富的資料 Schema 的組織;
- 其次,它在注入的過程中要支撐實時的資料查詢,是以需要 ACID 的保證,確定不會讀到一些還沒寫完的中間狀态的髒資料;
- 最後,例如日志這些有可能臨時需要改個格式,或者加一列。類似這種情況,需要避免像傳統的數倉一樣,可能要把所有的資料重新提出來寫一遍,重新注入到存儲;而是需要一個輕量級的解決方案來達成需求。
Iceberg 資料庫的定位就在于實作這樣的功能,于上對接計算平台,于下對接存儲平台。
3. 結構化資料在資料湖上的典型解決方案
對于資料結構化組織,典型的解決方式是用資料庫傳統的組織方式。
如上圖所示,上方有命名空間,資料庫表的隔離;中間有多個表,可以提供多種資料 Schema 的儲存;底下會放資料,表格需要提供 ACID 的特性,也支援局部 Schema 的演進。
4. Iceberg 表資料組織架構
- 快照 Metadata:表格 Schema、Partition、Partition spec、Manifest List 路徑、目前快照等。
- Manifest List:Manifest File 路徑及其 Partition,資料檔案統計資訊。
- Manifest File:Data File 路徑及其每列資料上下邊界。
- Data File:實際表内容資料,以 Parque,ORC,Avro 等格式組織。
接下來具體看一下 Iceberg 是如何将資料組織起來的。如上圖所示:
- 可以看到右邊從資料檔案開始,資料檔案存放表内容資料,一般支援 Parquet、ORC、Avro 等格式;
- 往上是 Manifest File,它會記錄底下資料檔案的路徑以及每列資料的上下邊界,友善過濾查詢檔案;
-
再往上是 Manifest List,它來連結底下多個 Manifest File,同時記錄 Manifest File 對應的分區範圍資訊,也是為了友善後續做過濾查詢;
Manifest List 其實已經表示了快照的資訊,它包含當下資料庫表所有的資料連結,也是 Iceberg 能夠支援 ACID 特性的關鍵保障。
有了快照,讀資料的時候隻能讀到快照所能引用到的資料,還在寫的資料不會被快照引用到,也就不會讀到髒資料。多個快照會共享以前的資料檔案,通過共享這些 Manifest File 來共享之前的資料。
-
再往上是快照中繼資料,記錄了目前或者曆史上表格 Scheme 的變化、分區的配置、所有快照 Manifest File 路徑、以及目前快照是哪一個。
同時,Iceberg 提供命名空間以及表格的抽象,做完整的資料組織管理。
5. Iceberg 寫入流程
上方為 Iceberg 資料寫入的流程圖,這裡用計算引擎 Flink 為例。
- 首先,Data Workers 會從中繼資料上讀出資料進行解析,然後把一條記錄交給 Iceberg 存儲;
- 與常見的資料庫一樣,Iceberg 也會有預定義的分區,那些記錄會寫入到各個不同的分區,形成一些新的檔案;
- Flink 有個 CheckPoint 機制,檔案到達以後,Flink 就會完成這一批檔案的寫入,然後生成這一批檔案的清單,接着交給 Commit Worker;
- Commit Worker 會讀出目前快照的資訊,然後與這一次生成的檔案清單進行合并,生成一個新的 Manifest List 以及後續中繼資料的表檔案的資訊,之後進行送出,成功以後就形成一個新的快照。
6. Iceberg 查詢流程
上方為 Iceberg 資料查詢流程。
- 首先是 Flink Table scan worker 做一個 scan,scan 的時候可以像樹一樣,從根開始,找到目前的快照或者使用者指定的一個曆史快照,然後從快照中拿出目前快照的 Manifest List 檔案,根據當時儲存的一些資訊,就可以過濾出滿足這次查詢條件的 Manifest File;
- 再往下經過 Manifest File 裡記錄的資訊,過濾出底下需要的 Data Files。這個檔案拿出來以後,再交給 Recorder reader workers,它從檔案中讀出滿足條件的 Recode,然後傳回給上層調用。
這裡可以看到一個特點,就是在整個資料的查詢過程中沒有用到任何 List,這是因為 Iceberg 完整地把它記錄好了,整個檔案的樹形結構不需要 List,都是直接單路徑指向的,是以查詢性能上沒有耗時 List 操作,這點對于對象存儲比較友好,因為對象存儲在 List 上面是一個比較耗資源的操作。
7. Iceberg Catalog 功能一覽
Iceberg 提供 Catalog 用良好的抽象來對接資料存儲和中繼資料管理。任何一個存儲,隻要實作 Iceberg 的 Catalog 抽象,就有機會跟 Iceberg 對接,用來組織接入上面的資料湖方案。
如上圖所示,Catalog 主要提供幾方面的抽象。
- 它可以對 Iceberg 定義一系列角色檔案;
- 它的 File IO 都是可以定制,包括讀寫和删除;
- 它的命名空間和表的操作 (也可稱為中繼資料操作),也可以定制;
- 包括表的讀取 / 掃描,表的送出,都可以用 Catalog 來定制。
這樣可以提供靈活的操作空間,友善對接各種底下的存儲。
二、對象存儲支撐 Iceberg 資料湖
1. 目前 Iceberg Catalog 實作
目前社群裡面已經有的 Iceberg Catalog 實作可分為兩個部分,一是資料 IO 部分,二是中繼資料管理部分。
如上圖所示,其實缺少面向私有對象存儲的 Catalog 實作,S3A 理論上可以接對象存儲,但它用的是檔案系統語義,不是天然的對象存儲語義,模拟這些檔案操作會有額外的開銷,而我們想實作的是把資料和中繼資料管理全部都交給一個對象存儲,而不是分離的設計。
2. 對象存儲和 HDFS 的比較
這裡存在一個問題,在有 HDFS 的情況下,為什麼還要用對象存儲?
如下所示,我們從各個角度将對象存儲和 HDFS 進行對比。
總結下來,我們認為:
- 對象存儲在叢集擴充性,小檔案友好,多站點部署和低存儲開銷上更加有優勢;
- HDFS 的好處就是提供追加上傳和原子性 rename,這兩個優勢正是 Iceberg 需要的。
下面對兩個存儲各自的優勢進行簡單闡述。
1)比較之:叢集擴充性
- HDFS 架構是用單個 Name Node 儲存所有中繼資料,這就決定了它單節點的能力有限,是以在中繼資料方面沒有橫向擴充能力。
- 對象存儲一般采用哈希方式,把中繼資料分隔成各個塊,把這個塊交給不同 Node 上面的服務來進行管理,天然地它中繼資料的上限會更高,甚至在極端情況下可以進行 rehash,把這個塊切得更細,交給更多的 Node 來管理中繼資料,達到擴充能力。
2)比較之:小檔案友好
如今在大資料應用中,小檔案越來越常見,并逐漸成為一個痛點。
-
HDFS 基于架構的限制,小檔案存儲受限于 Name Node 記憶體等資源,雖然 HDFS 提供了 Archive 的方法來合并小檔案,減少對 Name Node 的壓力,但這需要額外增加複雜度,不是原生的。
同樣,小檔案的 TPS 也是受限于 Name Node 的處理能力,因為它隻有單個 Name Node。對象存儲的中繼資料是分布式存儲和管理,流量可以很好地分布到各個 Node 上,這樣單節點就可以存儲海量的小檔案。
- 目前,很多對象存儲提供多媒體,分層加速,可以提升小檔案的性能。
3)比較之:多站點部署
- 對象存儲支援多站點部署
- 全局命名空間
- 支援豐富的規則配置
- 對象存儲的多站點部署能力适用于兩地三中心多活的架構,而 HDFS 沒有原生的多站點部署能力。雖然目前看到一些商業版本給 HDFS 增加了多站點負責資料的能力,但由于它的兩個系統可能是獨立的,是以并不能支撐真正的全局命名空間下多活的能力。
4)比較之:低存儲開銷
- 對于存儲系統來說,為了适應随機的硬體故障,它一般會有副本機制來保護資料。
- 常見的如三副本,把資料存三份,然後分開儲存到三個 Node 上面,存儲開銷是三倍,但是它可以同時容忍兩個副本遇到故障,保證資料不會丢失。
- 另一種是 Erasure Coding,通常稱為 EC。以 10+2 舉例,它把資料切成 10 個資料塊,然後用算法算出兩個代碼塊,一共 12 個塊。接着分布到四個節點上,存儲開銷是 1.2 倍。它同樣可以容忍同時出現兩個塊故障,這種情況可以用剩餘的 10 個塊算出所有的資料,這樣減少存儲開銷,同時達到故障容忍程度。
-
HDFS 預設使用三副本機制,新的 HDFS 版本上已經支援 EC 的能力。經過研究,它是基于檔案做 EC,是以它對小檔案有天然的劣勢。因為如果小檔案的大小小于分塊要求的大小時,它的開銷就會比原定的開銷更大,因為兩個代碼塊這邊是不能省的。在極端情況下,如果它的大小等同于單個代碼塊的大小,它就已經等同于三副本了。
同時,HDFS 一旦 EC,就不能再支援 append、hflush、hsync 等操作,這會極大地影響 EC 能夠使用的場景。對象存儲原生支援 EC,對于小檔案的話,它内部會把小檔案合并成一個大的塊來做 EC,這樣確定資料開銷方面始終是恒定的,基于預先配置的政策。
3. 對象存儲的挑戰:資料的追加上傳
在 S3 協定中,對象在上傳時需要提供大小。
以 S3 标準為例,對象存儲跟 Iceberg 對接時,S3 标準對象存儲不支援資料追加上傳的接口,協定要求上傳檔案時提供檔案大小。是以在這種情況下,對于這種流式的 File IO 傳入,其實不太友好。
1)解決方案一:S3 Catalog 資料追加上傳 - 小檔案快取區域/記憶體
對于一些小檔案,流式傳入的時候就寫入到本地緩存 / 記憶體,等它完全寫完後,再把它上傳到對象存儲裡。
2)解決方法二:S3 Catalog 資料追加上傳 - MPU 分段上傳大檔案
對于大檔案,會用到 S3 标準定義的 MPU 分段上傳。
它一般分為幾個步驟:
- 第一步先建立初始化的 MPU,拿到一個 Upload ID,然後給每一個分段賦予一個 Upload ID 以及一個編号,這些分塊就可以并行上傳;
- 在上傳完成以後,還需要一步 Complete 操作,這樣相當于通知系統,它會把基于同一個 Upload ID 以及所有的編号,從小到大排起來,組成一個大檔案;
- 把機制運用到資料追加上傳場景,正常實作就是寫入一個檔案,把檔案緩存到本地,當達到分塊要求大小時,就可以把它進行初始化 MPU,把它的一個分塊開始上傳。後面每一個分塊也是一樣的操作,直到最後一個分塊上傳完,最後再調用一個完成操作來完成上傳。
MPU 有優點也有缺點:
- 缺點是 MPU 的分片數量有上限,S3 标準裡可能隻有 1 萬個分片。想支援大檔案的話,這個分塊就不能太小,是以對于小于分塊的檔案,依然是要利用前面一種方法進行緩存上傳;
- MPU 的優點在于并行上傳的能力。假設做一個異步的上傳,檔案在緩存達到以後,不用等上一個分塊上傳成功,就可以繼續緩存下一個,之後開始上傳。目前面注入的速度足夠快時,後端的異步送出就變成了并行操作。利用這個機制,它可以提供比單條流上傳速度更快的上傳能力。
4. 對象存儲的挑戰:原子送出
下一個問題是對象存儲的原子送出問題。
前面提到在資料注入的過程中,最後的送出其實分為幾步,是一個線性事務。首先它要讀到目前的快照版本,然後把這一次的檔案清單合并,接着送出自己新的版本。這個操作類似于我們程式設計裡常見的 “i=i+1”,它不是一個原子操作,對象存儲的标準裡也沒有提供這個能力。
上圖是并發送出元資訊的場景。
- 這裡 Commit Worker 1 拿到了 v006 版本,然後合并自己的檔案,送出 v007 成功。
- 此時還有另一個 Commit Worker 2,它也拿到了 v006,然後合并出來,且也要提供 v007。此時我們需要一個機制告訴它 v007 已經沖突,不能上傳,然後讓它自己去 Retry。Retry 以後取出新的 v007 合并,然後送出給 v008。
這是一個典型的沖突場景,這裡需要一套機制,因為如果它不能檢測到自己是一個沖突的情況的話,再送出 v007 會把上面 v007 覆寫,會導緻上一次送出的所有資料都丢失。
如上圖所示,我們可以使用一個分布式鎖的機制來解決上述問題。
- 首先,Commit Worker 1 拿到 v006,然後合并檔案,在送出之前先要擷取這一把鎖,拿到鎖以後判斷目前快照版本。如果是 v006,則 v007 能送出成功,送出成功以後再解鎖。
- 同樣,Commit Worker 2 拿到 v006 合并以後,它一開始拿不到鎖,要等 Commit Worker 1 釋放掉這個鎖以後才能拿到。等拿到鎖再去檢查的時候,會發現目前版本已經是 v007,與自己的 v007 有沖突,是以這個操作一定會失敗,然後它就會進行 Retry。
這是通過鎖來解決并發送出的問題。
5. Dell EMC ECS 的資料追加上傳
基于 S3 标準的對象存儲和 Iceberg 問題的解決方案存在一些問題,例如性能損失,或者需要額外部署鎖服務等。
Dell EMC ECS 也是個對象存儲,基于這個問題有不一樣的解答,它基于 S3 的标準協定有一些擴充,可以支援資料的追加上傳。
它的追加上傳與 MPU 不同的地方在于,它沒有分塊大小的限制。分塊可以設定得比較小一點,上傳後内部就會串聯起來,依然是一個有效的檔案。
追加上傳和 MPU 這兩者可以在一定程度上适應不同的場景。
MPU 有加速上傳能力,追加上傳在速度在不是很快的情況下,性能也是足夠用,而且它沒有 MPU 的初始化和合并的操作,是以兩者在性能上能夠适應不同場景進行使用。
6. Dell EMC ECS 在并發送出下的解決方案
ECS 對象存儲還提供了一個 If-Match 的語義,在微軟的雲存儲以及谷歌的雲存儲上都有這樣一個接口能力。
- If-Match 就是說在 Commit Worker 1 送出拿到 v006 的時候,同時拿到了檔案的 eTag。送出的時候會帶上 eTag,系統需要判斷要覆寫檔案的 eTag 跟目前這個檔案真實 eTag 是否相同,如果相同就允許這次覆寫操作,那麼 v007 就能送出成功;
- 另一種情況,是 Commit Worker 2 也拿到了 v006 的 eTag,然後上傳的時候發現拿到 eTag 跟目前系統裡檔案不同,則會傳回失敗,然後觸發 Retry。
這個實作是和鎖機制一樣的效果,不需要外部再重新部署鎖服務來保證原子送出的問題。
7. S3 Catalog - 統一存儲的資料
回顧一下,上方我們解決了檔案 IO 中上傳資料 IO 的問題,和解決了中繼資料表格的原子送出問題。
解決這些問題以後,就可以把資料以及中繼資料的管理全部都交到對象存儲,不再需要額外部署中繼資料服務,做到真正統一資料存儲的概念。
三、示範方案
如上所示,示範方案用到了 Pravega,可以簡單了解為 Kafka 的一個替代,但是對它進行了性能優化。
在這個例子中,我們會把資料注入 Pravega 的流裡,然後 Flink 會從 Pravega 中讀出資料進行解析,然後存入 Iceberg 組織。Iceberg 利用 ECS Catalog,直接對接對象存儲,這裡面沒有任何其他部署,最後用 Flink 讀出這個資料。
四、存儲優化的一些思考
上圖為目前 Iceberg 支援的資料組織結構,可以看到它直接 Parquet 檔案存在存儲裡面。
我們的想法是如果這個湖跟中繼資料的湖其實是一個湖,有沒有可能生成的 Parquet 檔案跟源檔案存在很大的資料備援度,是否可以減少備援資訊的存儲。
比如最極端的情況,源檔案的一個資訊記錄在 Iceberg 中,就不存這個 Parquet 資料檔案。當要查詢的時候,通過定制 File IO,讓它根據原檔案在記憶體中實時生成一個類似于 Parquet 的格式,送出給上層應用查詢,就可以達到一樣的效果。
但是這種方式,局限于對存儲的成本有很高的要求,但是對查詢的性能要求卻不高的情況。能夠實作這個也要基于 Iceberg 好的抽象,因為它的檔案中繼資料和 File IO 都是抽象出來的,可以把源檔案拆進去,讓它以為這是一個 Parquet 檔案。
進一步思考,能否優化查詢性能,同時節省存儲空間。
比如預計算一下,把源檔案某些常用的列拿出來,然後統計資訊到 Iceberg 中,在讀的時候利用源檔案和雲計算的檔案,可以很快查詢到資訊,同時又節省了不常用的資料列存儲空間。
這是比較初步的想法,如果能夠實作,則用 Iceberg 不僅可以索引結構化的 Parquet 檔案格式,甚至可以索引一些半結構化、結構化的資料,通過臨時的計算來解決上層的查詢任務,變成一個更完整的 Data Catalog。
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99元試用
實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包3個月及以上還有85折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc