天天看點

Apache Spark 2.4 中解決複雜資料類型的内置函數和高階函數介紹

Apache Spark 2.4 是在11月08日正式釋出的,其帶來了很多新的特性具體可以參見這裡,本文主要介紹這次為複雜資料類型新引入的内置函數和高階函數。本次 Spark 釋出共引入了29個新的内置函數來處理複雜類型(例如,數組類型),包括高階函數。

在 Spark 2.4 之前,為了直接操作複雜類型,有兩種典型的解決方案:

  • 将嵌套結構展開為多行,并應用某些函數,然後再次建立結構;
  • 編寫使用者自定義函數(UDF)。

新的内置函數可以直接操作複雜類型,高階函數可以使用匿名 lambda 函數直接操作複雜值,類似于UDF,但具有更好的性能。

在本部落格中,通過一些示例,我們将展示一些新的内置函數以及如何使用它們來處理複雜的資料類型。

典型處理方式

讓我們首先通過以下示例來回顧一下 Spark 2.4 之前的典型解決方案。

Option 1 – Explode and Collect

我們使用 

explode

 函數将數組拆分成多行,并計算 

val + 1

,最後再使用 

collect_list

 重構數組,如下所示:

SELECT

id,

collect_list(val + 1)

AS

vals

FROM

(

SELECT

id,

explode(vals)

AS

val

FROM

iteblog) x

GROUP

BY

id

這種方式容易出錯并且效率低下,主要展現為三個方面。 首先,我們必須努力確定通過使用唯一鍵(unique key)來進行分組以便将新生成的數組完全組成為原始數組。其次,我們需要進行 group by 操作 ,這意味着需要進行一次 shuffle 操作; 但是 shuffle 操作并不保證重組後的數組和原始數組中資料的順序一緻;最後,使用這種方式非常低效。

Option 2 – User Defined Function

接下來,我們選擇使用 Scala UDF,它可以接收 

Seq[Int]

 并對其中每個元素進行加 1 操作:

def

addOne(values

:

Seq[Int])

:

Seq[Int]

=

{

values.map(value

=

> value +

1

)

}

val

plusOneInt

=

spark.udf.register(

"plusOneInt"

, addOne(

_:

Seq[Int])

:

Seq[Int])

或者,我們也可以使用 Python UDF,如下:

from

pyspark.sql.types

import

IntegerType

from

pyspark.sql.types

import

ArrayType

def

add_one_to_els(elements):

return

[el

+

1

for

el

in

elements]

spark.udf.register(

"plusOneIntPython"

, add_one_to_els, ArrayType(IntegerType()))

然後我們可以在 SQL 裡面如下使用:

SELECT

id, plusOneInt(vals)

as

vals

FROM

iteblog

這種方式更加簡單快速,并且可以避免很多陷阱。但這種方式可能仍然效率低下,因為 Scala 或 Python 中的資料序列化可能很昂貴。

新的内置函數

下面我們來看看直接操作複雜類型的新内置函數。 《Apache Spark 2.4 新增内置函數和高階函數使用介紹》 列舉了每個函數的示例。 每個函數的名稱和參數标注了它們處理資料類型,T 或 U 表示數組;K,V 表示 map 類型。

高階函數(Higher-Order Functions)

為了進一步處理數組和 map 類型,我們使用了 SQL 中支援的匿名 lambda 函數或高階函數文法,使用 lambda 函數作為參數。lambda 函數的文法如下:

argument ->

function

body

(argument1, argument2, ...) ->

function

body

符号 -> 左邊表示參數清單,符号右邊定義函數體,在函數體中可以使用參數和其他變量來計算新的值。

使用匿名 Lambda 函數進行轉換

首先我們來看一下使用帶有匿名 lambda 函數的 transform 函數的例子。假設有一個表,包含三列資料:integer 類型的 key,integer 數組的 values,Integer 類型的二維數組 nested_values。如下:

key values nested_values
1 [1, 2, 3] [[1, 2, 3], [], [4, 5]]

當我們執行下面 SQL 的時候:

SELECT

TRANSFORM(

values

, element -> element + 1)

FROM

iteblog;

transform

 函數通過執行lambda 函數周遊數組種的每個元素并進行加一操作,然後建立一個新數組。

除了參數之外,我們還可以在 lambda 函數中使用其他變量,例如:key,這是表的另外一列:

SELECT

TRANSFORM(

values

, element -> element +

key

)

FROM

iteblog;

如果你想要處理更複雜的嵌套類型,比如表中的 nested_values 列,你可以使用嵌套的 lambda 函數:

SELECT

TRANSFORM(

nested_values,

arr -> TRANSFORM(arr,

element -> element +

key

+

SIZE

(arr)))

FROM

iteblog;

在内層的 lambda 函數中你可以使用 key 和 arr 這些在 lambda 函數上下文之外的變量以及表的其他字段值。

總結

Spark 2.4 引入了 24 個新的内置函數,如 

array_union

array_max

array_min

等,以及 5 個高階函數,如 

transform

filter

 等,這些函數都可以用于處理複雜類型。完整的清單可以參見《Apache Spark 2.4 新增内置函數和高階函數使用介紹》。

繼續閱讀