天天看點

Spark UDF變長參數的二三事兒

<a href="http://s3.51cto.com/wyfs02/M00/9B/61/wKioL1lizuXBDcrMAAGgWFP06jo273.jpg-wh_651x-s_632358623.jpg" target="_blank"></a>

在複雜業務邏輯中,我們經常會用到Spark的UDF,當一個UDF需要傳入多列的内容并進行處理時,UDF的傳參該怎麼做呢? 下面通過變長參數引出,逐一介紹三種可行方法以及一些不可行的嘗試...

引子

變長參數對于我們來說并不陌生,在Java裡我們這麼寫

public void varArgs(String... args) 

在Scala裡我們這麼寫

def varArgs(cols: String*): String 

而在Spark裡,很多時候我們有自己的業務邏輯,現成的functions滿足不了我們的需求,而當我們需要處理同一行的多個列,将其經過我們自己的邏輯合并為一個列時,變長參數及其變種實作可以給我們提供幫助。

但是在Spark UDF裡我們是 無法使用變長參數傳值 的,但之是以本文以變長參數開頭,是因為需求起于它,而通過對它進行變換,我們可以使用變長參數或Seq類型來接收參數。

下面通過Spark-Shell來做示範,以下三種方法都可以做到多列傳參,分别是

變長參數(接受array類型)

Seq類型參數(接受array類型)

Row類型參數(接受struct類型)

變長參數類型的UDF

定義UDF方法

def myConcatVarargs(sep: String, cols: String*): String = cols.filter(_ != null).mkString(sep) 

注冊UDF函數

由于變長參數隻能通過方法定義,是以這裡使用部分應用函數來轉換

val myConcatVarargsUDF = udf(myConcatVarargs _) 

可以看到該UDF的定義如下

UserDefinedFunction(&lt;function2&gt;,StringType,List(StringType, ArrayType(StringType,true))) 

也即變長參數轉換為了ArrayType,而且函數是隻包括兩個參數,是以變長參數清單由此也可看出無法使用的。

變長參數清單傳值

我們構造一個DataFrame如下

val df = sc.parallelize(Array(("aa", "bb", "cc"),("dd","ee","ff"))).toDF("A", "B", "C") 

然後直接傳入多個String類型的列到myConcatVarargsUDF

df.select(myConcatVarargsUDF(lit("-"), col("A"), col("B"), col("C"))).show 

結果出現如下報錯

java.lang.ClassCastException: anonfun$1 cannot be cast to scala.Function4 

由此可以看出,使用變長參數清單的方式Spark是不支援的,它會被識别為四個參數的函數,而UDF确是被定義為兩個參數而不是四個參數的函數!

變換:使用array()轉換做第二個參數

我們使用Spark提供的array() function來轉換參數為Array類型

df.select(myConcatVarargsUDF(lit("-"), array(col("A"), col("B"), col("C")))).show 

結果如下

+-------------------+ 

|UDF(-,array(A,B,C))| 

|           aa-bb-cc| 

|           dd-ee-ff| 

由此可以看出,使用變長參數構造的UDF方法,可以通過構造Array的方式傳參,來達到多列合并的目的。

使用Seq類型參數的UDF

上面提到,變長參數最後被轉為ArrayType,那不禁要想我們為嘛不使用Array或List類型呢?

實際上在UDF裡,類型并不是我們可以随意定義的,比如使用List和Array就是不行的,我們自己定義的類型也是不行的,因為這涉及到資料的序列化和反序列化。

以Array/List為示例的錯誤

下面以Array類型為示例

定義函數

val myConcatArray = (cols: Array[String], sep: String) =&gt; cols.filter(_ != null).mkString(sep) 

注冊UDF

val myConcatArrayUDF = udf(myConcatArray) 

可以看到給出的UDF簽名是

UserDefinedFunction(&lt;function2&gt;,StringType,List()) 

應用UDF

df.select(myConcatArrayUDF(array(col("A"), col("B"), col("C")), lit("-"))).show 

會發現報錯

scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String 

同樣List作為參數類型也會報錯,因為反序列化的時候無法建構對象,是以List和Array是無法直接作為UDF的參數類型的

以Seq做參數類型

定義調用如下

val myConcatSeq = (cols: Seq[Any], sep: String) =&gt; cols.filter(_ != null).mkString(sep)  

val myConcatSeqUDF = udf(myConcatSeq)  

df.select(myConcatSeqUDF(array(col("A"), col("B"), col("C")), lit("-"))).show 

|UDF(array(A,B,C),-)| 

使用Row類型參數的UDF

我們可以使用Spark functions裡struct方法構造結構體類型傳參,然後用Row類型接UDF的參數,以達到多列傳值的目的。

def myConcatRow: ((Row, String) =&gt; String) = (row, sep) =&gt; row.toSeq.filter(_ != null).mkString(sep)  

val myConcatRowUDF = udf(myConcatRow)  

df.select(myConcatRowUDF(struct(col("A"), col("B"), col("C")), lit("-"))).show 

可以看到UDF的簽名如下

+--------------------+ 

|UDF(struct(A,B,C),-)| 

|            aa-bb-cc| 

|            dd-ee-ff| 

使用Row類型還可以使用模式提取,用起來會更友善

row match { 

  case Row(aa:String, bb:Int) =&gt; 

最後

對于上面三種方法,變長參數和Seq類型參數都需要array的函數包裝為ArrayType,而使用Row類型的話,則需要struct函數建構結構體類型,其實都是為了資料的序列化和反序列化。三種方法中,Row的方式更靈活可靠,而且支援不同類型并且可以明确使用模式提取,用起來相當友善。

而由此我們也可以看出,UDF不支援List和Array類型的參數,同時 自定義參數類型 如果沒有混合Spark的特質實作序列化和反序列化,那麼在UDF裡也是 無法用作參數類型 的。當然,Seq類型是可以 的,可以接多列的數組傳值。

此外,我們也可以使用柯裡化來達到多列傳參的目的,隻是不同參數個數需要定義不同的UDF了。  

本文作者:佚名

來源:51CTO