本文的例子來自 Delta Lake 官方教程 。因為官方教程是基于商業軟體 Databricks Community Edition 建構,雖然教程中使用的軟體特性都是開源 Delta Lake 版本所具備的,但是考慮到國内的網絡環境,注冊和使用 Databricks Community Edition 門檻較高。是以本文嘗試基于開源的 Jupiter Notebook 重新建構這個教程。
準備一個環境安裝 Spark 和 jupyter
本文基于 Linux 建構開發環境,同時使用的軟體比如 conda、jupyter以及 pyspark 等都可以在 Windows 和 MacOS 上找到,理論上來說也完全可以在這兩個系統上完成此教程。
假設系統已經安裝 anaconda 或 miniconda,我們使用 conda 來建構開發環境,可以非常友善的安裝 pyspark 和 jupyter notebook
conda create --name spark
conda activate spark
conda install pyspark
conda install -c conda-forge jupyterlab
環境變量設定
我們在設定一些環境變量之後,就可以使用 pyspark 指令來建立 jupyter notebook 服務
export SPARK_HOME=$HOME/miniconda3/envs/spark/lib/python3.7/site-packages/pyspark
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
啟動服務(注意這裡的參數裡指定了 Delta Lake 的 package,Spark 會幫忙自動下載下傳依賴):
pyspark --packages io.delta:delta-core_2.11:0.5.0
接下去所有代碼在 notebook 裡運作
下載下傳需要 parquet 檔案
%%bash
rm -fr /tmp/delta_demo
mkdir -p /tmp/delta_demo/loans/
wget -O /tmp/delta_demo/loans/SAISEU19-loan-risks.snappy.parquet https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet
ls -al /tmp/delta_demo/loans/
Delta Lake的批流處理
在這裡我們進入正題,開始介紹 Delta Lake 的批流處理能力。
首先,我們通過批處理的形式建立一張 Delta Lake 表,資料來自前面我們下載下傳的 parquet 檔案,可以和友善的把一張 parquet 表轉換為 Delta Lake 表:
import os
import shutil
from pyspark.sql.functions import *
delta_path = "/tmp/delta_demo/loans_delta"
# Delete a new delta table with the parquet file
if os.path.exists(delta_path):
print("Deleting path " + delta_path)
shutil.rmtree(delta_path)
# Create a new delta table with the parquet file
spark.read.format("parquet").load("/tmp/delta_demo/loans") \
.write.format("delta").save(delta_path)
print("Created a Delta table at " + delta_path)
我來查一下這張表,資料量是否正确:
# Create a view on the table called loans_delta
spark.read.format("delta").load(delta_path).createOrReplaceTempView("loans_delta")
print("Defined view 'loans_delta'")
spark.sql("select count(*) from loans_delta").show()
Defined view 'loans_delta'
+--------+
|count(1)|
+--------+
| 14705|
+--------+
接下去我們會使用Spark Streaming流式寫入這張 Delta Lake 表,同時展示 Delta Lake 的 Schema enforcement 能力(本文省略了流式寫 Parquet 表的示範部分,那部分指出了 parquet 檔案的不足,比如無法強制指定 Schema )
import random
from pyspark.sql.functions import *
from pyspark.sql.types import *
def random_checkpoint_dir():
return "/tmp/delta_demo/chkpt/%s" % str(random.randint(0, 10000))
# User-defined function to generate random state
states = ["CA", "TX", "NY", "IA"]
@udf(returnType=StringType())
def random_state():
return str(random.choice(states))
# Generate a stream of randomly generated load data and append to the delta table
def generate_and_append_data_stream_fixed(table_format, table_path):
stream_data = spark.readStream.format("rate").option("rowsPerSecond", 50).load() \
.withColumn("loan_id", 10000 + col("value")) \
.withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) \
.withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) \
.withColumn("addr_state", random_state()) \
.select("loan_id", "funded_amnt", "paid_amnt", "addr_state") # *********** FIXED THE SCHEMA OF THE GENERATED DATA *************
query = stream_data.writeStream \
.format(table_format) \
.option("checkpointLocation", random_checkpoint_dir()) \
.trigger(processingTime="10 seconds") \
.start(table_path)
return query
啟動兩個流式作業:
stream_query_1 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)
stream_query_2 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)
因為 Delta Lake 的樂觀鎖機制,多個流可以同時寫入一張表,并保證資料的完整性。
通過批處理的方式來查詢一下目前表中的資料量,我們發現有資料被插入了:
spark.sql("select count(*) from loans_delta").show()
+--------+
|count(1)|
+--------+
| 17605|
+--------+
接下去我們停止所有流的寫入,接下去會展示 Delta Lake 的其他特性
# Function to stop all streaming queries
def stop_all_streams():
# Stop all the streams
print("Stopping all streams")
for s in spark.streams.active:
s.stop()
print("Stopped all streams")
print("Deleting checkpoints")
shutil.rmtree("/tmp/delta_demo/chkpt/", True)
print("Deleted checkpoints")
stop_all_streams()
Schema evolution(Schema演化)
Delta Lake 支援Schema演化,也就是說我們可以增加或改變表字段。接下去的批處理 SQL 會新增加一些資料,同時這些資料比之前的多了一個“closed”字段。我們将新的 DF 配置參數
mergeSchema
為
true
來顯示指明 Delta Lake 表 Schema 的演化:
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
items = [
(1111111, 1000, 1000.0, 'TX', True),
(2222222, 2000, 0.0, 'CA', False)
]
loan_updates = spark.createDataFrame(items, cols) \
.withColumn("funded_amnt", col("funded_amnt").cast("int"))
loan_updates.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save(delta_path)
來看一下插入新資料之後的表内容,新增加了 closed 字段,之前的老資料行這個字段預設為 null。
spark.read.format("delta").load(delta_path).show()
+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
| 0| 1000| 182.22| CA| null|
| 1| 1000| 361.19| WA| null|
| 2| 1000| 176.26| TX| null|
| 3| 1000| 1000.0| OK| null|
| 4| 1000| 249.98| PA| null|
| 5| 1000| 408.6| CA| null|
| 6| 1000| 1000.0| MD| null|
| 7| 1000| 168.81| OH| null|
| 8| 1000| 193.64| TX| null|
| 9| 1000| 218.83| CT| null|
| 10| 1000| 322.37| NJ| null|
| 11| 1000| 400.61| NY| null|
| 12| 1000| 1000.0| FL| null|
| 13| 1000| 165.88| NJ| null|
| 14| 1000| 190.6| TX| null|
| 15| 1000| 1000.0| OH| null|
| 16| 1000| 213.72| MI| null|
| 17| 1000| 188.89| MI| null|
| 18| 1000| 237.41| CA| null|
| 19| 1000| 203.85| CA| null|
+-------+-----------+---------+----------+------+
only showing top 20 rows
新的資料行具有 closed 字段:
spark.read.format("delta").load(delta_path).filter(col("closed") == True).show()
+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|1111111| 1000| 1000.0| TX| true|
+-------+-----------+---------+----------+------+
Delta Lake 表的删除操作
除了正常的插入操作,Delta Lake 還支援 update 和 delete 等功能,可以更新表格内容。下面展示删除操作,我們希望删除表格中貸款已經被完全還清的記錄。下面幾條指令可以簡單和清晰的展示删除過程。
首先,我們看看符合條件的記錄有多少條:
spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show()
+--------+
|count(1)|
+--------+
| 5134|
+--------+
然後,我們執行一個 delete 指令:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, delta_path)
deltaTable.delete("funded_amnt = paid_amnt")
最後,我們看一下删除後的結果,發現符合條件的記錄都已被删除:
spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show()
+--------+
|count(1)|
+--------+
| 0|
+--------+
版本曆史和回溯
Delta Lake 還具有很強大曆史版本記錄和回溯功能。
history()
方法清晰的展示了剛才那張表的修改記錄,包括最後一次 Delete 操作。
deltaTable.history().show()
+-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version| timestamp|userId|userName| operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
| 10|2020-02-22 22:14:06| null| null| DELETE|[predicate -> ["(...|null| null| null| 9| null| false|
| 9|2020-02-22 22:13:57| null| null| WRITE|[mode -> Append, ...|null| null| null| 8| null| true|
| 8|2020-02-22 22:13:52| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 6| null| true|
| 7|2020-02-22 22:13:50| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 6| null| true|
| 6|2020-02-22 22:13:42| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 4| null| true|
| 5|2020-02-22 22:13:40| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 4| null| true|
| 4|2020-02-22 22:13:32| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 2| null| true|
| 3|2020-02-22 22:13:30| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 2| null| true|
| 2|2020-02-22 22:13:22| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 1| null| true|
| 1|2020-02-22 22:13:20| null| null|STREAMING UPDATE|[outputMode -> Ap...|null| null| null| 0| null| true|
| 0|2020-02-22 22:13:18| null| null| WRITE|[mode -> ErrorIfE...|null| null| null| null| null| true|
+-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
如果我們希望看一下剛才删除操作前的資料表狀态,可以很友善的回溯到前一個快照點,并進行再次查詢(我們可以看到被删除的記錄又出現了)。
previousVersion = deltaTable.history(1).select("version").collect()[0][0] - 1
spark.read.format("delta") \
.option("versionAsOf", previousVersion) \
.load(delta_path) \
.createOrReplaceTempView("loans_delta_pre_delete") \
spark.sql("SELECT COUNT(*) FROM loans_delta_pre_delete WHERE funded_amnt = paid_amnt").show()
+--------+
|count(1)|
+--------+
| 5134|
+--------+
結論
本文通過 jupyter notebook 工具示範了 Delta Lake 的官方教程,你可以在本文末尾下載下傳到完整的 notebook 檔案。