天天看點

深入分析Spark UDF的性能

這篇部落格會闡述一份關于Apache Spark的在Scala UDF、 PySpark UDF 和PySpark Pandas UDF之間的性能評測報告。

Spark提供了多種解決方案來應對複雜挑戰, 但是我們面臨了很多場景, 原生的函數不足以解決問題。是以,Spark允許我們注冊自定義函數(User-Defined Functions, 或者叫 UDFs)

在這篇文章, 我們會探索Spark的UDF的性能特點。

Spark支援多種語言,比如Python, Scala, Java, R, SQL. 但是通常資料操作都是用PySpark或者Spark Scala寫的。我們認為Pyspark被大多數使用者采用, 是因為以下原因:

更快的學習曲線 -- Python比Scala更簡單。

更廣的社群支援 -- 程式員對Pyspark性能等建議,回報到社群,形成更好的生态。

豐富的可用的類庫 -- Python有很多機器學習、時序分析、數理統計的類庫。

很小的性能差異 -- Spark DataFrames引入之後,意味着Scala和Python的性能幾乎相同。Datafarme現在是按照帶名字的列(named columns)來組織的, 這樣Spark可以更好地了解Schema。而那些用來建構dataframe的操作,會被Catalyst Optimizer編譯成實體執行計劃(physical execution plan)來加速計算。

資料工程師和資料科學家,交接代碼也更簡單。有一些dataframe的操作需要UDFs, PySpark可能會有性能問題。有一些解決辦法,就是将PySpark和Scala UDF, UDF Wrapper一起使用。

PySpark作業送出的時候, driver端跑在Python上, driver會建立一個SparkSession對象以及Dataframes/RDDs. 這些Python對象是一些wrapper對象, 本質是JVM(Java)對象。為了簡化,PySpark提供了一個wrapper來跑原生Scala代碼。

Spark UDF 函數

通過Scala, Python 或者 Java 注冊自定義函數,是非常通用的方法, 來擴充SQL使用者的能力, 是的使用者可以調用這些函數而不需要再寫代碼。

例如, 将一個100w行的集合乘以1000:

def times1000(field):

return field * 1000.00

或者, 對經緯度資料集進行反向地理編碼(reverse geocode):

import geohash

def geohash_pyspark(lat, lon):

return geohash.encode(lat, lon)

Spark SQL提供了一種方法, 你可以用自己的程式設計語言來傳入1個函數,進而注冊UDF。Scala和Python可以用原生的函數或者lamdba文法,除了Java繁瑣一些,需要擴充這個UDF類。

UDF可以作用于多種不同的資料類型,并傳回一種不同的類型。在Python和Java裡,我們需要指定發傳回類型。

UDF可以通過以下方式進行注冊:

spark.udf.register("UDF_Name", function_name, returnType())

*returnType() 在Python和Java裡是強制的。

多種Spark UDF和執行方式

在分布式模式下,Spark使用master/worker架構來執行。排程器(driver)來跟大量的workers(或者叫executors)進行通信。driver和worker跑在自己的Java程序裡。

driver端通過main()方法,建立了SparkContext, RDDs并執行一些變換操作。Executors負責跑一個個的任務。

Screen Shot 2019-12-14 at 20.40.53.png

1576224390592-89e2031e-a0f6-4acd-9b4b-58d5d667f264.png

性能基準測試

我們建立了一個随機的經緯度資料集, 包含100w條記錄, 共1.2GB,來測試3種Spark UDF類型的性能。我們建立了2個UDF:一個簡單的乘以1000的函數, 一個複雜的geohash函數。(是以總共有2 * 3 = 6組測試)

叢集配置:8個節點

Driver節點:16核 122GB記憶體

Worker節點:4核 30.5GB記憶體,開啟自動擴容

Notebook代碼:https://bit.ly/2YxiVp4 使用 QuantumBlack’s的方法來跑 Scala UDF, PySpark UDF and PySpark Pandas UDF 的測試。

除了上面3種類型的UDF,我們還建立了Python wrapper, 進而在Pyspark中調用Scala UDF。我們發現這種方式, 既可以使用簡單的python程式設計,又能兼顧Scala UDF的性能。

用Pyspark代碼來建立一個Python wrapper:

from pyspark.sql.column import Column

from pyspark.sql.column import _to_java_column

from pyspark.sql.column import _to_seq

from pyspark.sql.functions import col

def udfGeohashScalaWrapper(lat, lon):

_geohash = sc._jvm.sparkudfperformance.UDFs.udfGeohash()

return Column(_geohash.apply(_to_seq(sc, [lat, lon], _to_java_column)))

def udfTimes1000ScalaWrapper(field):

_times1000 = sc._jvm.sparkudfperformance.UDFs.udfTimes1000()

return Column(_times1000.apply(_to_seq(sc, [field], _to_java_column)))

Databricks對 Pandas UDF 做過一份性能報告 https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

重要結論

下面是測試結果

1576224428174-0d814c3d-160e-4ffa-8551-b4b5705c149b.png

測試結果中, Scala UDF的性能是最好的。前面提到, Scala和Python之間的轉換步驟, 使得Python UDF需要處理更多東西。

我們同時發現,PySpark Pandas UDF在小資料集或者簡單函數上,性能好于PySpark UDF。而如果是一個複雜的函數,比如引入了geohash,這種場景下 PySpark UDF的性能會比PySpark Pandas UDF好10倍。

我們還發現了,在PySpark代碼裡, 建立一個Python wrapper去調用Scala UDF,性能比這兩種PySpark UDFs好15倍。

綜合考慮了上面的一些性能特征, QuantumBlack公司現在采用的方式是: