天天看點

Dive into Delta Lake | Delta Lake 嘗鮮

Dive into Delta Lake | Delta Lake 嘗鮮
Dive into Delta Lake | Delta Lake 嘗鮮
大資料技術與架構
Dive into Delta Lake | Delta Lake 嘗鮮
Dive into Delta Lake | Delta Lake 嘗鮮
大資料真好玩
Dive into Delta Lake | Delta Lake 嘗鮮
Delta Lake 是一個存儲層,為 Apache Spark 和大資料 workloads 提供 ACID 事務能力,其通過寫和快照隔離之間的樂觀并發控制(optimistic concurrency control),在寫入資料期間提供一緻性的讀取,進而為建構在 HDFS 和雲存儲上的資料湖(data lakes)帶來可靠性。Delta Lake 還提供内置資料版本控制,以便輕松復原。

為什麼需要Delta Lake

現在很多公司内部資料架構中都存在資料湖,資料湖是一種大型資料存儲庫和處理引擎。它能夠存儲大量各種類型的資料,擁有強大的資訊處理能力和處理幾乎無限的并發任務或工作的能力,最早由 Pentaho 首席技術官詹姆斯迪克森在2011年的時候提出。雖然資料湖在資料範圍方面邁出了一大步,但是也面臨了很多問題,主要概括如下:

  • 資料湖的讀寫是不可靠的。資料工程師經常遇到不安全寫入資料湖的問題,導緻讀者在寫入期間看到垃圾資料。他們必須建構方法以確定讀者在寫入期間始終看到一緻的資料。
  • 資料湖中的資料品質很低。将非結構化資料轉儲到資料湖中是非常容易的。但這是以資料品質為代價的。沒有任何驗證模式和資料的機制,導緻資料湖的資料品質很差。是以,努力挖掘這些資料的分析項目也會失敗。
  • 随着資料的增加,處理性能很差。随着資料湖中存儲的資料量增加,檔案和目錄的數量也會增加。處理資料的作業和查詢引擎在進行中繼資料操作上花費大量時間。在有流作業的情況下,這個問題更加明顯。
  • 資料湖中資料的更新非常困難。工程師需要建構複雜的管道來讀取整個分區或表,修改資料并将其寫回。這種模式效率低,并且難以維護。

由于存在這些挑戰,許多大資料項目無法實作其願景,有時甚至完全失敗。我們需要一種解決方案,使資料從業者能夠利用他們現有的資料湖,同時確定資料品質。這就是 Delta Lake 産生的背景。

Delta Lake特性

Delta Lake 很好地解決了上述問題,以簡化我們建構資料湖的方式。

Dive into Delta Lake | Delta Lake 嘗鮮

支援ACID事務

Delta Lake 在多并發寫入之間提供 ACID 事務保證。每次寫入都是一個事務,并且在事務日志中記錄了寫入的序列順序。

事務日志跟蹤檔案級别的寫入并使用樂觀并發控制,這非常适合資料湖,因為多次寫入/修改相同的檔案很少發生。在存在沖突的情況下,Delta Lake 會抛出并發修改異常以便使用者能夠處理它們并重試其作業。

Delta Lake 還提供強大的可序列化隔離級别,允許工程師持續寫入目錄或表,并允許消費者繼續從同一目錄或表中讀取。讀者将看到閱讀開始時存在的最新快照。

Schema管理

Delta Lake 自動驗證正在被寫的 DataFrame 模式是否與表的模式相容。

表中存在但 DataFrame 中不存在的列會被設定為 null 如果 DataFrame 中有額外的列在表中不存在,那麼該操作将抛出異常 Delta Lake 具有可以顯式添加新列的 DDL 和自動更新Schema 的能力 可伸縮的中繼資料處理

Delta Lake 将表或目錄的中繼資料資訊存儲在事務日志中,而不是存儲在元存儲(metastore)中。這使得 Delta Lake 能夠在固定的時間内列出大型目錄中的檔案,并且在讀取資料時非常高效。

資料版本

Delta Lake 允許使用者讀取表或目錄之前的快照。當檔案被修改檔案時,Delta Lake 會建立較新版本的檔案并保留舊版本的檔案。當使用者想要讀取舊版本的表或目錄時,他們可以在 Apache Spark 的讀取 API 中提供時間戳或版本号,Delta Lake 根據事務日志中的資訊建構該時間戳或版本的完整快照。這允許使用者重制之前的資料,并在需要時将表還原為舊版本的資料。

統一的批處理和流 sink

除了批處理寫之外,Delta Lake 還可以使用作為 Apache Spark structured streaming 高效的流 sink。再結合 ACID 事務和可伸縮的中繼資料處理,高效的流 sink 現在支援許多接近實時的分析用例,而且無需維護複雜的流和批處理管道。

資料存儲格式采用開源 Apache Parquet

Delta Lake 中的所有資料都是使用 Apache Parquet 格式存儲,使 Delta Lake 能夠利用 Parquet 原生的高效壓縮和編碼方案。

更新和删除

Delta Lake 支援 merge, update 和 delete 等 DML 指令。這使得資料工程師可以輕松地在資料湖中插入/更新和删除記錄。由于 Delta Lake 以檔案級粒度跟蹤和修改資料,是以它比讀取和覆寫整個分區或表更有效。

資料異常處理

Delta Lake 還将支援新的 API 來設定表或目錄的資料異常。工程師能夠設定一個布爾條件并調整報警門檻值以處理資料異常。當 Apache Spark 作業寫入表或目錄時,Delta Lake 将自動驗證記錄,當資料存在異常時,它将根據提供的設定來處理記錄。

相容 Apache Spark API

開發人員可以将 Delta Lake 與他們現有的資料管道一起使用,僅需要做一些細微的修改。

基本使用

Create a table

val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

// 分區表
df.write.format("delta").partitionBy("date").save("/delta/events")           

Read table

val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()           

Update table

// overwrite
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")           
// update
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath("/tmp/delta-table")

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))

// Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

// Upsert (merge) new data
val newData = spark.range(0, 20).as("newData").toDF

deltaTable.as("oldData")
  .merge(
    newData,
    "oldData.id = newData.id")
  .whenMatched
  .update(Map("id" -> col("newData.id")))
  .whenNotMatched
  .insert(Map("id" -> col("newData.id")))
  .execute()

deltaTable.toDF.show()           
// update by expressions
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)

// predicate and update expressions using SQL formatted string
deltaTable.updateExpr(            
  "eventType = 'clck'",
  Map("eventType" -> "'click'")           
// merge
deltaTable
  .as("logs")
  .merge(
    updates.as("updates"),
    "logs.uniqueId = updates.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()           

Delete table

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)

deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete($"date" < "2017-01-01")       // predicate using Spark SQL functions and implicits           

流支援

查詢表的舊快照

Delta Lake 時間旅行允許您查詢 Delta Lake 表的舊快照。時間旅行有很多用例,包括:

  • 重新建立分析,報告或輸出(例如,機器學習模型的輸出)。這對于調試或審計非常有用,尤其是在受監管的行業中
  • 編寫複雜的臨時查詢
  • 修複資料中的錯誤
  • 為快速更改的表的一組查詢提供快照隔離 DataFrameReader options 允許從 Delta Lake 表建立一個DataFrame 關聯到表的特定版本,可以使用如下兩種方式:
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")           

對于timestamp_string,僅接受日期或時間戳字元串。例如,2019-01-01 和 2019-01-01 00:00:00.000Z

增加列

當以下任意情況為 true 時,DataFrame 中存在但表中缺少的列将自動添加為寫入事務的一部分:

  • write 或 writeStream 具有 .option("mergeSchema", "true") 添加的列将附加到它們所在的結構的末尾。附加新列時将保留大小寫。

NullType 列

寫入 Delta 時,會從 DataFrame 中删除 NullType 列(因為 Parquet 不支援 NullType)。當收到該列的不同資料類型時,Delta Lake 會将 schema 合并到新資料類型

預設情況下,覆寫表中的資料不會覆寫 schema。使用模式 overwrite 覆寫表而不使用 replaceWhere 時,可能仍希望覆寫正在寫入的資料的 schema。可以通過設定以下内容來選擇替換表的 schema :

df.write.option("overwriteSchema", "true")           

視圖

Transactional meta 實作

在檔案上增加一個日志結構化存儲(transaction log ),該日志有序(ordered)且保持原子性(atomic)。

增加或者删除資料時,都會産生一個描述檔案,采用樂觀并發控制 (optimistic concurrency control) 保證使用者并發操作時資料的一緻性。

Dive into Delta Lake | Delta Lake 嘗鮮

每次表更都生産一個描述檔案,描述檔案的記錄數和曆史版本數量一緻。如圖,delta-table表13個曆史版本就有13個描述檔案。

Dive into Delta Lake | Delta Lake 嘗鮮
Dive into Delta Lake | Delta Lake 嘗鮮
Dive into Delta Lake | Delta Lake 嘗鮮

并發控制

Delta Lake 在讀寫中提供了 ACID 事務保證。這意味着:

  • 跨多叢集的并發寫入,也可以同時修改資料集并檢視表的一緻性快照,這些寫入操作将按照串行執行
  • 在作業執行期間修改了資料,讀取時也能看到一緻性快照。

樂觀并發控制

Delta Lake 使用 optimistic concurrency control 機制提供寫資料時的事務保證,在這種機制下,寫過程包含三個步驟:

  • Write: 通過編寫新資料檔案來進行所有更改
  • Validate and commit: 調用 commit 方法,生成 commit 資訊,生成一個新的遞增1的檔案,如果相同的檔案名已經存在,則報 ConcurrentModificationException。

名詞解釋

ACID ACID 就是指資料庫事務的四個基本要素,對應的是原子性 Atomicity,一緻性 Consistency,隔離性 Isolation 和持久性 Durability。

  • 原子性: 一個事務要麼全部成功,要不全部失敗,事務出現錯誤會被復原到事務開始時候的狀态。
  • 一緻性: 系統始終處于一緻的狀态,所有操作都應該服務現實中的期望。
  • 隔離性: 并發事務不會互相幹擾,事務之間互相隔離。
  • 持久性: 事務結束後就一直儲存在資料庫中,不會被復原。

Snapshot

Snapshot 相當于目前資料的快照。這個快照包括的内容不僅僅隻有一個版本号,還會包括目前快照下的資料檔案,上一個 Snapshot 的操作,以及時間戳和 DeltaLog 的記錄。

MetaData

這裡是指 Delta Table 的中繼資料,包括 id,name,format,建立時間,schema 資訊等等。

事務日志

事務日志的相關代碼主要在 org.apache.spark.sql.delta.DeltaLog 中。這個是 Delta Lake 把對資料/表的操作的記錄日志。

CheckSum

可以說 CheckSum 是一個對象,裡面包含了,目前 SNAPSHOT 下的表的實體大小,檔案數,MetaData 的數量,協定以及事務的數量。這些資訊會轉成 Json 格式,存放在 CheckSumFile 中。

校驗檔案是在 Snapshot 的基礎上計算的,會和各自的事務生死存亡。

Dive into Delta Lake | Delta Lake 嘗鮮
Dive into Delta Lake | Delta Lake 嘗鮮

版權聲明:

本文為大資料技術與架構整理,原作者獨家授權。未經原作者允許轉載追究侵權責任。

編輯|冷眼丶

微信公衆号|import_bigdata

繼續閱讀