天天看點

【譯】Delta Lake 0.5.0介紹

原文連結: https://databricks.com/blog/2020/01/29/query-delta-lake-tables-presto-athena-improved-operations-concurrency-merge-performance.html

最近,Delta Lake釋出了0.5.0版本,該版本加入了對Presto和Athena的支援,以及提升了操作的并發性,本文将對Delta Lake 0.5.0版本的變化進行一個簡單的介紹。

Delta Lake 0.5.0釋出的幾個最重要的特性如下:

  • 通過使用Manifest檔案能夠,支援其他資料處理引擎,現在能夠使用Scala、Java、Python和SQL的API生成Manifest檔案,并使用該檔案通過Presto和Amazon Athena通路Delta Lake中的表資料,詳細的使用方式見 https://docs.delta.io/0.5.0/presto-integration.html
  • 提升了Delta Lake所有操作的并發性,現在可以并發執行更多的Delta Lake操作。通過使用更加精細的沖突檢測政策,Delta Lake的樂觀并發控制得到了有效的改善,這使得我們能夠更加容易地在Delta表上執行複雜的工作流。舉例來說:
    1. 我們可以在給新分區添加内容的同時,在舊分區上執行delete操作;
    2. 在不相交的一組分區上同時執行update和merge操作;
    3. 讓檔案合并和增加檔案内容同時執行

想要擷取更多的資訊,可以參考開源Delta Lake 0.5.0的

release notes

。在此部落格文章中,我們将詳細介紹如何使用Presto讀取Delta Lake表、操作并發性的提升以及使用insert-only merge操作來更友善快速地去除重複資料。

使用Presto讀取Delta Lake表

正如這篇文章

Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs

所說,Delta Lake 的一些修改資料的操作,如delete操作,是通過給包含删除資料的檔案寫一個新版本,并隻是将舊版本檔案标記為已删除來實作的。Delta Lake采用這種方法的優勢在于能夠讓我們查詢舊版本的資料。如果我們想要了解哪些資料(或行)包含最新的資料,預設情況下我們可以去查詢事務日志。其他資料處理系統,如Presto和Athena想要擷取這些資訊,可以通過讀取Delta Lake生成的一種清單文本檔案——Manifest,該檔案中包含查詢Delta Lake表需要讀取的資料檔案清單。為了實作Presto和Athena讀取Delta Lake表,我們可以通過執行一些Python指令來實作,詳細的内容可以參考

Set up the Presto or Athena to Delta Lake integration and query Delta tables

生成Delta Lake的Manifest檔案

首先,使用以下代碼片段建立Delta Lake的Manifest檔案:

deltaTable = DeltaTable.forPath(pathToDeltaTable)
deltaTable.generate("symlink_format_manifest")           

正如代碼字面意思所示,以上操作将在表根目錄中生成Manifest檔案。如果你根據

這篇文章介紹的内容建立了

departureDelays

表,将會在表根目錄中産生一個新的檔案夾:

$/departureDelays.delta/_symlink_format_manifest           

該檔案夾中會有一個名為manifest的文本檔案。如果你檢視manifest檔案的内容(例如使用cat指令),你将能看到類似以下的文本内容,它們訓示了包含最新快照的檔案。

file:$/departureDelays.delta/part-00003-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00006-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00001-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00000-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00000-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00001-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00002-...-c000.snappy.parquet
file:$/departureDelays.delta/part-00007-...-c000.snappy.parquet           

建立Presto表以讀取生成的Manifest檔案

接下來的步驟是在Hive Metastore中建立一個外部表,以便Presto(或Athena)可以讀取上一步生成的Manifest檔案,來獲得需要讀取的Parquet檔案,以讀取Delta Lake表的最新快照。需要說明的是,對于Presto,你可以使用Apache Spark或Hive CLI來運作以下指令:

1. CREATE EXTERNAL TABLE departureDelaysExternal ( ... )
2. ROW FORMAT SERDE
   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
3. STORED AS INPUTFORMAT
4. OUTPUTFORMAT
   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
5. LOCATION '$/departureDelays.delta/_symlink_format_manifest'           

一些重要的說明:

  • 第一行所定義的schema必須和Delta Lake中的schema相同
  • 第五行需要指向Manifest檔案的位置——

    _symlink_format_manifest

Presto(或Athena)需要配置

SymlinkTextInputFormat

才能從Manifest檔案中擷取Parquet資料檔案的清單,而不是使用目錄清單中的檔案。需要說明的是,如果想要使用分區表,需要按照

Configure Presto to read the generated manifests

這篇文章進行一些額外的步驟。

更新Manifest檔案

需要注意的是,如果Delata Lake的資料有更新,都需要重新生成Manifest檔案,以便Presto能夠擷取到最新的資料。

操作并發性的提升

在Delta Lake 0.5.0版本,我們能夠同時執行更多的操作。通過更細粒度的沖突檢測,這些最新的更新讓Delta Lake能夠更容易地在Delta Lake表上執行複雜的工作流,例如:

  • 可以在給新分區添加内容的同時,在舊分區上執行delete操作;
  • 追加檔案内容的同時執行檔案合并操作;
  • 在不相交的一組分區上同時執行update和merge操作。

并發追加檔案内容的用例

舉個例子,當我們在執行merge操作的同時,如果有并發的事務向同一個分區寫入記錄,Delta Lake往往會抛出

ConcurrentAppendException

異常。

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()           

上面的代碼段就有可能會引發沖突,因為即使表已經按照date和country進行了分區,條件仍然不夠明确。問題在于,這個查詢将掃描整個表,進而可能與update任何其他分區的并發操作發生沖突。通過指定

specificDate

specificCountry

,以便可以在特定的date和country進行merge操作,現在我們就可以安全地在不同的date和contry同時執行此操作。

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND d.date = '" + specificDate + "' AND d.country = '" + specificCountry + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()           

以上方法适用于其他所有的Delta Lake操作(如delete、更改中繼資料等)。

并發檔案合并

如果你連續不斷地将資料寫入Delta表,随着時間的流逝,将會累積出大量的檔案。這在流式資料場景中尤為重要,因為此時是以比較小的batch寫入資料的,這将會導緻檔案系統不斷地累積小檔案,随着時間的推移,小檔案的數量會不斷增加,會降低查詢的性能。優化這種場景的一個比較重要的方式就是定期擷取大批量的小檔案,并将其重寫為數量比較小的大檔案(檔案合并)。

過去,在同時進行資料查詢和執行檔案合并時,出現異常的可能性會非常高。但是現在,由于Delta Lake 0.5.0版本的優化改進,我們可以同時執行查詢操作(包括流式查詢)和檔案的合并,并且不會有任何異常産生。舉個例子來說,如果你的表已經進行了分區,并且你隻想基于謂詞對一個分區進行重新分區,則可以使用where來僅讀取該分區,并使用

replaceWhere

回寫該分區:

path = "..."
partition = "year = '2019'"
numFilesPerPartition = 16   # Compact partition of a table to no. of files

(spark.read
  .format("delta")
  .load(path)
  .where(partition)
  .repartition(numFilesPerPartition)
  .write
  .option("dataChange", "false")
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", partition)
  .save(path))           

以上代碼中需要注意的是,僅在沒有資料更改時,才使用

dataChange == false

選項,否則可能會破壞底層資料。

使用Insert-only Merge操作友善快速地去除重複資料

一個場景的

ETL

用例是搜集日志,并将其附加到Delta Lake表當中,一個比較常見的問題是資料源會産生重複的日志記錄。通過使用Delta Lake的merge,你可以避免插入這些重複的記錄,例如以下涉及merge以及update航班資料的代碼:

# Merge merge_table with flights
deltaTable.alias("flights") \
    .merge(merge_table.alias("updates"),"flights.date = updates.date") \
    .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
    .whenNotMatchedInsertAll() \
    .execute()           

在Delta Lake 0.5.0版本之前,不可能從Delta Lake表中将重複資料作為流進行讀取,因為insert-only merge并不是純粹地将資料追加到表中。

例如,在流查詢中,你可以在

foreachBatch

中執行merge操作來連續不斷地将流資料寫入Delta Lake表當中,并将需要删除的重複資料打上标記。以下PySpark的代碼展示了這個場景:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
}

# Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream \
  .format("delta") \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .start()           

在另一個流式查詢中,你可以從該Delta Lake表中連續讀取需要删除的重複資料。這是可能的,因為insert-only merge操作(在Delta Lake 0.5.0版本引入)隻會将新資料追加到Delta Lake表中。