天天看點

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

本文轉載自公衆号: eBay技術荟

作者 | 金瀾濤

原文連結:

https://mp.weixin.qq.com/s/L64xhtKztwWhlBQrreiDfQ

摘要

大資料處理技術朝傳統資料庫領域靠攏已經成為行業趨勢,目前開源的大資料處理引擎,如Apache Spark、Apache Hadoop、Apache Flink等等都已經支援SQL接口,且SQL的使用往往占據主導地位。各個公司使用以上開源軟體建構自己的ETL架構和OLAP技術,但在OLTP技術上,仍然是傳統資料庫的強項。其中的一個主要原因是傳統資料庫對ACID的支援。具有ACID能力的傳統商用資料庫基本都實作了完整的CRUD操作。而在大資料技術領域,由于缺少ACID的支援,基本隻實作了C/R操作,對U/D操作很少涉及。

eBay資料倉庫的部分基礎設施是建構在商用資料産品Teradata之上的,近年來,随着公司整體朝開源技術遷移,資料倉庫的基礎設施已基本遷移到Apache Hadoop、Apache Spark平台。但要完全從Teradata上遷移下來,必須建構具有相同能力的SQL處理引擎。在Teradata上的分析型SQL,有超過5%的查詢使用Update/Delete操作,目前Apache Spark并不具備這個能力。

本文介紹eBay Carmel團隊利用Delta Lake,使Spark SQL支援Teradata的Update/Delete文法。對比标準SQL的Update/Delete文法,以及目前尚未正式釋出的Apache Spark 3.0 提供的文法(不含實作),我們還實作了Teradata的擴充文法,可以進行跨表更新和删除的SQL操作。

1.簡介

Carmel Spark是Carmel團隊基于Apache Spark進行魔改的SQL-on-Hadoop引擎。主要改善了互動式分析的使用體驗,提供即席查詢(ad-hoc)服務。Carmel Spark是“Teradata退出”項目的重要組成部分,在功能性和性能上,都做了大量開發和優化。例如全新的CBO、并發排程、物化視圖、索引、臨時表、Extended Adaptive Execution、Range Partition、列級通路權限控制,以及各類監控和管理功能等,目前已經線上上使用且滿足業務需求。

但由于Apache Spark缺少ACID事務能力,并沒有提供Update/Delete文法。去年年初,Databricks開源了存儲層Delta Lake,為Apache Spark 提供可伸縮的 ACID 事務,提供事務管理、統一流批、中繼資料管理、版本回溯等資料庫領域常見功能。一年過去了,Delta Lake的版本也更新到了0.5.0,但開源版本始終沒有提供Update/Delete的SQL實作。目前隻提供Dataframe API,使用者需通過編寫代碼來對資料進行更新和删除等操作。此外,根據Apache Spark 3.0分支上提供的SQL文法接口,也隻支援基本的單表Update/Delete操作,對于複雜的帶有join語義的跨表操作,則完全不支援。而Teradata使用者已經在廣泛使用擴充的SQL文法對資料進行更新和删除操作。

基于Delta Lake存儲層提供的ACID事務能力,Carmel Spark實作了Update/Delete的SQL文法,且該文法完全相容Teradata的擴充語義,即能進行跨表的更新和删除。同時,我們拓展了delta表的資料分布,支援bucket delta表,并對其進行了bucket join等優化。此外,由于Carmel Spark叢集部署是多租戶的,是以同一套代碼會長期運作在YARN的不同隊列中。雖然Delta Lake存儲層提供了良好的事務隔離性,但仍會出現重複操作的風險(非同一事務)。是以,我們使用delta表本身來治理delta表,即将所有delta表的元資訊存儲在一張delta表中,通過對該中繼資料表的增删改查操作,來對使用者使用的所有delta表進行管理。

本文的組織結構如下:第二節介紹相關技術和産品;第三節闡述項目的整體架構和實作;第四節詳細介紹如何利用Delta Lake使SparkSQL支援CRUD操作;第五節介紹delta表的bucket優化;第六節介紹delta表的自治和管理;最後兩節分别談一下未來的工作和對本文的總結。

2.相關工作

2.1 Spark SQL

Apache Spark[1]是一款開源的分布式計算架構,誕生于2009年加州大學伯克利分校AMPLab的一個研究項目,于2013年捐贈給了Apache軟體基金會。在處理結構化資料上,Spark提供了DataFrame API和Spark SQL子產品。DataFrame API允許使用者通過表、行和列的概念對資料進行操作。

同樣,使用者可以使用SQL來操作它們。Spark SQL子產品将SQL查詢轉成一棵查詢計劃樹(query plan tree)。給定一個原始SQL查詢,該查詢首先經詞法分析和解析,轉換為邏輯查詢計劃(logical plan)。該邏輯查詢計劃經過查詢優化器,産生優化的查詢計劃(optimized plan)。最終,優化的查詢計劃被轉換為實體計劃(physical plan),實體計劃會被轉成job和task最終送出到叢集上執行。

Apache Spark 3.0開始,SQL子產品提供了Update/Delete的文法定義,定義在Antlr4的文法檔案裡,但并沒有具體實作,而是交由第三方實作。如圖1所示:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

Teradata

Teradata[2]是Teradata Corp.開發的可橫向擴充的關系型資料庫管理系統,設計用于分析型查詢,主要用于資料倉庫領域,采用大規模并行處理(MPP)架構。Teradata對Update/Delete等文法支援非常完備,除了ANSI SQL: 2011定義的标準Update/Delete文法,Teradata還做了大量擴充,如跨表更新和删除。其所提供的豐富的文法也給我們遷移到Spark帶來了挑戰。圖2所示為Teradata支援的更新和删除文法:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

2.3 Delta Lake

2018年初,Databricks開源了存儲層Delta Lake[3],為Apache Spark 提供可伸縮的 ACID 事務,提供事務管理、統一流批、中繼資料管理、版本回溯等資料庫領域常見功能。Delta Lake将其資料存儲在Parquet檔案中,并提供ACID事務。它使用名為Delta Log的事務日志來跟蹤對表所做的所有更改。

與開源的Delta Lake相比,Databricks内部版本可以通過SQL來進行Update/Delete操作,而目前開源版本隻支援DataFrame的API,隻能通過Parquet[4]檔案推斷表的Schema資訊,對Hive Metastore[5]的支援較弱,且不支援bucket表等等。Apache Iceberg[6]和Apache Hudi[7]雖然實作形式與Delta Lake不同,但在Update/Delete的SQL文法支援上,目前都不完善。表1給出了這三個系統的對比(截止2019年11月)。

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

3.項目概述

有了Delta Lake在存儲層提供ACID事務保障,我們的主要工作就是利用Delta Lake,在我們的Spark版本上實作和Teradata相同的Update/Delete功能。要達到這個目标,有以下任務有待完成:

  1. Delta Lake目前隻支援Apache Spark 2.4+版本,而Carmel團隊使用的Spark版本是基于2.3版本的,是以我們改了Delta Lake的部分實作并為我們的Spark版本打了一些更新檔。
  1. Spark 3.0中雖然沒有Update/Delete文法的具體實作,但仍然在Catalyst[8]中加入了相關的邏輯計劃節點。不過這些新增的接口都是基于DataSourceV2的,我們需要将這部分代碼在DataSourceV1上進行重寫:
實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作
  1. Teradata支援跨表的Update/Delete文法,目前Delta Lake和Spark都不支援,我們需要自己實作帶join的跨表連接配接更新和删除操作。
  1. Delta Lake目前對Catalog[9]的通路還不成熟,delta表的schema是通過Parquet檔案推斷出來的,通過Catalog通路Hive Metastore是使用SQL通路delta表的重要一環。
  2. 由于上述原因,delta表無法識别bucket資訊,更沒有考慮讀寫bucket表時的分布(distribution)。
  3. 在以上3,4,5步驟完成之後,還要對跨表操作進行優化,這裡将主要介紹bucket join的優化。
  4. 開源版本的Delta Lake缺少一定的管理機制,需要實作一些自動化管理功能,如自動清理和合并檔案等。

4.CRUD的實作

4.1 前置工作

首先,要在我們的Spark 2.3内部版本中使用Delta Lake,就需要從社群打一些更新檔。這裡重點說一下SPARK-28303。

SPARK-28303引入了基于DataSource V2的DELETE / UPDATE / MERGE文法。由于Spark 2.3不支援DataSource V2,是以我們需要将此功能移植到V1版本,在ddl.scala中增加了UpdateTableStatement和DeleteFrom Statement。Antlr4[10]的文法結構如下所示:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

4.2 實作單表更新

Delta Lake目前不支援Update/Delete SQL的解析,我們增加了兩個類:DeltaSqlResolution和PreprocessTableUpdateDelete,通過SparkSessionExtensions注入到Analyzer:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

DeltaSqlResolution主要是用于解析condition和assignments表達式:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

再由PreprocessTableUpdateDelete生成RunnableCommand。如果是delta表的話,這裡可以從LogicalRelation中拿出delta表的TahoeFileIndex(在DataSource.scala的resolveRelation中添加的),如果是非delta表,則會抛出AnalysisException。

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

UpdateCommand是Delta Lake自帶的類,我們對其改動不多,主要改了如下幾個地方:

一個是鑒于目前Update操作不會更新表的統計資訊(Statistics),造成delta表在進行join等操作時無法正确判斷是走SortMergeJoin還是BroadcastJoin,我們增加了catalog的通路使delta表的CRUD操作都能更新表的統計資訊。

第二個改動是增加了update/delete的row級别metrics資訊。Delta Lake已經釋出的0.5.0版本update和delete缺少row級别的metrics。社群最新的代碼已經做了添加,但當更新或删除單個partition或全表時仍舊是缺少的,而我們的實作在無論何種情況下都做了收集。

4.3實作跨表更新

目前Spark3.0定義的Update/Delete文法不支援跨表操作,而跨表更新和删除操作卻十分普遍,比如更新目标表中具有(在inner join情況下)或可能沒有(在left outer join情況下)另一個表比對行的行。

許多資料庫都提供跨表更新和删除的文法。下面給出了幾種常用資料庫的跨表更新的例子。

MYSQL[11]跨表更新:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

Teradata的跨表更新:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

PostgreSQL[12]的跨表更新:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

Teradata的文法和PostgreSQL的基本一緻,隻是FROM子句和SET子句順序調換了一下,而MYSQL支援在一條SQL裡同時更新多張表。Carmel Spark目前參考的是Teradata的文法,同時在DeltaSqlResolution中增加了帶join的解析:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

和單表Update一樣,首先對condition和SET子句進行解析。不同的是,除了被更新的target是一個LogicalRelation以外,這裡的source可以是一個LogicalRelation,也可以是多張表連接配接在一起的join plan。

我們從WHERE條件的condition中分離出哪些是target和source之間的join criteria,哪些是source中自身的join criteria(source可以是多表join的plan),以及哪些是分别作用在target或source上的普通Filter。同樣地,再由PreprocessTableUpdateDelete生成Runnable Command:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

上述代碼中,跨表更新和單表更新的差別是多建構了一個DeltaMergeAction。可見跨表更新的實作參考了MergeInto。

UpdateWithJoinCommand是跨表更新的主要執行類,一共分為三步:

  1. 通過将需要被更新的target表和source(可以是一個帶join的plan)進行内連接配接(inner join)找出所有會被更新的行所涉及的檔案,标記為removeFiles。這一步還能簡化後續的步驟,例如不涉及任何檔案或者隻涉及partition目錄時,不用全表執行第2步。
  1. 将target和source使用左外連接配接(left outer join),對于join條件比對的行,使用build side iterator的資料(右表),不比對的行使用stream side iterator的資料(左表)。将資料寫出到target表,寫出的資料檔案标記為addedFiles。
  2. 将1中removeFiles和2中的addedFiles寫入transaction log中,即delta log。

    删除操作和更新操作基本類似,可以視為更新操作的簡化版,這裡就不展開了。

4.4 實作SELECT/INSERT

對delta表的讀操作(SELECT)實際上是對delta表的解析。Delta表是DataSource表的一種。在FindDataSourceTable這條rule中,通過resolveRelation方法對delta表進行特殊處理:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

這裡我們把catalogTable對象傳入到DeltaDataSource的createRelation方法裡。補充一點,之是以這個case可以比對到DeltaDataSource,是因為我們在ConvertToDelta Command裡,通過alterTable,把provider從parquet改成了delta:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

回到createRelation。通過傳入的catalogTable對象,我們在DeltaLog.scala裡将表的資訊填到HadoopFsRelation裡面:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

Delta表的INSERT操作也很簡單。在DataSourceStrategy中添加InsertIntoData SourceCommand:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

普通delta表的insert我們沒有進行修改,這裡就不展開了,下一節講bucket表的insert時再詳細闡述這部分的改動。

4.5 建立Delta表

建立delta表(CREATE操作)目前完全複用了普通Parquet表的CREATE,隻是需要在建完表後執行CONVERT TO DELTA指令。我們簡單做了一些修改,使其可以CONVERT一張空的Parquet表,目前社群版是不支援的。其他的修改主要是針對管理上的,在第六節會詳細介紹。

到此,CRUD功能的SQL實作已經基本完成。在這一節裡,我們引入了跨表更新操作,但是跨表更新涉及到join算子,這在大表之間進行更新操作時會有性能問題。在下一節中會介紹如何針對bucket表進行優化。

5.Bucket優化

跨表更新操作中,會有多次連接配接算子,當進行連接配接操作的表是上TB資料量的大表時,整個更新操作就會變得非常慢。甚至,大量資料的SortMergeJoin可能抛出OutOfMemory。事實上,在我們實際的業務場景中,就存在着大量的大表更新。例如被更新的表往往是一張幾個TB的大表,然後和另一張或幾張中型表進行連接配接操作。為了優化這類SQL,最容易想到的方法是通過bucket join來避免大表資料的shuffle。現實中,我們使用者的許多大表也的确做了分桶(bucket)。

然而目前delta表并不支援分桶表,相關代碼的BucketSpec都被預設填了None,對更新和删除的操作也沒有考慮資料的分布(Distribution)。那麼該如何實作bucket表的資料分布呢?

5.1 建立delta bucket表和讀取

首先和Parquet表一樣,我們需要在建表時指定分桶字段。形如:CLUSTERED BY (col) [SORTED BY (col) ] INTO number BUCKETS。

在4.3小節中我們提到了在ResolveRelation時将CatalogTable對象傳入了HadoopFsRelation。有了這個CatalogTable對象,就可以幫我們在後續的各類操作中識别bucket表了。

5.2 插入資料到delta bucket表

上一步隻是告訴Spark,這是一張bucket表,真正寫入資料的時候發現資料并沒有分桶分布。這是因為Insert操作在delta表上是走InsertIntoDataSource -> InsertIntoDataSourceCommand的,而不是通過DataWritingCommand,是以也就走不到ensureDistributionAndOrdering的邏輯。以下代碼是社群版InsertIntoDataSourceCommand的實作:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

如上代碼所示,它的實作非常簡單,将需要insert的邏輯計劃“query”封裝成一個data frame,然後傳入到實作類的insert方法裡。在Delta Lake中這個data frame會被傳入到TransactionalWrite的writeFiles方法中。最終從這個data frame中取出physical plan并傳入DataFormatWriter的write方法。之後就是真正的生成job并分發執行了。

從整個流程可以看出,從一開始的邏輯計劃對象“query”到最後的實體計劃,并沒有機會進行資料分布的實作。是以不管在建表時是否指定分桶,插入資料時都不會滿足資料分布。

鑒于目前DataSource并沒有考慮資料分布的問題,我們在resolution階段就需要進行處理。大體就是在Catalyst裡增加一個InsertIntoDataSource的邏輯計劃節點和一個InsertIntoDataSourceExec的實體計劃節點。在InsertIntoDataSourceExec這個實體計劃中實作了requiredChildDistribution和requiredChildOrdering方法(代碼可以參考InsertIntoHadoopFsRelationCommand的requiredDistribution和requiredOrdering方法)。

這裡說一下整體流程。首先,DataSourceStrategy原本是比對到了InsertIntoTable就會将邏輯計劃“query”原封不動地傳入InsertIntoDataSource Command。我們現在做出如下改變:增加一個新的邏輯計劃節點InsertIntoDataSource,為其添加partition,bucket等資訊,并将“query”作為該新節點的child:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

然後在SparkStrategy.scala的BasicOperators裡将InsertIntoDataSource節點轉成實體計劃節點InsertIntoDataSourceExec,通過planLater(i.query)得到實體計劃作為該實體節點的child。這樣InsertIntoDataSourceExec的requiredChildDistribution和requiredChildOrdering方法就可以對資料進行分布了:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

5.3 在跨表更新或删除操作中利用bucket join

到目前為止,對delta表的改造已經使其具有了bucketSpec字段和資料分布的特性。在跨表更新或删除時,無論是inner join還是left outer join,隻要target和source都是bucket表且滿足bucket join條件,就能走bucket join而不是SortMergeJoin。這就解決了大表之間join産生大量shuffle帶來的性能問題。

下面這個例子是跨表更新一張3.9TB的表,source則是一張5.2TB的表。圖3所示是left outer join階段,右表雖然有一個Filter,但是仍然不滿足broadcast join門檻值,這個更新操作在非bucket join的情況下,會造成大量Executor OOM,最終導緻job失敗。通過引入bucket join,該job在2分鐘左右就能順利完成。從圖3可以看到在SortMergeJoin的前後,已經沒有ShuffleExchange了。

圖3 跨表更新中利用bucket join避免shuffle

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

但是,這裡仍然可能存在問題,因為被更新的表仍然是一張bucket表,而圖3的輸出沒有考慮資料的分布。對于bucket表尚不滿足資料分布的情況,我們需要在SortMergeJoin之後增加一輪HashRepartition,以保證最終的結果輸出符合被更新表的資料分布特性:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

6.Delta的自治和管理

介紹完CRUD的功能和相關優化,這一節講一下我們是如何管理delta表的,主要包括:如何統計delta的使用情況,如何自動進行檔案清理,如何管理TimeTraval[13]等。

在這之前我們需要簡單介紹一下eBay Carmel Spark的基本架構。eBay的Carmel Spark平台是計算存儲分離的。資料存儲有一個專門的Hadoop叢集(Apollo),Carmel Spark叢集(Hermes)主要是由大記憶體加SSD的計算節點組成,通過YARN[14]進行排程。除了本地SSD以外,也有一部分存儲容量搭建了一個小容量的HDFS,主要是拿來做Relation Cache和物化視圖,這部分以後有機會另起一篇文章進行介紹。

我們使用Spark Thriftserver來提供JDBC和ODBC服務,但所有的Thriftserver并不是固定在某個機器上的,而是通過YARN進行排程,通過cluster mode将Spark Thriftserver送出到叢集内部。同時,根據Budget Group對YARN叢集分queue,不同的Budget Group有一個YARN的queue,例如廣告部門有一個queue,資料部門有一個queue,每個queue可以有多個Spark Thriftserver。Carmel Spark對scheduler子產品做過大量并發優化,經過壓測,一個Driver排程起來的任務能把200台實體機的所有CPU壓滿。是以Driver排程并不是瓶頸,目前最大的一個queue僅使用一個Thriftserver就可以排程近7000個executors。

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

目前有多少個queue,就有多少個Thriftserver,也就有多少個Application。但不同的Thriftserver仍然共享了一些元件,例如HDFS,Hive Metastore等。這就要求我們對所有的queue做一些管理。例如在物化視圖功能中,當對一張基礎表建構物化視圖後,所有的queue都需要在記憶體裡建構一些邏輯計劃樹。delta表的管理也類似,不過相比物化視圖簡單的多。例如我們要對所有的delta表進行自動化的檔案清理工作,一種方式是起一個背景線程周遊Hive Metastore的所有表,對provider是delta的表進行處理。這樣的好處是不需要跨Thriftserver進行任何消息的同步,壞處自然是不斷周遊Hive Metastore帶來的壓力(多叢集公用的Hive Metastore壓力已經比較大了)。是以我們使用了一種更加直覺的方式進行管理,即用delta表來管理delta表。

我們建立了一張名為carmel_system.carmel_ delta_meta的表,記錄了如表名、owner、deltalog路徑、是否自動清理、清理周期等元資訊,并将其CONVERT成一張delta表。是以carmel_delta_meta表的第一條記錄就是自己的資訊。然後我們提供了一套操作這張表的API,以調用靜态方法的方式放在DeltaTableMetadata類的半生對象中:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

如下所示,當使用者對一張表執行CONVERT TO DELTA指令時,會生成一個事件,通過DeltaTableListener捕獲後将該delta表的元資訊寫入carmel_delta_meta,當使用者删除delta表時,DropTableEvent同樣可以觸發上圖的删除操作API,從carmel_delta_meta删除這條記錄:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

另外在YARN的保留隊列(reserved queue隻允許管理者權限連接配接)裡啟動一個DeltaValidate線程,通過讀取carmel_delta_meta中的資料進行驗證,觸發如删除記錄等操作。同時,如果使用者在CONVERT TO DELTA時指定了Vacuum保留時間:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

或是一開始沒有指定保留時間,後續通過指令VACUUM AUTO RUN進行修改:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

DeltaValidate線程會自動生成Vacuum任務,并丢到Vacuum線程池排程執行。這裡就不貼代碼了。整個架構如圖5所示:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

此外,我們還增加了TimeTravel的SQL語義,使用者可以通過在SELECT指令裡增加AT關鍵字,單次讀取delta表某個version的快照。也可以通過ROLLBACK指令永久回到某個版本:

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

通過carmel_delta_meta中記錄的一些表的血緣資訊,可以實作delta表的及聯復原。在某個delta表rollback後,觸發器根據carmel_delta_meta的血緣資訊,自動復原其他相關表(這需要事先定義在carmel_delta_meta的rollback依賴樹和觸發器條件,該功能目前還未上線)。

上面介紹了通過delta表來管理delta表的方式,這一方法能很好地幫我們解耦隊列同步和外部系統依賴的問題,既友善靈活,又快捷安全。

7.未來的工作

7.1 持續的性能優化

Carmel Spark項目經過兩年的技術疊代,已經具備非常多的功能和優化,例如Range Partation、Optimized Bucket Join、Broadcast/Local Cache、Extended Adaptive Execution、Parquet File Index、Materialized View、ACL、Volcano CBO、Adaptive Runtime Filter、Mutiple Files Scan等,如何讓新的功能如CRUD複用以上優化和特性,也變得越來越富有挑戰了。例如我在測試時發現Broadcast Cache和Mutiple Files Scan兩個功能在和CRUD功能內建時存在bug,又或者目前的Volcano CBO和Parquet File Index還不能應用在delta表上等。

此外,在跨表更新操作上,大表連接配接的優化目前隻針對bucket表,但是當兩張非bucket表進行連接配接時,性能仍然不夠好。這裡就有多個優化點,比如Adaptive Runtime Filter,就是Join Pushdown,可以将join表的min/max或者join key的bloomfilter推到兩邊進行過濾,以減少參與連接配接的記錄數,目前隻完成了在inner join下的部分功能。

7.2 更完備的語義

除了性能的優化,Carmel Spark作為Teradata戰略代替品,需要盡可能相容Teradata的語義,後續如果有使用者需要MERGE INTO或者UPSERT操作,這部分還要繼續擴充。此外,目前UPDATE和DELETE的WHERE條件還不支援子查詢,CONVERT TO DELTA不支援Parquet Format的Hive表,這些都将是後續的工作。

7.3 高度自治的管理

第6節最後提到過的及聯復原功能,以及對delta表的審計和監控都屬于平台管理的範疇。這些有的已經具備成熟的解決方案,如我們已經有完全和Teradata對标的列級通路權限控制和審計功能。有的還在不斷完善,如用于File Index和Materialized View的Hive Metastore同步機制還沒有上線,目前用的還是過渡方案。這部分不止針對delta表,有些還可以應用于整個Carmel Spark。

8.實施和總結

8.1 技術之外

最後簡單說一下項目的情況。這個項目找到我的時候是在2019年的10月底,我剛上線完Spark臨時表功能,物化視圖項目也還陸陸續續有一些bug fix的工作要做,是以真正開始投入去做應該是在11月中旬。

CRUD功能目标上線時間是在2020年的2月份,不像物化視圖這類優化型項目,功能型項目承諾上線時間的要求往往更高一些。加之期間還有春節假期,oncall和各種bug fix的工作,對于該項目來說排期還是比較緊的。

此外,我們對Delta Lake的成熟度和性能也比較擔憂(現實也驗證了Delta Lake的開源版本在SQL成熟度上的确不足)。實踐中發現除了ACID這個核心功能不用操心以外,基本上都要二次開發。最後和我們使用的基于社群2.3版本進行魔改的Carmel Spark的內建相比,也存在許多挑戰。

再說一下為什麼選擇Delta Lake。目前來看,除了Delta Lake之外,Apache Hudi和Apache Iceberg也能完成ACID的功能。當時選擇Delta Lake一是因為它是Databricks的産品,在Databricks内部版本比較成熟,長期來看其開源版本也會和Apache Spark更加緊密。二是當時公司内部還有一個準實時數倉的項目,立項也是使用Delta Lake。考慮到盡可能保持技術棧一緻,我們選擇了Delta Lake,而且單從這個項目上Apache Hudi和Apache Iceberg并沒有特别的優勢。

最後說一下使用者支援,其實做一個項目最複雜也是最耗時的并不是編碼階段,而是上線後接受使用者的考驗。該功能的第一批使用者是來自eBay瑞士的财務部門分析師團隊,因為不在同一個時區,春節假期裡幾乎每晚都會通過Zoom和我溝通。這種在使用者和開發者之間的持續交流,使得一些隐藏的問題即時浮現出來,使用者也得到了較好的使用體驗。我們的Carmel Spark每周都會有半個小時的例行釋出視窗,使用者遇到的bug幾乎都在下次釋出視窗時得到了修複。在這一周中,我們也會找出workaround方式,幫助使用者進度的推進。目前該功能已經在所有隊列上啟用,越來越多的使用者開始參與試用。

8.2 總結

本文從源碼角度講解如何利用Delta Lake使老版本的Spark SQL支援跨表的CRUD操作,以及我們所做的優化和管理工作。最後,簡單介紹了未來的工作方向以及項目實施上的一些感悟,希望能對閱讀者有所幫助。

參考文獻

[1]

https://spark.apache.org/

[2]

https://www.teradata.com/

[3]

https://delta.io/

[4]

https://parquet.apache.org/

[5]

https://hive.apache.org/

[6]

https://iceberg.apache.org/

[7]

https://hudi.apache.org/

[8]

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

[9]

http://blog.madhukaraphatak.com/introduction-to-spark-two-part-4/

[10]

https://www.antlr.org/

[11]

https://www.mysql.com/

[12]

https://www.postgresql.org/

[13]

https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html

[14]

https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。Apache

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作

Spark技術交流社群公衆号,微信掃一掃關注

實戰 | 利用Delta Lake使Spark SQL支援跨表CRUD操作