天天看點

Delta Lake 平台化實踐(離線篇)

原文連結: https://blog.csdn.net/lsshlsw/article/details/103553289 部落客:breeze_lsw

01

SQL 支援

1.1 DML

背景

delta lake 0.4 隻支援以 api 的方式使用 Delete/Update/Merge Into 等 DML,對習慣了使用 sql 的終端使用者會增加其學習使用成本。

解決方式

下文通過 spark sql extension 以插件化的方式擴充 sql parser ,增加 DML 文法的支援。在 spark 推出 sql extension 功能前,也可以用通過 aspectj 通過攔截 sql 的方式實作增加自定義文法的功能。

1.在自定義擴充 g4 檔案中相應的 antlr4 DML 文法,部分參考了 databricks 商業版的文法

statement
    : DELETE FROM table=qualifiedName tableAlias
        (WHERE where=booleanExpression)?                              #deleteFromTable
    | UPDATE table=qualifiedName tableAlias upset=setClause
        (WHERE where=booleanExpression)?                              #updateTable
    | MERGE INTO target=qualifiedName targetAlias=tableAlias
        USING (source=qualifiedName |
            '(' sourceQuery=query')') sourceAlias=tableAlias
            ON mergeCondition=booleanExpression
            matchedClause*
            notMatchedClause*                                               #mergeIntoTable           

2.實作對應的 visit,将 sql 翻譯為 delta api,以最簡單的 delete 為例

override def visitDeleteFromTable(ctx: DeleteFromTableContext): AnyRef = withOrigin(ctx) {
    DeleteTableCommand(
        visitTableIdentifier(ctx.table),
         Option(getText(ctx.where)))
}

case class DeleteTableCommand(table: TableIdentifier,
                              where: Option[String]) extends RunnableCommand {
     override def run(sparkSession: SparkSession): Seq[Row] = {
       DeltaUtils.deltaTableCheck(sparkSession, table, "DELETE")
       val deltaTable = DeltaUtils.getDeltaTable(sparkSession, table)
       if (where.isEmpty) {
         deltaTable.delete()
       } else {
         deltaTable.delete(where.get)
       }
       Seq.empty[Row]
     }
}           

3.啟動 Spark 時加載打包的 extension jar ,初始化 SparkSession 時指定 Extension 類。

val spark = SparkSession.builder
    .enableHiveSupport()
    .config("spark.sql.extensions", "cn.tongdun.spark.sql.TDExtensions")           

tip

spark 3 之前不支援配置多個 extension ,如果遇到使用多個 extension 的情況,可以将多個 extension 在一個 extension 代碼中進行注入。

以同時增加 tispark extension 和 自定義 extension 為例

override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectParser(TiParser(getOrCreateTiContext))
    extensions.injectResolutionRule(TiDDLRule(getOrCreateTiContext))
    extensions.injectResolutionRule(TiResolutionRule(getOrCreateTiContext))
    extensions.injectPlannerStrategy(TiStrategy(getOrCreateTiContext))
    extensions.injectParser { (session, parser) => new TDSparkSqlParser(session, parser)}
}           

1.2 Query

識别 delta table 有三種實作方式

  1. 使用相應表名字首/字尾作為辨別
  2. 在 table properties 中增加相應的參數進行識别
  3. 判斷表目錄下是否存在_delta_log

我們一開始是使用 delta_ 的字首作為 delta 表名辨別,這樣實作最為簡單,但是如果使用者将 hive(parquet) 表轉為 hive(delta) ,要是表名發生變化則需要修改相關代碼,是以後面改為在table propertie 中增加相應的參數進行識别。

也可以通過判斷是否存在 _delta_log 檔案識别,該方式需要在建表時寫入帶有 schema 資訊的空資料。

Query 通過對sql執行進行攔截,判斷 Statement 為 SELECT 類型,然後将 delta 表的查詢翻譯成對應的 api 進行查詢。

if (statementType == SELECT) {
    TableData tableData = (TableData) statementData.getStatement();
    sql = DatasourceAdapter.selectAdapter(tableData, sparkSession, sql);
}           

1.3 Insert

Insert 需要考慮 INSERT_VALUES/INSERT_SELECT ,還有分區表/非分區表以及寫入方式的一些情況。

sql 類型判斷

if (INSERT_SELECT == statementType) {
    isDeltaTable = DatasourceAdapter.deltaInsertSelectAdapter(sparkSession, statementData);
} else if (INSERT_VALUES == statementType) {
    isDeltaTable = DatasourceAdapter.deltaInsertValuesAdapter(sparkSession, statementData);
}           

INSERT_INTO 需要從 catalog 中擷取對應的 schema 資訊,并将 values 轉化為 dataFrame

val rows = statementData.getValues.asScala.map(_.asScala.toSeq).map { x => Row(x: _*) }
import spark.implicits._
val schemaStr = spark.catalog.listColumns(dbName, tableName)
    .map(col => col.name + " " + col.dataType)
    .collect().mkString(",")
val schema = StructType.fromDDL(schemaStr)
val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows), schema)           

INSERT_SELECT 則直接通路被解析過的 Delta Query 子句。

partition

由于 delta api 的限制,不支援靜态分區,可以從 tableMeta 中解析到對應的動态分區名,使用 partitionBy 寫入即可。

至此,已經實作使用 apache spark 2.4 使用 sql 直接操作 delta table 表。

02

平台化工作

與 hive metastore 的內建,表資料管理 等平台化的一些工作。

2.1 浏覽 delta 資料

使用者在平台上點選浏覽資料,如果通過 delta api ,啟動 spark job 的方式從 HDFS 讀取資料,依賴重,延時高,使用者體驗差。

基于之前在 parquet 格式上的一些工作,浏覽操作可以簡化為找出 delta 事務日志中還存活 (add - remove) 的 parquet 檔案進行讀取,這樣就避免了啟動 spark 的過程,大多數情況能做到毫秒級傳回資料。

需要注意的是,_delta_log 檔案隻存在父目錄,浏覽某個分區的資料同樣需要浏覽父目錄擷取相應分區内的存活檔案。

// DeltaHelper.load 方法會從 _delta_log 目錄中找到存活 parquet 檔案,然後使用 ParquetFileReader 讀取
List<Path> inputFiles;
if (DeltaHelper.isDeltaTable(dir, conf)) {
    inputFiles = DeltaHelper.load(dir, conf);
} else {
    inputFiles = getInputFilesFromDirectory(projectCode, dir);
}
           

從 delta 0.5 開始,浏覽資料的功能可以通過 manifest 檔案進行更簡單的實作,具體内容可以參考下一篇文章。

2.2 浏覽 delta 資料

将原生 delta lake 基于 path 的工作方式與 hive metastore 進行相容。

資料寫入/删除

資料動态分區插入 - 統計寫入的分區資訊(我們是通過修改了 spark write 部分的代碼得到的寫入分區資訊),如果分區不存在則自動增加分區 add partition if ...。還有一種更簡單的做法是直接使用 msck repair table ,但是這種方式在分區多的情況下,性能會非常糟糕。

删除分區 - 在界面上操作對某個分區進行删除時,背景調用 delta 删除api,并更新相關 partition 資訊。

中繼資料資訊更新

中繼資料中表/分區記錄數,大小等中繼資料的更新支援。

2.3 碎片檔案整理

  • 非 delta lake 表小檔案整理方式可以參考我之前在 csdn 上的文章。這種方式采用的是在資料生成後校驗,如果有碎片檔案則進行同步合并, Spark 小檔案合并優化實踐
  • 非 delta lake 表的小檔案整理使用的是同步模式,可能會影響到下有任務的啟動時間。

基于 delta lake 的小檔案整理要分為兩塊,存活資料和标記删除的資料

  1. 标記删除的資料

    被 delta 删除的資料,底層 parquet 檔案依舊存在,隻是在 delta_log 中做了标記,讀取時跳過了該檔案。

可以使用 delta 自帶的 vacuum 功能删除一定時間之前标記删除的資料。

  1. 存活資料

    可以實作一個 compaction 功能,在背景定時做異步合并,由于 delta 支援事務管理的特性,該過程對使用者透明,合并過程中保證了資料一緻性且不會中斷任務。

03

結語

3.1 一些限制

由于 delta api 的限制,目前 delta delete / update 不支援子句,可以使用 merge into 文法實作相同功能。

由于 delta api 的限制,隻支援動态分區插入。

3.2 merge 使用場景

upsert

有 a1,a2 兩張表,如果 a.1eventId = a2.eventId ,則 a2.data 會覆寫 a1.data,否則将 a2 表中相應的資料插入到 a1 表

MERGE INTO bigdata.table1 a1
USING bigdata.table2 a2
ON a1.eventId = a2.eventId
WHEN MATCHED THEN
  UPDATE SET a1.data = a2.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (a2.date, a2.eventId, a2.data)           

ETL 避免資料重複場景

如果 uniqueid 隻存在于 a2 表,則插入 a2 表中的相應記錄

MERGE INTO logs a1
USING updates a2
ON a1.uniqueId = a2.uniqueId
WHEN NOT MATCHED
  THEN INSERT *           

次元表更新場景

  • 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 為 true ,則删除 a1 表相應記錄
  • 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 為 false ,則将 a2 表相應記錄的 value 更新到 a1 表中
  • 如果沒有比對到相應合作方,且 a2 中 deleted 為 fasle ,則将 a2 表相應記錄插入到 a1 表
MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a2.deleted = true THEN DELETE
WHEN MATCHED THEN UPDATE SET a1.value = a2.newValue
WHEN NOT MATCHED AND a2.deleted = false THEN INSERT (partnerCode, value) VALUES (partnerCode, newValue)           

曆史資料清理場景

如果 a1 和 a2 表的合作方相同,則删除 a1 表中 ds < 20190101 的所有資料

MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a1.ds < '20190101' THEN
  DELETE           

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。

Apache Spark技術交流社群公衆号,微信掃一掃關注