天天看點

看CarbonData如何用四招助力Apache Spark

摘要:CarbonData 在 Apache Spark 和存儲系統之間起到中介服務的作用,為 Spark 提供的4個重要功能。

本文分享自華為雲社群《Make Apache Spark better with CarbonData》,原文作者:大資料修行者 。

Spark 無疑是一個強大的處理引擎和一個用于更快處理的分布式叢集計算架構。不幸的是,Spark在一些方面也存在不足。如果我們将 Apache Spark 與 Apache CarbonData 結合使用,它可以克服這些不足:

1. 不支援 ACID transaction

2. 沒有quality enforcement

3. 小檔案問題

4. 低效的data skipping

什麼是ACID?

看CarbonData如何用四招助力Apache Spark

Spark和ACID

ATOMICITY

ACID 中的 A 代表原子性。基本上,這意味着要麼全部成功要麼全部失敗。是以,當您使用 spark data frame writer API時,它應該寫入完整資料或不寫入任何資料。讓我們快速檢視 Spark 文檔。根據 Spark 文檔:“It is important to realize that these save mode (overwrite) do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data.”

看CarbonData如何用四招助力Apache Spark

雖然整個情況看起來有點可怕,但實際上并沒有那麼糟糕。 Spark dataframe API在内部執行作業級送出,這有助于實作一定程度的原子性,這與使用 Hadoop 的 FileOutputCommitter 的“append”模式一起工作。但是,預設實作會帶來性能開銷,尤其是在使用雲存儲 [S3/OBS] 而不是 HDFS 時。

我們現在可以運作以下代碼來證明 Spark overwrite 不是原子的,它可能導緻資料損壞或資料丢失。代碼的第一部分模仿作業 1,它建立 100 條記錄并将其儲存到 ACIDpath 目錄中。代碼的第二部分模仿作業 2,它試圖覆寫現有資料,但在操作過程中引發異常。這兩項工作的結果是資料丢失。最後,我們丢失了第一個作業建立的資料。

看CarbonData如何用四招助力Apache Spark

由于異常,作業級送出不會發生,是以不會儲存新檔案。由于 Spark 删除了舊檔案,我們丢失了現有資料。 Spark data frame writer API 不是原子的,但它的行為類似于追加操作的原子操作。

CONSISTENCY

分布式系統通常建立在可用性較低的機器之上。一緻性是高可用性系統中的一個關鍵問題。如果所有節點同時看到并傳回相同的資料,則系統是一緻的。有幾種一緻性模型,分布式系統中最常用的一種是強一緻性、弱一緻性和最終一緻性。我們了解到,Spark writer API 的覆寫模式會先删除舊檔案,然後再放置新檔案。是以,在這兩種狀态之間,會有一段時間沒有資料可用。如果我們的工作失敗,那麼我們将丢失資料。這意味着這兩個操作之間沒有平滑的事務。這是 Spark 覆寫操作的典型原子性問題。而這個問題也破壞了資料的一緻性。 Spark API 缺乏一緻性。是以,Spark 寫模式不支援一緻性。

Isolation and Durability in Spark

隔離意味着分離。與任何其他并發操作分離。假設我們正在寫入尚未送出的資料集,并且有另一個并發程序正在讀取/寫入同一資料集。根據隔離特性,在這種情況下,不應影響他人。典型的資料庫會提供不同的隔離級别,例如已送出讀和可序列化。雖然 Spark 有任務級送出和作業級送出,但由于寫操作缺乏原子性,Spark 無法提供适當的隔離。

最後,Durability 是系統儲存的已送出狀态/資料,這樣即使在出現故障和系統重新開機的情況下,資料也能以正确的狀态使用。持久性由存儲層提供,在 Spark 應用程式的情況下,它是 HDFS 和 S3/OBS 的作用。然而,當 Spark 由于缺乏原子性而沒有提供适當的送出時,如果沒有适當的送出,我們就不能指望持久性。

如果我們仔細觀察,所有這些 ACID 屬性都是互相關聯的。由于缺乏原子性,我們失去了一緻性和隔離性,由于缺乏隔離性,我們失去了持久性。

Lack of Schema Enforcement

我們知道 Spark 在讀取時意味着 Schema。是以,當我們寫入任何資料時,如果有任何模式不比對,它不會抛出異常。讓我們試着用一個例子來了解這一點。讓我們有一個包含以下記錄的輸入數組。下面的程式将讀取 csv 并轉換為 DF

看CarbonData如何用四招助力Apache Spark
看CarbonData如何用四招助力Apache Spark

該程式從 CSV 檔案中讀取,以鑲木地闆格式寫回并顯示資料。輸出如下

看CarbonData如何用四招助力Apache Spark

讓我們讀取另一個輸入 CSV 檔案,其中“Cost”列具有十進制值而不是整數(如下所示),并對上述檔案執行追加操作

看CarbonData如何用四招助力Apache Spark

在這種情況下,我們的程式将讀取 CSV,毫無例外地寫入 Parquet 格式。當我們想要顯示/顯示資料幀時,我們的程式将抛出錯誤

看CarbonData如何用四招助力Apache Spark

這是因為 Spark 在寫操作期間從不驗證模式。 “Cost”列的模式在第一次加載期間被推斷為整數,在第二次寫入期間,它會毫無問題地附加雙精度型資料。當我們讀取附加資料并調用操作時,由于模式不相容,它會引發錯誤。

How to overcome the above drawbacks of Spark

如果我們使用 Apache Spark 将 CarbonData 作為存儲解決方案的附加層插入,則可以管理上述問題。

看CarbonData如何用四招助力Apache Spark

What is CarbonData

由于 Hadoop 分布式檔案系統 (HDFS) 和對象存儲類似于檔案系統,是以它們不是為提供事務支援而設計的。在分布式處理環境中實作事務是一個具有挑戰性的問題。例如,實施通常必須考慮鎖定對存儲系統的通路,這是以整體吞吐量性能為代價的。 Apache CarbonData 等存儲解決方案通過将這些事務語義和規則推送到檔案格式本身或中繼資料和檔案格式組合中,有效地解決了資料湖的這些 ACID 要求。 CarbonData 在 Apache Spark 和存儲系統之間起到中介服務的作用。現在,遵守 ACID 的責任由 CarbonData 負責。底層存儲系統可以是 HDFS、華為 OBS、Amazon S3 或 Azure Blob Storage 之類的任何東西。 CarbonData 為 Spark 提供的幾個重要功能是:

1. ACID transactions.

2. Schema enforcement/Schema validation.

3. Enables Updates, Deletes and Merge.

4. Automatic data indexing.

CarbonData in Apache Spark: ACID

看CarbonData如何用四招助力Apache Spark

在上面的代碼片段中,代碼的第一部分模仿了 job-1,建立了 100 條記錄并将其儲存到 ACIDpath 目錄中。代碼的第二部分模仿 job-2,它試圖覆寫現有資料但在操作過程中抛出異常。

這兩項工作的結果是資料丢失。最後,我們丢失了第一個作業建立的資料。現在讓我們更改如下所示的代碼以使用 CarbonData。

看CarbonData如何用四招助力Apache Spark

執行第一個作業并計算行數。正如預期的那樣,您将獲得 100 行。

如果您檢查資料目錄,您将看到一個snappy compressed CarbonData 檔案。該資料檔案以列式編碼格式儲存 100 行。您還将看到一個包含 tablestatus 檔案的中繼資料目錄。現在執行第二個作業。你對第二份工作有什麼期望?如前所述,這項工作應該嘗試做以下事情。

1. 删除之前的檔案。

2. 建立一個新檔案并開始寫入記錄。

3. 在作業中間抛出運作時異常。

由于異常,作業級别送出不會發生,我們丢失了上述觀察到的現有資料在沒有使用 CarbonData 的情況下。

但是現在如果你執行第二個作業,你仍然會得到一個異常。然後,計算行數。您得到的輸出為 100,并且不會丢失舊記錄。看起來 CarbonData 已經使 Overwrite 原子化了。我們看一下資料目錄,你會發現兩個 CarbonData 檔案。

看CarbonData如何用四招助力Apache Spark

一個檔案由第一個作業建立,另一個檔案由作業 2 建立。作業 2 沒有删除舊檔案,而是直接建立了一個新檔案并開始向新檔案寫入資料。這種方法使舊資料狀态保持不變。這就是為什麼我們沒有丢失舊資料的原因,因為舊檔案保持不變。新的不完整檔案也在那裡,但不讀取新的不完整檔案中的資料。該邏輯隐藏在中繼資料目錄中,并使用 tablestatus 檔案進行管理。第二個作業無法在 tablestatus 檔案中建立成功的條目,因為它在中間失敗了。讀取 API 不會讀取 tablestatus 檔案中的條目被标記為删除的檔案。

看CarbonData如何用四招助力Apache Spark

這一次,讓我們無一例外地編寫代碼邏輯,用50條記錄覆寫舊的100條記錄。

看CarbonData如何用四招助力Apache Spark

Now the record count shows 50. As expected. So, you have overwritten the older data set of 100 rows with a new data set of 50 rows.

看CarbonData如何用四招助力Apache Spark

CarbonData 将中繼資料管理引入 Apache Spark 并使 Spark 資料編寫器 API 具有原子性,進而解決了資料一緻性問題。一旦一緻性問題得到解決,CarbonData 将能夠提供更新和删除功能。

Spark With CarbonData: Schema Enforcement

讓我們考慮一個簡單的使用者場景,其中資料分多批到達以進行轉換。這裡為了簡單起見,讓我們假設隻有 2 批資料,第二批資料攜帶一些與第一批資料不同類型的列資料。

看CarbonData如何用四招助力Apache Spark

為了開始實驗,讓我們從表 1 中讀取資料,并使用和不使用 CarbonData 寫入資料。我們能夠使用“Overwrite”模式在有和沒有 CarbonData 的情況下寫入資料。

看CarbonData如何用四招助力Apache Spark

現在讓我們讀取具有成本列類型的雙類型資料的第二個表,然後将資料幀寫入 Parquet 和 CarbonTables(注意:_c2 是整數類型,我們正在嘗試附加雙類型資料)。使用 parquet 附加模式不比對的資料沒有問題,但是當程式嘗試将相同的資料附加到 CarbonData 表時,它會抛出錯誤:

看CarbonData如何用四招助力Apache Spark

是以,基于上述實驗,我們可以看到 CarbonData 在寫入底層存儲之前驗證模式,這意味着 CarbonData 在寫入時使用模式驗證。如果類型不相容,則 CarbonData 将取消交易。這将有助于在開始時跟蹤問題,而不是與好的資料混淆,然後嘗試找出根本原因。

英文連結:https://brijoobopanna.medium.com/making-apache-spark-better-with-carbondata-d37f98d235de

作者: Brijoobopanna

點選關注,第一時間了解華為雲新鮮技術~

繼續閱讀