天天看點

[譯]介紹Spark2.4的用于處理複雜資料類型的新内置函數與高階函數[譯]介紹Spark2.4的用于處理複雜資料類型的新内置函數與高階函數

[譯]介紹Spark2.4的用于處理複雜資料類型的新内置函數與高階函數

本文翻譯自databricks的介紹spark2.4新特性的blog,英文原文參考

原文連結

Apache Spark2.4總共支援了29個用于處理複雜資料類型(比如,數組類型等)的新内置函數和高階函數。

在Spark2.4版本之前,有兩種典型的方式處理複雜資料類型:

  1. 将嵌套結構的資料轉化為多行資料,然後使用函數處理,最後再組裝成嵌套結構。
  2. 自己建構一個UDF。

和之前不同,新的内置函數可以直接操作複雜資料類型,同時新的高階函數可以使用匿名的lambda函數處理複雜資料類型,和UDF類似,但是性能大大提高。

在本篇文章,我們将通過一些示例展示部分内置函數以及它們的具體用法。

典型的處理方式

我們首先來看一下Spark2.4以前典型的處理方式。

選擇1 - Explode and Collect

我們使用explode函數将數組資料拆解為多行資料然後計算

val + 1

,最後用collect_list再将多行資料重新組織成數組。

SELECT id,
       collect_list(val + 1) AS vals
FROM   (SELECT id,
               explode(vals) AS val
        FROM input_tbl) x
GROUP BY id           

這種方式容易出錯,同時效率也比較低,這主要展現在三個方面。首先我們很難去確定最後重組的數組所用的行資料确定來自原始的數組,需要通過對unique key分組來保證。其次,我們需要用到

group by

,這就意味之需要一次shuffle操作,但是shuffle操作并不保證重組後的數組和原始數組中資料的順序一緻。最後,這樣的處理方式非常昂貴。

選擇2 - User Defined Function

接下來,我們使用Scala UDF處理Seq[Int],通路序列中每個元素并加1.

def addOne(values: Seq[Int]): Seq[Int] = {
  values.map(value => value + 1)
}
val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])           

或者,我們也可以使用Python UDF。

SELECT id, plusOneInt(vals) as vals FROM input_tbl           

這樣更簡單更快,同時也避免了很多可能導緻錯誤的陷阱,但是由于還是需要将資料反序列化成Scala或者Python對象,仍然效率不高。

【譯者注】基于Tungsten引擎,Spark SQL處理的中間資料是以binary的方式直接存儲的,使用UDF需要将binary資料反序列化成Scala/Python數組,處理完成後,還需要序列化成binary資料。

你可以參考并嘗試我們之前釋出的一篇文章中

notebook示例

新内置函數

下面我們看看新内置函數是如何處理複雜資料類型的。這個

notebook

列舉每個函數的示例。每個函數的名稱和參數标注了它們對應處理的資料類型,T或U表示數組,K,V表示映射(MAP)類型。

高階函數

【譯者注】高階函數是來自函數式語言的一個概念,主要是指一個函數支援使用其他函數作為參數或者傳回類型為函數。具體定義可以參考

Higher-Order Function

為了更進一步處理數組和映射類型的資料,我們使用了匿名lambda函數或高階函數這兩種SQL中支援的文法,使用lambda函數作為入參。

lambda函數的文法規範如下:

argument -> function body
  (argument1, argument2, ...) -> function body           

箭頭->左邊表示參數清單,箭頭右邊定義函數體,在函數體中使用參數和其他成員變量計算結果值。

使用匿名Lambda函數

我們首先嘗試使用匿名lambda函數的

transform

函數。

假設有一個表,包含三列資料:integer類型的key,integer數組類型的values,二維Integer數組類型的nested_values。

key values nested_values
1 [1,2,3] [[1,2,3],[],[4,5]]

當我們執行如下SQL的時候:

SELECT TRANSFORM(values, element -> element + 1) FROM data;           

transform

函數疊代通路values數組中的每個元素,并執行lambda函數,給每個元素加1,然後建構一個新數組。

我們可以使用除了參數以外的其他變量,比如:key,key是表中的另外一列,在lambda函數上下文之外,但是我們仍然可以使用它,比如:

SELECT TRANSFORM(values, element -> element + key) FROM data;           

如果你想要處理更複雜的嵌套類型,比如nested_values列的資料,你甚至可以使用嵌套的lambda函數:

SELECT TRANSFORM(
    nested_values,
    arr -> TRANSFORM(arr,
      element -> element + key + SIZE(arr)))
  FROM data;           

在裡層的lambda函數中,你同樣也可以使用key和arr這些在lambda函數上下文之外的變量以及表的其他字段值。

需要注意的是,在上面的notebook中,同時展示了基于之前典型的處理方式和新的高階函數的處理方式兩種示例代碼,

結論

Spark2.4支援了24個新的内置函數,比如array_union, array_max/min等,以及5個高階函數,比如transform, filter等,都是用于處理複雜資料類型。如果你需要處理複雜資料類型,建議使用這些函數。感謝Apache Spark的contributor貢獻了這些功能:Alex Vayda, Bruce Robbins, Dylan Guedes, Florent Pepin, H Lu, Huaxin Gao, Kazuaki Ishizaki, Marco Gaido, Marek Novotny, Neha Patil, Sandeep Singh以及其他人。

歡迎加群指正交流

[譯]介紹Spark2.4的用于處理複雜資料類型的新内置函數與高階函數[譯]介紹Spark2.4的用于處理複雜資料類型的新内置函數與高階函數