編譯:辰山,阿裡巴巴計算平台事業部 EMR 進階開發工程師,目前從事大資料存儲方面的開發和優化工作
在實踐經驗中,我們知道資料總是在不斷演變和增長,我們對于這個世界的心智模型必須要适應新的資料,甚至要應對我們從前未知的知識次元。表的 schema 其實和這種心智模型并沒什麼不同,需要定義如何對新的資訊進行分類和處理。
這就涉及到 schema 管理的問題,随着業務問題和需求的不斷演進,資料結構也會不斷發生變化。通過 Delta Lake,能夠很容易包含資料變化所帶來的新的次元,使用者能夠通過簡單的語義來控制表的 schema。相關工具主要包括 Schema 限制(Schema Enforcement)和 Schema 演變(Schema Evolution),前者用以防止使用者髒資料意外污染表,後者用以自動添加适當的新資料列。本文将詳細剖析這兩個工具。
了解表的 Schemas
Apache Spark 的每一個 DataFrame 都包含一個 schema,用來定義資料的形态,例如資料類型、列資訊以及中繼資料。在 Delta Lake 中,表的 schema 通過 JSON 格式存儲在事務日志中。
什麼是 Schema 限制?
Schema 限制(Schema Enforcement),也可稱作 Schema Validation,是 Delta Lake 中的一種保護機制,通過拒絕不符合表 schema 的寫入請求來保證資料品質。類似于一個繁忙的餐廳前台隻接受預定坐席的顧客,這個機制會檢查插入表格的每一列是否符合期望的列(換句話說,就是檢查每個列是否已經“預定坐席”),那些不在期望名單上的寫入将被拒絕。
Schema 限制如何工作?
Delta Lake 對寫入進行 schema 校驗,也就是說所有表格的寫入操作都會用表的 schema 做相容性檢查。如果 schema 不相容,Delta Lake 将會撤銷這次事務(沒有任何資料寫入),并且傳回相應的異常資訊告知使用者。
Delta Lake 通過以下準則判斷一次寫入是否相容,即對寫入的 DataFrame 必須滿足:
• 不能包含目标表 schema 中不存在的列。相反,如果寫入的資料沒有包含所有的列是被允許的,這些空缺的列将會被指派為 null。
• 不能包含與目标表類型不同的列。如果目标表包含 String 類型的資料,但 DataFrame 中對應列的資料類型為 Integer,Schema 限制将會傳回異常,防止該次寫入生效。
• 不能包含隻通過大小寫區分的列名。這意味着不能在一張表中同時定義諸如“Foo”和“foo”的列。不同于 Spark 可以支援大小寫敏感和不敏感(預設為大小寫不敏感)兩種不同的模式,Delta Lake 保留大小寫,但在 schema 存儲上大小寫不敏感。Parquet 在存儲和傳回列資訊上面是大小寫敏感的,是以為了防止潛在的錯誤、資料污染和丢失的問題,Delta Lake 引入了這個限制。
以下代碼展示了一次寫入過程,當添加一次新計算的列到 Delta Lake 表中。
# Generate a DataFrame of loans that we'll append to our Delta Lake table
loans = sql("""
SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
CAST(rand(10) * 10000 * count AS double) AS amount
FROM loan_by_state_delta
""")
# Show original DataFrame's schema
original_loans.printSchema()
"""
root
|-- addr_state: string (nullable = true)
|-- count: integer (nullable = true)
"""
# Show new DataFrame's schema
loans.printSchema()
"""
root
|-- addr_state: string (nullable = true)
|-- count: integer (nullable = true)
|-- amount: double (nullable = true) # new column
"""
# Attempt to append new DataFrame (with new column) to existing table
loans.write.format("delta") \
.mode("append") \
.save(DELTALAKE_PATH)
""" Returns:
A schema mismatch detected when writing to the Delta table.
To enable schema migration, please set:
'.option("mergeSchema", "true")\'
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.
不同于自動添加新的列,Delta Lake 受到 schema 限制并阻止了這次寫入生效。并且為了幫助定位是哪個列造成了不比對,Spark 會在錯誤棧中列印出兩者的 schema 作為對照。
Schema 限制有何作用?
由于 Schema 限制是一種嚴格的校驗,是以可以用于已清洗、轉化完成的資料,保證資料不受污染,可用于生産或者消費。典型的應用場景包括直接用于以下用途的表:
• 機器學習算法
• BI 儀表盤
• 資料分析和可視化工具
• 任何要求高度結構化、強類型、語義 schema 的生産系統
為了準備好最終的資料,很多使用者使用簡單的“多跳”架構來逐漸往表中添加結構。更多相關内容可以參考
Productionizing Machine Learning With Delta Lake.
當然,Schema 限制可以用在整個工作流程的任意地方,不過需要注意的是,有可能因為諸如不經意對寫入資料添加了某個列,導緻寫入流失敗的情況。
防止資料稀釋
看到這,你可能會問,到底需不需要大費周章做 Schema 限制?畢竟,有時候一個意料之外的 schema 不比對問題反而會影響整個工作流,特别是當新手使用 Delta Lake。為什麼不直接讓 schema 接受改變,這樣我們就能任意寫入 DataFrame 了。
俗話說,防患于未然,有些時候,如果不對 schema 進行強制限制,資料類型相容性的問題将會很容易出現,看上去同質的資料源可能包含了邊緣情況、污染列、錯誤變換的映射以及其他可怕的情況都可能會一夜之間污染了原始的表。是以更好的做法應該從根本上阻止這樣的情況發生,通過 Schema 限制就能夠做到,将這類錯誤顯式地傳回進行恰當的處理,而不是讓它潛伏在資料中,看似寫入時非常順利,但埋下了無法預知的隐患。
Schema 限制能夠確定表 schema 不會發生改變,除非你确切地執行了更改操作。它能有效的防止“資料稀釋”——當新的列頻繁添加,原本簡潔的表結構可能因為資料泛濫而失去原有的含義和用處。Schema 限制的設計初衷就是通過設定嚴格的要求來保證品質,確定表資料不受污染。
另一方面,假如經過再三确認之後,确定的确需要添加新的列,那解決方法也非常簡單,也就是下文即将介紹的 Schema 演變!
什麼是 Schema 演變
Schema 演變(Schema Evolution)允許使用者能夠友善地修改表的目前 schema,來适應不斷變化的資料。最常見的用法就是在執行添加和覆寫操作時,自動地添加一個或多個列來适應 schema。
Schema 演變如何工作?
繼續沿用上文的例子,對于之前由于 schema 不比對導緻請求被拒絕的情況,開發人員可以友善地使用 Schema 演變來添加新的列。Schema 演變的使用方式是在 .write 或 .writeStream 的 Spark 指令後面添加上 .option('mergeSchema', 'true')。
# Add the mergeSchema option
loans.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save(DELTALAKE_SILVER_PATH)
可以執行以下 Spark SQL 語句來察看圖表。
# Create a plot with the new column to confirm the write was successful
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10
當然,也可以選擇通過添加 spark.databricks.delta.schema.autoMerge = True 到 Spark 配置檔案中使得該選項對整個 Spark session 生效。不過需要注意的是,這樣使用的話, Schema 限制将不再會對 schema 不比對問題進行報警提示。
通過指定 mergeSchema 選項,所有在輸入 DataFrame 中存在但在目标表中不存在的列都将被作為該事務操作的一部分添加到 schema 末尾。也允許添加嵌套字段,這些字段将被添加到對應列的末尾。
資料科學家可以利用這個選項來添加新的列(例如一個新增的跟蹤名額,或是這個月的銷售資料)到已有的機器學習表中,而不必廢棄現有依賴于舊的列資訊的模型。
以下對表的添加和覆寫操作都是合法的 Schema 演變的操作:
• 添加新列(這是最常用的場景)
• 修改資料類型,Null->其他類型,或者向上類型轉換 Byte->Short->Integer
其他改動都是非法的 Schema 演變操作,需要通過添加 .option("overwriteSchema", "true") 選項來覆寫 schema 以及資料。舉個例子,表原本包含一個類型為 integer 的列“Foo”,而新的 schema 需要改成 string 類型,那麼所有的 Parquet 資料檔案都需要覆寫重寫。包括以下步驟:
• 删除這個列
• 修改列的資料類型
• 修改列名,僅用大小寫區分(例如“Foo”和“foo”)
最後,在 Spark 3.0 中,支援了顯式 DDL(通過 ALTER TABLE 方式),允許使用者能夠對 schema 執行以下操作:
• 添加列
• 修改列注釋
• 設定表的屬性來定義表的行為,例如設定事務日志的保留時間
Schema 演變有何作用?
Schema 演變可以用來顯式地修改表的 schema(而不是意外添加了并不想要的列)。這提供了一種簡單的方式來遷移 schema,因為它能自動添加上正确的列名和資料類型,而不需要進行顯式的定義。
總結
Schema 限制能夠拒絕與表不相容的任何的新的列或者 schema 的改動。通過設定嚴格的限制,資料工程師們可以完全信任他們的資料,進而能夠作出更好的商業決策。
另一方面,schema 演變則對 schema 限制進行了補充,使得一些期望的 schema 變更能夠自動地生效。畢竟,添加一個新的列本就不應該是一件困難的事情。
Schema 限制和 Schema 演變互相補益,合理地結合起來使用将能友善地管理好資料,避免髒資料侵染,保證資料的完整可靠。
原文連結:
https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html相關閱讀推薦:
Delta Lake,讓你從複雜的Lambda架構中解放出來 【譯】Databricks使用Spark Streaming和Delta Lake對流式資料進行資料品質監控介紹 【譯】Delta Lake 0.5.0介紹 Delta Lake - 資料湖的資料可靠性阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。
Apache Spark技術交流社群公衆号,微信掃一掃關注