Spark The Definitive Guide(Spark權威指南) 中文版。本書詳細介紹了Spark2.x版本的各個子產品,目前市面上最好的Spark2.x學習書籍!!!
掃碼關注公衆号:登峰大資料,閱讀中文Spark權威指南(完整版),系統學習Spark大資料架構!

6.1. 在哪裡檢視API
在我們開始之前,有必要解釋一下您作為使用者應該在哪裡查找DataFrame API。Spark是一個不斷增長的項目,任何書籍(包括這一本書)都是某個時間的快照。在本書中,我們的首要任務之一是教授在撰寫本文時,您應該在哪裡尋找轉換資料的函數。以下是關鍵的地方:
DataFrame(Dataset)的方法
這實際上有點小技巧,因為DataFrame隻是Row類型的Dataset,你最終會看到Dataset方法,在這個連結中可以找到:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
Dataset子子產品如DataFrameStatFunctions(包含各種統計相關的功能) 和 DataFrameNaFunctions(處理空資料時相關的函數)有更多的方法來解決特定的問題集。
Column方法
這些在第5章的大部分内容中已經介紹過。它們包含各種與列相關的通用方法,如alias或contains。您可以在這裡找到列方法的API引用。
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column
org.apache.spark.sql.functions包含一系列不同資料類型的函數。通常,您會看到整個包被導入,因為它們被頻繁地使用。您可以在這裡找到SQL和DataFrame函數。
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
如此繁多的函數,可能會讓人覺得有點難以承受,但不用擔心,這些函數中的大多數都是SQL和分析系統中可以找到的。所有這些工具的存在都是為了實作一個目的,即将資料行以一種格式或結構轉換為另一種格式。這可能會建立更多的行或減少可用的行數。首先,讓我們建立用于分析的DataFrame:
// in Scala val df = spark.read.format("csv") .option("header", "true") .option("inferSchema", "true") .load("/data/retail-data/by-day/2010-12-01.csv") df.printSchema() df.createOrReplaceTempView("dfTable")
下面是schema的結果和一個資料樣本:
6.2. 轉換為Spark類型
在本章中,您将看到我們所做的一件事是将本地原生類型轉換為Spark類型。我們用我們在這裡介紹的第一個函數來做這個,lit()函數。此函數将另一種語言中的類型轉換為其相應的Spark類型表示。以下是我們如何将幾個不同類型的Scala和Python值轉換為各自的Spark類型:
// in Scala import org.apache.spark.sql.functions.lit df.select(lit(5), lit("five"), lit(5.0)) # in Python from pyspark.sql.functions import lit df.select(lit(5), lit("five"), lit(5.0))
SQL中沒有等價的函數,是以我們可以直接使用這些值:
-- in SQL SELECT 5, "five", 5.0
6.3. 使用Boolean類型
當涉及到資料分析時,布爾值非常重要,因為它們是所有過濾的基礎。布爾語句由四個元素組成:and,or, true和false。我們使用這些簡單的結構來建構邏輯語句,以判斷真假。當一行資料必須通過測試(evaluate to true)或被過濾時,這些語句通常被用作條件要求。
讓我們使用我們的零售資料集探索使用布爾值。我們可以指定等于以及小于或大于:
// in Scala import org.apache.spark.sql.functions.col df.where(col("InvoiceNo").equalTo(536365)) .select("InvoiceNo", "Description") .show(5, false) // in Scala import org.apache.spark.sql.functions.col df.where(col("InvoiceNo") === 536365) .select("InvoiceNo", "Description") .show(5, false)
注意
Scala有一些關于使用==和===的特殊語義。在Spark中,如果您想要通過等式進行過濾,您應該使用===(相等)或=!=(不相等)。您還可以使用not函數和equalTo方法。
另一個選項(可能是最幹淨的選項)是将謂詞指定為字元串中的表達式。這對Python或Scala有效。請注意,這也使您可以使用另一種表達“不等于”的方式:
df.where("InvoiceNo = 536365") .show(5, false) df.where("InvoiceNo <> 536365") .show(5, false)
我們提到,在使用and或or時,可以使用多個部分指定布爾表達式。在Spark中,您應該始終将and過濾器連接配接在一起作為一個序列過濾器。原因是,即使布爾語句是串行的(一個接一個),Spark将所有這些過濾器壓平為一個語句,并同時執行過濾器。雖然您可以通過使用and(如果您願意的話)顯式地指定語句,但是如果您以串行方式指定語句,它們通常更容易了解和讀取。or須在同一語句中指明:
// in Scala val priceFilter = col("UnitPrice") > 600 val descripFilter = col("Description").contains("POSTAGE") df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter)) .show() -- in SQL SELECT * FROM dfTable WHERE StockCode in ("DOT") AND(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)
布爾表達式不僅僅是為過濾器保留的。要過濾一個DataFrame,您還可以指定一個布爾列:
// in Scala val DOTCodeFilter = col("StockCode") === "DOT" val priceFilter = col("UnitPrice") > 600 val descripFilter = col("Description").contains("POSTAGE") df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter))) .as("isExpensive") .select("unitPrice", "isExpensive").show(5) -- in SQL SELECT UnitPrice, (StockCode = 'DOT' AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive FROM dfTable WHERE (StockCode = 'DOT' AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))
如果您有SQL背景知識,那麼所有這些語句都應該非常熟悉。實際上,它們都可以表示為where子句。事實上,使用SQL語句來表達過濾器比使用程式設計的DataFrame接口和Spark SQL更容易,這使得我們無需付出任何性能代價就可以做到這一點。例如,以下兩個表述是等價的:
// in Scala import org.apache.spark.sql.functions.{expr, not, col} df.withColumn("isExpensive", not(col("UnitPrice").leq(250))) .filter("isExpensive") .select("Description", "UnitPrice").show(5) df.withColumn("isExpensive", expr("NOT UnitPrice <= 250")) .filter("isExpensive") .select("Description", "UnitPrice").show(5)
一個可能出現的“陷阱”, 如果在建立布爾表達式時使用null資料。如果您的資料中有一個空值,那麼您将需要以稍微不同的方式處理事情。null值等于的測試代碼:
df.where(col("Description").eqNullSafe("hello")).show()
6.4. 使用 Numbers類型
為了建構一個虛構的示例,假設我們發現我們在零售資料集中錯誤地記錄了數量,真實數量等于(目前數量*單價)的平方 + 5。
// in Scala import org.apache.spark.sql.functions.{expr, pow} val fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5 df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2) # in Python from pyspark.sql.functions import expr, pow fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5 df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2) +----------+------------------+ |CustomerId| realQuantity| +----------+------------------+ | 17850.0|239.08999999999997| | 17850.0| 418.7156| +----------+------------------+
注意我們可以把列相乘因為它們都是數值。當然,我們也可以根據需要進行加減操作。事實上,我們還可以将所有這些操作使用SQL表達式來完成:
// in Scala df.selectExpr( "CustomerId", "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2) # in Python df.selectExpr( "CustomerId", "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2) -- in SQL SELECT customerId, (POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity FROM dfTable
另一個常見的數值操作:四舍五入操作。如果你想四舍五入到一個整數,通常你可以将值轉換成一個整數,這樣就可以了。然而,Spark還具有更詳細的函數來顯式地執行此操作并達到一定的精度。在下面的例子中,我們四舍五入到小數點後一位:
// in Scala import org.apache.spark.sql.functions.{round, bround} df.select(round(col("UnitPrice"), 1).alias("rounded"), col("UnitPrice")).show(5)
round()操作是向上四舍五入。bround()操作是向下舍去小數。
// in Scala import org.apache.spark.sql.functions.lit df.select(round(lit("2.5")), bround(lit("2.5"))).show(2) # in Python from pyspark.sql.functions import lit, round, bround df.select(round(lit("2.5")), bround(lit("2.5"))).show(2) -- in SQL SELECT round(2.5), bround(2.5) +-------------+--------------+ |round(2.5, 0)|bround(2.5, 0)| +-------------+--------------+ | 3.0| 2.0| | 3.0| 2.0| +-------------+--------------+
另一個數值任務是計算兩列的相關性。例如,我們可以看到兩列的皮爾遜相關系數,看看便宜的東西是否大量購買。我們可以通過一個函數以及DataFrame統計方法來實作這一點:
// in Scala import org.apache.spark.sql.functions.{corr} df.stat.corr("Quantity", "UnitPrice") df.select(corr("Quantity", "UnitPrice")).show() # in Python from pyspark.sql.functions import corr df.stat.corr("Quantity", "UnitPrice") df.select(corr("Quantity", "UnitPrice")).show() -- in SQL SELECT corr(Quantity, UnitPrice) FROM dfTable +-------------------------+ |corr(Quantity, UnitPrice)| +-------------------------+ | -0.04112314436835551| +-------------------------+
6.5. 使用String類型
// in Scala import org.apache.spark.sql.functions.{lower, upper} df.select(col("Description"), lower(col("Description")), upper(lower(col("Description")))).show(2) # in Python from pyspark.sql.functions import lower, upper df.select(col("Description"), lower(col("Description")), upper(lower(col("Description")))).show(2) -- in SQL SELECT Description, lower(Description), Upper(lower(Description)) FROM dfTable +--------------------+--------------------+-------------------------+ | Description| lower(Description)|upper(lower(Description))| +--------------------+--------------------+-------------------------+ |WHITE HANGING HEA...|white hanging hea...| WHITE HANGING HEA...| | WHITE METAL LANTERN| white metal lantern| WHITE METAL LANTERN| +--------------------+--------------------+-------------------------+
另一個簡單的任務是在字元串周圍添加或删除空格。你可以使用lpad、ltrim、rpad和rtrim、trim:
// in Scala import org.apache.spark.sql.functions.{lit, ltrim, rtrim, rpad, lpad, trim} df.select( ltrim(lit(" HELLO ")).as("ltrim"), rtrim(lit(" HELLO ")).as("rtrim"), trim(lit(" HELLO ")).as("trim"), lpad(lit("HELLO"), 3, " ").as("lp"), rpad(lit("HELLO"), 10, " ").as("rp")).show(2) # in Python from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim df.select( ltrim(lit(" HELLO ")).alias("ltrim"), rtrim(lit(" HELLO ")).alias("rtrim"), trim(lit(" HELLO ")).alias("trim"), lpad(lit("HELLO"), 3, " ").alias("lp"), rpad(lit("HELLO"), 10, " ").alias("rp")).show(2) -- in SQL SELECT ltrim(' HELLLOOOO '), rtrim(' HELLLOOOO '), trim(' HELLLOOOO '), lpad('HELLOOOO ', 3, ' '), rpad('HELLOOOO ', 10, ' ') FROM dfTable +---------+---------+-----+---+----------+ | ltrim| rtrim| trim| lp| rp| +---------+---------+-----+---+----------+ |HELLO | HELLO|HELLO| HE|HELLO | |HELLO | HELLO|HELLO| HE|HELLO | +---------+---------+-----+---+----------+
注意,如果lpad或rpad取的數字小于字元串的長度,它将總是從字元串的右側删除值。
6.5.1. 正規表達式
可能最常執行的任務之一是搜尋另一個字元串的存在,或者用另一個值替換所有提到的字元串。這通常是通過一個稱為正規表達式的工具來實作的,該工具存在于許多程式設計語言中。正規表達式使使用者能夠指定一組規則,以便從字元串中提取值或用其他值替換它們。Spark利用了Java正規表達式的強大功能。Spark中有兩個關鍵函數,您需要它們來執行正規表達式任務:regexp_extract 和 regexp_replace。兩個函數分别提取值和替換值。讓我們探讨如何使用regexp_replace函數替換description列中的替換顔色名稱:
// in Scala import org.apache.spark.sql.functions.regexp_replace val simpleColors = Seq("black", "white", "red", "green", "blue") val regexString = simpleColors.map(_.toUpperCase).mkString("|") // 在正規表達式文法中,|表示“或” df.select( regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"), col("Description")).show(2) -- in SQL SELECT regexp_replace(Description, 'BLACK|WHITE|RED|GREEN|BLUE', 'COLOR') as color_clean, Description FROM dfTable +--------------------+--------------------+ | color_clean| Description| +--------------------+--------------------+ |COLOR HANGING HEA...|WHITE HANGING HEA...| | COLOR METAL LANTERN| WHITE METAL LANTERN| +--------------------+--------------------+
6.6. 使用Dates 和Timestamps類型
日期和時間是程式設計語言和資料庫中經常遇到的挑戰。總是需要跟蹤時區,確定格式正确和有效。Spark通過明确地關注兩種與時間相關的資訊,盡力使事情保持簡單有專門關注月曆日期的date和包含日期和時間資訊的timestamp。正如我們在目前資料集中看到的那樣,Spark将盡力正确地識别列類型,包括在啟用inferSchema時的日期和時間戳。我們可以看到,這在我們目前的資料集上運作得非常好,因為它能夠識别和讀取我們的日期格式,而不需要我們為它提供一些規範。如前所述,處理日期和時間戳與處理字元串密切相關,因為我們經常将時間戳或日期存儲為字元串,并在運作時将它們轉換為日期類型。這在處理資料庫和結構化資料時不太常見,但在處理文本和CSV檔案時更常見。我們将很快對此進行實驗。
不幸的是,在處理日期和時間戳時有很多需要注意的地方,特别是在處理時區時。在2.1版本和之前,如果您正在解析的值中沒有顯式地指定時區,Spark将根據機器的時區進行解析。如果需要,可以通過設定本地會話的時區。SQL配置:
spark.conf.sessionLocalTimeZone
。這應該根據Java時區格式進行設定。
df.printSchema() root |-- InvoiceNo: string (nullable = true) |-- StockCode: string (nullable = true) |-- Description: string (nullable = true) |-- Quantity: integer (nullable = true) |-- InvoiceDate: timestamp (nullable = true) |-- UnitPrice: double (nullable = true) |-- CustomerID: double (nullable = true) |-- Country: string (nullable = true)
盡管Spark會盡力讀取日期或時間。然而,有時我們無法處理格式奇怪的日期和時間。了解您将要應用的轉換的關鍵是確定您确切地知道在方法的每個給定步驟中有什麼類型和格式。另一個常見的“陷阱”是Spark的TimestampType類隻支援二級精度,也就是說,如果你的時間是毫秒或微秒,你需要通過long時間的操作來解決這個問題。在強制使用TimestampType時,将删除精度。Spark可以對任何給定時間點的格式進行一些特殊處理。在進行解析或轉換時,務必明确說明這樣做是沒有問題的。最後,Spark還在使用Java日期和時間戳,是以符合這些标準。讓我們從基礎開始,擷取目前日期和目前時間戳:
// in Scala import org.apache.spark.sql.functions.{current_date, current_timestamp} val dateDF = spark.range(10) .withColumn("today", current_date()) .withColumn("now", current_timestamp()) dateDF.createOrReplaceTempView("dateTable") dateDF.printSchema() root |-- id: long (nullable = false) |-- today: date (nullable = false) |-- now: timestamp (nullable = false)
現在我們有了一個簡單的DataFrame,讓我們從今天開始加減5天。這些函數接收列名和加上或減去的天數作為參數:
// in Scala import org.apache.spark.sql.functions.{date_add, date_sub} dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1) -- in SQL SELECT date_sub(today, 5), date_add(today, 5) FROM dateTable +------------------+------------------+ |date_sub(today, 5)|date_add(today, 5)| +------------------+------------------+ | 2017-06-12| 2017-06-22| +------------------+------------------+
另一個常見的任務是檢視兩個日期之間的差異。我們可以使用datediff函數來實作這一點,該函數将傳回兩個日期之間的天數。大多數時候我們隻關心天數,因為每個月的天數不同,還有一個函數,months_between,它給出兩個日期之間的月數:
// in Scala import org.apache.spark.sql.functions.{datediff, months_between, to_date} dateDF.withColumn("week_ago", date_sub(col("today"), 7)) .select(datediff(col("week_ago"), col("today"))).show(1) dateDF.select( to_date(lit("2016-01-01")).alias("start"), to_date(lit("2017-05-22")).alias("end")) .select(months_between(col("start"), col("end"))).show(1) -- in SQL SELECT to_date('2016-01-01'), months_between('2016-01-01', '2017-01-01'), datediff('2016-01-01', '2017-01-01') FROM dateTable +---------------------------------+ |datediff(week_ago, today)| +---------------------------------+ | -7| +---------------------------------+ +------------------ -----------------+ |months_between(start, end)| +------------------------------------+ | -16.67741935| +------------------------------------+
注意,我們引入了一個新函數:to_date函數。to_date函數允許您将字元串轉換為日期,可以選擇指定格式。我們以JavaSimpleDateFormat指定我們的格式,如果您使用此函數,該格式将非常重要,值得您參考:
// in Scala import org.apache.spark.sql.functions.{to_date, lit} spark.range(5).withColumn("date", lit("2017-01-01")) .select(to_date(col("date"))).show(1)
如果Spark不能解析日期,則不會抛出錯誤;而是傳回null。在更大的pipeline中,這可能有點棘手,因為您可能會以一種格式期待您的資料,并在另一種格式中擷取資料。為了說明這一點,讓我們看一下日期格式從year-month-day 到 year-day-month的轉換。
dateDF.select(to_date(lit("2016-20-12")),to_date(lit("2017-12-11"))).show(1) +-------------------+-------------------+ |to_date(2016-20-12)|to_date(2017-12-11)| +-------------------+-------------------+ | null| 2017-12-11| +-------------------+-------------------+
我們發現這對bug來說是一種特别棘手的情況,因為有些日期可能比對正确的格式,而另一些則不比對。在前面的示例中,請注意第二個日期如何顯示為12月11日,而不是11月12日。Spark不會抛出錯誤,因為它不知道日期是混在一起的還是特定的行是不正确的。讓我們一步一步地修複這個問題,并想出一個健壯的方法來完全避免這些問題。第一步是記住,我們需要根據Java SimpleDateFormat标準指定日期格式。我們将使用兩個函數來修複這個問題:to_date和to_timestamp。前者可選地要求格式,而後者需要格式:
// in Scala import org.apache.spark.sql.functions.to_date val dateFormat = "yyyy-dd-MM" val cleanDateDF = spark.range(1).select( to_date(lit("2017-12-11"), dateFormat).alias("date"), to_date(lit("2017-20-12"), dateFormat).alias("date2")) cleanDateDF.createOrReplaceTempView("dateTable2") -- in SQL SELECT to_date(date, 'yyyy-dd-MM'), to_date(date2, 'yyyy-dd-MM'), to_date(date) FROM dateTable2 +----------+----------+ | date| date2| +----------+----------+ |2017-11-12|2017-12-20| +----------+----------+
現在讓我們使用to_timestamp的一個例子,它總是需要指定一個格式:
// in Scala import org.apache.spark.sql.functions.to_timestamp cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show() -- in SQL SELECT to_timestamp(date, 'yyyy-dd-MM'), to_timestamp(date2, 'yyyy-dd-MM') FROM dateTable2 +----------------------------------+ |to_timestamp(`date`, 'yyyy-dd-MM')| +----------------------------------+ | 2017-11-12 00:00:00| +----------------------------------+
在日期和時間戳之間進行轉換在所有語言中都很簡單——在SQL中,我們采用以下方法:
-- in SQL SELECT cast(to_date("2017-01-01", "yyyy-dd-MM") as timestamp)
在我們有了正确的格式和類型的日期或時間戳之後,比較它們實際上是很容易的。我們隻需要確定使用日期/時間戳類型或者根據正确的yyyy - mm -dd格式指定我們的字元串,如果我們在比較日期:
cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()
一個小問題是,我們還可以将其設定為字元串,它将解析為文本:
cleanDateDF.filter(col("date2") > "'2017-12-12'").show()
6.7. 資料進行中的null值
作為一種最佳實踐,應該始終使用nulls來表示DataFrames中丢失的或空的資料。Spark可以比使用空字元串或其他值更優化地使用null值。在DataFrame範圍内,與null值互動的主要方式是在DataFrame上使用.na子包。還有幾個函數用于執行操作并顯式地指定Spark應該如何處理null值。有關更多資訊,請參見第5章(我們将在其中讨論排序),并參考“與boolean一起工作”。
Nulls是所有程式設計中具有挑戰性的部分,Spark也不例外。在我們看來,顯式處理null值總是比隐式處理好。例如,在本書的這一部分中,我們看到了如何将列定義為具有空類型。然而,這是有問題的。當我們聲明一個列不可為null時,實際上并沒有強制執行。重申一下,當您定義一個模式時,其中所有列都被聲明為notnull,Spark将不會強制執行該模式,并且很高興地讓空值進入該列。是否為null的辨別,隻是為了幫助觸發SQL優化以處理該列。如果列中不應該有null值,則可能會得到不正确的結果或看到難以調試的奇怪異常。
可以對空值做兩件事:可以顯式地删除空值,或者可以用值(全局或每列)填充空值。現在讓我們來做一個實驗。
Coalesce
Spark包含一個函數,允許您使用coalesce函數從一組列中選擇第一個非空值。在這種情況下,沒有空值,是以它隻傳回第一列:
// in Scala import org.apache.spark.sql.functions.coalesce df.select(coalesce(col("Description"), col("CustomerId"))).show()
ifnull, nullIf, nvl, 和 nvl2
您還可以使用其他幾個SQL函數來實作類似的功能。
- ifnull:允許您在第一個值為null時選擇第二個值,并預設為第一個值
- nullif: 如果兩個值相等則傳回null,否則傳回第二個值
- nvl: 如果第一個值為null,則傳回第二個值,但預設為第一個值
- nvl2: 如果第一個值不是null,則傳回第二個值; 否則,它将傳回最後指定的值(下面示例中的else_value)
-- in SQL SELECT ifnull(null, 'return_value'), nullif('value', 'value'), nvl(null, 'return_value'), nvl2('not_null', 'return_value', "else_value") FROM dfTable LIMIT 1 +------------+----+------------+------------+ | a| b| c| d| +------------+----+------------+------------+ |return_value|null|return_value|return_value| +------------+----+------------+------------+
當然,我們也可以在DataFrames的select表達式中使用它們。
drop
最簡單的函數是drop,它删除包含null的行。預設情況是删除任何值為空的行:
df.na.drop() df.na.drop("any")
在SQL中,我們必須逐列執行此操作:
-- in SQL SELECT * FROM dfTable WHERE Description IS NOT NULL
将“any”指定為參數将删除任一個字段為null的行。使用“all”隻在該行的所有值為null或NaN時才删除行:
df.na.drop("all")
我們也可以通過傳入列數組,将此應用于某些列集:
// in Scala df.na.drop("all", Seq("StockCode", "InvoiceNo"))
fill
使用fill函數,可以用一組值填充一個或多個列。這可以通過指定一個map來完成——它是一個特定的值和一組列。例如,要在String類型的列中填充所有空值,可以指定以下内容:
df.na.fill("All Null values become this string")
對于類型為Integer的列,我們也可以使用df.na.fill(5:Integer),對于類型為Double的列,可以使用
df.na.fill(5:Double)
.
要指定列,我們隻需傳入列名稱數組,就像前面示例中的那樣
// in Scala df.na.fill(5, Seq("StockCode", "InvoiceNo"))
我們還可以使用Scala Map來實作這一點,其中鍵是列名稱,值是我們希望用來填充空值的值:
// in Scala val fillColValues = Map("StockCode" -> 5, "Description" -> "No Value") df.na.fill(fillColValues)
replace
唯一的要求是該值與原始值的類型相同:
// in Scala df.na.replace("Description", Map("" -> "UNKNOWN"))
ordering
正如我們在第5章中讨論的,您可以使用asc_nulls_first、desc_nulls_first、asc_nulls_last或desc_nulls_last,來指定您希望在有序的DataFrame中顯示空值的位置
6.8. 使用複雜類型
複雜類型可以幫助您公司組織資料,進而使希望解決的問題更有意義。有三種複雜類型: structs, arrays, 和 maps。
Structs
可以将structs(結構)看作是DataFrames中的DataFrames。一個示例将更清楚地說明這一點。我們可以通過在查詢的括号中封裝一組列來建立一個結構體:
df.selectExpr("(Description, InvoiceNo) as complex", "*") df.selectExpr("struct(Description, InvoiceNo) as complex", "*") // in Scala import org.apache.spark.sql.functions.struct val complexDF = df.select(struct("Description", "InvoiceNo").alias("complex")) complexDF.createOrReplaceTempView("complexDF")
現在我們有了一個包含complex列的DataFrame。我們可以像查詢另一個DataFrame一樣查詢它,唯一的差別是我們使用點文法來查詢它,或者使用列方法getField:
complexDF.select("complex.Description") complexDF.select(col("complex").getField("Description"))
我們還可以使用*查詢結構中的所有值。這将把所有列顯示到頂層DataFrame:
complexDF.select("complex.*") -- in SQL SELECT complex.* FROM complexDF
Arrays
為了更好了解Arrays類型,先看一個例子。使用目前資料,我們的目标是将Description列中的每個單詞轉換為DataFrame中的一行。第一個任務是将Description列轉換為複雜類型Array。
我們使用split函數來實作這一點,并指定分隔符:
// in Scala import org.apache.spark.sql.functions.split df.select(split(col("Description"), " ")).show(2) # in Python from pyspark.sql.functions import split df.select(split(col("Description"), " ")).show(2) -- in SQL SELECT split(Description, ' ') FROM dfTable
這非常強大,因為Spark允許我們将這種複雜類型作為另一列來操作。我們也可以使用類似python文法查詢數組的值:
// in Scala df.select(split(col("Description"), " ").alias("array_col")) .selectExpr("array_col[0]").show(2) # in Python df.select(split(col("Description"), " ").alias("array_col"))\ .selectExpr("array_col[0]").show(2) -- in SQL SELECT split(Description, ' ')[0] FROM dfTable
Array長度
我們可以通過查詢數組的大小來确定數組的長度:
// in Scala import org.apache.spark.sql.functions.size df.select(size(split(col("Description"), " "))).show(2) // shows 5 and 3 # in Python from pyspark.sql.functions import size df.select(size(split(col("Description"), " "))).show(2) # shows 5 and 3
array_contains函數
我們還可以看到這個數組是否包含某個值:
// in Scala import org.apache.spark.sql.functions.array_contains df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2) # in Python from pyspark.sql.functions import array_contains df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2) -- in SQL SELECT array_contains(split(Description, ' '), 'WHITE') FROM dfTable
然而,這并不能解決我們目前的問題。要将複雜類型轉換為一組行(數組中的每個值對應一個行),需要使用explode函數。Explode函數explode函數接受一個由數組組成的列,并為數組中的每個值建立一行(其餘值重複)。圖6-1說明了這個過程。
// in Scala import org.apache.spark.sql.functions.{split, explode} df.withColumn("splitted", split(col("Description"), " ")) .withColumn("exploded", explode(col("splitted"))) .select("Description", "InvoiceNo", "exploded").show(2) # in Python from pyspark.sql.functions import split, explode df.withColumn("splitted", split(col("Description"), " "))\ .withColumn("exploded", explode(col("splitted")))\ .select("Description", "InvoiceNo", "exploded").show(2) -- in SQL SELECT Description, InvoiceNo, exploded FROM (SELECT *, split(Description, " ") as splitted FROM dfTable) LATERAL VIEW explode(splitted) as exploded
Maps
maps是通過使用map函數和列的鍵值對建立的。然後你就可以像從數組中選擇一樣選擇它們:
// in Scala import org.apache.spark.sql.functions.map df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(2) # in Python from pyspark.sql.functions import create_map df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(2) -- in SQL SELECT map(Description, InvoiceNo) as complex_map FROM dfTable WHERE Description IS NOT NULL
您可以使用正确的key查詢它們。如果key不存在,則傳回null:
// in Scala df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")) .selectExpr("complex_map['WHITE METAL LANTERN']").show(2) # in Python df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))\ .selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
你也可以explode map類型,這将把他們變成列:
// in Scala df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")) .selectExpr("explode(complex_map)").show(2) # in Python df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))\ .selectExpr("explode(complex_map)").show(2)
6.9. 使用JSON
您可以直接操作Spark中的JSON字元串,并解析JSON或提取JSON對象。讓我們從建立一個JSON列開始:
// in Scala val jsonDF = spark.range(1).selectExpr(""" '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""") # in Python jsonDF = spark.range(1).selectExpr(""" '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")
可以使用get_json_object内聯查詢JSON對象,無論是字典還是數組。如果這個對象隻有一個嵌套層,你可以使用json_tuple:
// in Scala import org.apache.spark.sql.functions.{get_json_object, json_tuple} jsonDF.select( get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column", json_tuple(col("jsonString"), "myJSONKey")).show(2) # in Python from pyspark.sql.functions import get_json_object, json_tuple jsonDF.select( get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column", json_tuple(col("jsonString"), "myJSONKey")).show(2) --SQL jsonDF.selectExpr( "json_tuple(jsonString, '$.myJSONKey.myJSONValue[1]') as column").show(2)
您還可以使用to_json函數将StructType轉換為JSON字元串:
// in Scala import org.apache.spark.sql.functions.to_json df.selectExpr("(InvoiceNo, Description) as myStruct") .select(to_json(col("myStruct"))) # in Python from pyspark.sql.functions import to_json df.selectExpr("(InvoiceNo, Description) as myStruct")\ .select(to_json(col("myStruct")))
該函數還接受與JSON資料源相同的參數字典(map)。可以使用from_json函數解析這個(或其他JSON資料)。這自然要求您指定一個模式,您也可以選擇指定選項的map,以及:
// in Scala import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.types._ val parseSchema = new StructType(Array( new StructField("InvoiceNo",StringType,true), new StructField("Description",StringType,true))) df.selectExpr("(InvoiceNo, Description) as myStruct") .select(to_json(col("myStruct")).alias("newJSON")) .select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2) # in Python from pyspark.sql.functions import from_json from pyspark.sql.types import * parseSchema = StructType(( StructField("InvoiceNo",StringType(),True), StructField("Description",StringType(),True))) df.selectExpr("(InvoiceNo, Description) as myStruct")\ .select(to_json(col("myStruct")).alias("newJSON"))\ .select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)
6.10. 使用者自定義函數UDF
在Spark中,最強大的功能之一就是定義自己的函數。這些使用者定義函數(udf)使您能夠使用Python或Scala編寫自己的自定義轉換,甚至使用外部library庫。udf可以接受并傳回一個或多個列作為輸入。Spark udf非常強大,因為您可以用幾種不同的程式設計語言編寫它們;您不需要以深奧的格式或領域特定的語言建立它們。它們隻是對資料進行操作的函數,一條記錄一條記錄的操作。預設情況下,這些函數被注冊為臨時函數,以便在特定的SparkSession或上下文中使用。盡管您可以用Scala、Python或Java編寫udf,但是您應該注意一些性能方面的考慮。為了說明這一點,我們将詳細介紹建立UDF時發生的情況,如何将其傳遞給Spark,然後使用UDF執行代碼。第一步是實際函數。我們将為本例建立一個簡單的例子。讓我們寫一個power3函數,它接受一個數字并将其提升到3的幂:
// in Scala val udfExampleDF = spark.range(5).toDF("num") def power3(number:Double):Double = number * number * number power3(2.0) # in Python udfExampleDF = spark.range(5).toDF("num") def power3(double_value): return double_value ** 3 power3(2.0)
在這個簡單的例子中,我們可以看到我們的函數按預期工作。我們能夠提供單獨的輸入并産生預期的結果(使用這個簡單的測試用例)。到目前為止,我們對輸入的期望很高:它必須是特定的類型,不能是空值(參見“處理資料中的空值”)。現在我們已經建立了這些函數并對它們進行了測試,我們需要将它們注冊到Spark中,以便能夠在所有worker上使用它們。Spark将在Driver程式上序列化該函數,并通過網絡将其傳輸給所有Executor程序。這與語言無關。當你使用這個函數時,實際上會發生兩件不同的事情。如果函數是用Scala或Java編寫的,您可以在Java虛拟機(JVM)中使用它。這意味着,除了不能利用Spark為内置函數提供的代碼生成功能之外,不會有什麼性能損失。如果建立或使用大量對象,可能會出現性能問題;我們将在第19章的優化一節中讨論這個問題。如果函數是用Python編寫的,就會發生完全不同的情況。Spark在worker上啟動一個Python程序,将所有資料序列化為Python能夠了解的格式(請記住,它在前面的JVM中),在Python程序中對該資料逐行執行函數,最後将行操作的結果傳回給JVM和Spark。圖6-2提供了該過程的概述。
警告
啟動這個Python程序的代價很高,但真正的代價是将資料序列化到Python。這樣做的代價很高,原因有二:它是一個昂貴的計算,而且,在資料進入Python之後,Spark無法管理worker的記憶體。這意味着,如果worker受到資源限制,您可能會導緻它失敗(因為JVM和Python都在同一台機器上争奪記憶體)。我們建議您使用Scala或java編寫您的udf—用Scala編寫函數所花費的少量時間将始終帶來顯著的速度提升,而且最重要的是,您仍然可以使用Python中的函數!
現在您已經了解了這個過程,讓我們通過一個示例來示範。首先,我們需要注冊這個函數,使它作為DataFrame的可用函數:
// in Scala import org.apache.spark.sql.functions.udf val power3udf = udf(power3(_:Double):Double) udfExampleDF.select(power3udf(col("num"))).show() # in Python from pyspark.sql.functions import udf power3udf = udf(power3) from pyspark.sql.functions import col udfExampleDF.select(power3udf(col("num"))).show(2)
在這一點上,我們隻能将其用作DataFrame函數。也就是說,我們不能在字元串表達式中使用它,隻能在表達式中使用它。不過,我們也可以将這個UDF注冊為Spark SQL函數。這很有價值,因為它使得在SQL中以及跨語言中使用這個函數變得很簡單。讓我們在Scala中注冊這個函數:
// in Scala spark.udf.register("power3", power3(_:Double):Double) udfExampleDF.selectExpr("power3(num)").show(2)
因為這個函數是用Spark SQL注冊的——我們已經了解到,任何Spark SQL函數或表達式在處理資料流時都可以作為表達式使用——是以我們可以轉而使用用Scala和Python編寫的UDF。但是,我們沒有将它用作DataFrame函數,而是将它用作SQL表達式:
# in Python udfExampleDF.selectExpr("power3(num)").show(2) # registered in Scala
我們還可以将Python函數注冊為SQL函數,并在任何語言中使用它。為了確定函數正常工作,我們還可以指定一個傳回類型。正如我們在本節的開頭所看到的,Spark管理自己的類型資訊,而這些資訊與Python的類型并不完全一緻。是以,最好在定義函數時為其定義傳回類型。需要注意的是,沒有必要指定傳回類型,但這是最佳實踐。如果指定的類型與函數傳回的實際類型不一緻,Spark将不會抛出錯誤,而是傳回null來指定失敗。如果将下面函數中的傳回類型切換為double類型,就可以看到這一點:
# in Python from pyspark.sql.types import IntegerType, DoubleType spark.udf.register("power3py", power3, DoubleType()) # in Python udfExampleDF.selectExpr("power3py(num)").show(2) # registered via Python
這是因為範圍建立整數。當整數在Python中操作時,Python不會将它們轉換為浮點數(與Spark的double類型對應的類型),是以我們看到null。我們可以通過確定Python函數傳回一個浮點數而不是整數來糾正這個問題,并且函數将正确地運作。當然,在我們注冊它們之後,我們也可以從SQL中使用它們中的任何一個:
-- in SQL SELECT power3(12), power3py(12) -- doesn't work because of return type
-- in SQL CREATE TEMPORARY FUNCTION myFunc AS 'com.organization.hive.udf.FunctionName'