最近,Delta Lake釋出了0.5.0版本,該版本加入了對Presto和Athena的支援,以及提升了操作的并發性,本文将對Delta Lake 0.5.0版本的變化進行一個簡單的介紹。
Delta Lake 0.5.0釋出的幾個最重要的特性如下:
- 通過使用Manifest檔案能夠,支援其他資料處理引擎,現在能夠使用Scala、Java、Python和SQL的API生成Manifest檔案,并使用該檔案通過Presto和Amazon Athena通路Delta Lake中的表資料,詳細的使用方式見 https://docs.delta.io/0.5.0/presto-integration.html
- 提升了Delta Lake所有操作的并發性,現在可以并發執行更多的Delta Lake操作。通過使用更加精細的沖突檢測政策,Delta Lake的樂觀并發控制得到了有效的改善,這使得我們能夠更加容易地在Delta表上執行複雜的工作流。舉例來說:
- 我們可以在給新分區添加内容的同時,在舊分區上執行delete操作;
- 在不相交的一組分區上同時執行update和merge操作;
- 讓檔案合并和增加檔案内容同時執行
想要擷取更多的資訊,可以參考開源Delta Lake 0.5.0的
release notes。在此部落格文章中,我們将詳細介紹如何使用Presto讀取Delta Lake表、操作并發性的提升以及使用insert-only merge操作來更友善快速地去除重複資料。
使用Presto讀取Delta Lake表
正如這篇文章
Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs所說,Delta Lake 的一些修改資料的操作,如delete操作,是通過給包含删除資料的檔案寫一個新版本,并隻是将舊版本檔案标記為已删除來實作的。Delta Lake采用這種方法的優勢在于能夠讓我們查詢舊版本的資料。如果我們想要了解哪些資料(或行)包含最新的資料,預設情況下我們可以去查詢事務日志。其他資料處理系統,如Presto和Athena想要擷取這些資訊,可以通過讀取Delta Lake生成的一種清單文本檔案——Manifest,該檔案中包含查詢Delta Lake表需要讀取的資料檔案清單。為了實作Presto和Athena讀取Delta Lake表,我們可以通過執行一些Python指令來實作,詳細的内容可以參考
Set up the Presto or Athena to Delta Lake integration and query Delta tables。
生成Delta Lake的Manifest檔案
首先,使用以下代碼片段建立Delta Lake的Manifest檔案:
deltaTable = DeltaTable.forPath(pathToDeltaTable)
deltaTable.generate("symlink_format_manifest")
正如代碼字面意思所示,以上操作将在表根目錄中生成Manifest檔案。如果你根據
這篇文章介紹的内容建立了
departureDelays
表,将會在表根目錄中産生一個新的檔案夾:
$/departureDelays.delta/_symlink_format_manifest
該檔案夾中會有一個名為manifest的文本檔案。如果你檢視manifest檔案的内容(例如使用cat指令),你将能看到類似以下的文本内容,它們訓示了包含最新快照的檔案。
file:$/departureDelays.delta/part-00003-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00006-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00001-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00000-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00000-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00001-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00002-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00007-...-c000.snappy.parquet
建立Presto表以讀取生成的Manifest檔案
接下來的步驟是在Hive Metastore中建立一個外部表,以便Presto(或Athena)可以讀取上一步生成的Manifest檔案,來獲得需要讀取的Parquet檔案,以讀取Delta Lake表的最新快照。需要說明的是,對于Presto,你可以使用Apache Spark或Hive CLI來運作以下指令:
1. CREATE EXTERNAL TABLE departureDelaysExternal ( ... )
2. ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
3. STORED AS INPUTFORMAT
4. OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
5. LOCATION '$/departureDelays.delta/_symlink_format_manifest'
一些重要的說明:
- 第一行所定義的schema必須和Delta Lake中的schema相同
- 第五行需要指向Manifest檔案的位置——
_symlink_format_manifest
Presto(或Athena)需要配置
SymlinkTextInputFormat
才能從Manifest檔案中擷取Parquet資料檔案的清單,而不是使用目錄清單中的檔案。需要說明的是,如果想要使用分區表,需要按照
Configure Presto to read the generated manifests這篇文章進行一些額外的步驟。
更新Manifest檔案
需要注意的是,如果Delata Lake的資料有更新,都需要重新生成Manifest檔案,以便Presto能夠擷取到最新的資料。
操作并發性的提升
在Delta Lake 0.5.0版本,我們能夠同時執行更多的操作。通過更細粒度的沖突檢測,這些最新的更新讓Delta Lake能夠更容易地在Delta Lake表上執行複雜的工作流,例如:
- 可以在給新分區添加内容的同時,在舊分區上執行delete操作;
- 追加檔案内容的同時執行檔案合并操作;
- 在不相交的一組分區上同時執行update和merge操作。
并發追加檔案内容的用例
舉個例子,當我們在執行merge操作的同時,如果有并發的事務向同一個分區寫入記錄,Delta Lake往往會抛出
ConcurrentAppendException
異常。
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
上面的代碼段就有可能會引發沖突,因為即使表已經按照date和country進行了分區,條件仍然不夠明确。問題在于,這個查詢将掃描整個表,進而可能與update任何其他分區的并發操作發生沖突。通過指定
specificDate
和
specificCountry
,以便可以在特定的date和country進行merge操作,現在我們就可以安全地在不同的date和contry同時執行此操作。
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND d.date = '" + specificDate + "' AND d.country = '" + specificCountry + "'")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
以上方法适用于其他所有的Delta Lake操作(如delete、更改中繼資料等)。
并發檔案合并
如果你連續不斷地将資料寫入Delta表,随着時間的流逝,将會累積出大量的檔案。這在流式資料場景中尤為重要,因為此時是以比較小的batch寫入資料的,這将會導緻檔案系統不斷地累積小檔案,随着時間的推移,小檔案的數量會不斷增加,會降低查詢的性能。優化這種場景的一個比較重要的方式就是定期擷取大批量的小檔案,并将其重寫為數量比較小的大檔案(檔案合并)。
過去,在同時進行資料查詢和執行檔案合并時,出現異常的可能性會非常高。但是現在,由于Delta Lake 0.5.0版本的優化改進,我們可以同時執行查詢操作(包括流式查詢)和檔案的合并,并且不會有任何異常産生。舉個例子來說,如果你的表已經進行了分區,并且你隻想基于謂詞對一個分區進行重新分區,則可以使用where來僅讀取該分區,并使用
replaceWhere
回寫該分區:
path = "..."
partition = "year = '2019'"
numFilesPerPartition = 16 # Compact partition of a table to no. of files
(spark.read
.format("delta")
.load(path)
.where(partition)
.repartition(numFilesPerPartition)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.option("replaceWhere", partition)
.save(path))
以上代碼中需要注意的是,僅在沒有資料更改時,才使用
dataChange == false
選項,否則可能會破壞底層資料。
使用Insert-only Merge操作友善快速地去除重複資料
一個場景的
ETL用例是搜集日志,并将其附加到Delta Lake表當中,一個比較常見的問題是資料源會産生重複的日志記錄。通過使用Delta Lake的merge,你可以避免插入這些重複的記錄,例如以下涉及merge以及update航班資料的代碼:
# Merge merge_table with flights
deltaTable.alias("flights") \
.merge(merge_table.alias("updates"),"flights.date = updates.date") \
.whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
.whenNotMatchedInsertAll() \
.execute()
在Delta Lake 0.5.0版本之前,不可能從Delta Lake表中将重複資料作為流進行讀取,因為insert-only merge并不是純粹地将資料追加到表中。
例如,在流查詢中,你可以在
foreachBatch
中執行merge操作來連續不斷地将流資料寫入Delta Lake表當中,并将需要删除的重複資料打上标記。以下PySpark的代碼展示了這個場景:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/aggregates")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
}
# Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream \
.format("delta") \
.foreachBatch(upsertToDelta) \
.outputMode("update") \
.start()
在另一個流式查詢中,你可以從該Delta Lake表中連續讀取需要删除的重複資料。這是可能的,因為insert-only merge操作(在Delta Lake 0.5.0版本引入)隻會将新資料追加到Delta Lake表中。