天天看點

spark——RDD常見的轉化和行動操作

今天是spark第三篇文章,我們繼續來看RDD的一些操作。

我們前文說道在spark當中RDD的操作可以分為兩種,一種是轉化操作(transformation),另一種是行動操作(action)。在轉化操作當中,spark不會為我們計算結果,而是會生成一個新的RDD節點,記錄下這個操作。隻有在行動操作執行的時候,spark才會從頭開始計算整個計算。

而轉化操作又可以進一步分為針對元素的轉化操作以及針對集合的轉化操作。

針對元素的轉化操作

針對元素的轉化操作非常常用,其中最常用的就是map和flatmap。從名字上看這兩者都是map操作,map操作我們都知道,在之前的MapReduce文章以及Python map、reduce用法的文章當中都有提及。簡而言之就是可以将一個操作映射在每一個元素上。

比如假設我們有一個序列[1, 3, 4, 7],我們希望将當中每一個元素執行平方操作。我們當然可以用for循環執行,但是在spark當中更好的辦法是使用map。

nums = sc.parallelize([1, 3, 4, 7])
spuare = nums.map(lambda x: x * x)           

複制

我們知道map是一個轉化操作,是以square仍然是一個RDD,我們直接将它輸出不會得到結果,隻會得到RDD的相關資訊:

spark——RDD常見的轉化和行動操作

内部RDD的轉化圖是這樣的:

spark——RDD常見的轉化和行動操作

我們想看結果就必須要執行行動操作,比如take,我們take一下檢視一下結果:

spark——RDD常見的轉化和行動操作

和我們的預期一緻,對于之前一直關注的同學來說map操作應該已經很熟悉了,那麼這個flatmap又是什麼呢?

差别就在這個flat,我們都知道flat是扁平的意思,是以flatmap就是說map執行之後的結果扁平化。說白了也就是說如果map執行之後的結果是一個數組的話,那麼會将數組拆開,把裡面的内容拿出來組合到一起。

我們一起來看一個例子:

texts = sc.parallelize(['now test', 'spark rdd'])
split = texts.map(lambda x: x.split(' '))           

複制

由于我們執行map的對象是一個字元串,一個字元串執行split操作之後會得到一個字元串數組。如果我們執行map,得到的結果會是:

spark——RDD常見的轉化和行動操作

如果我們執行flatmap呢?我們也可以試一下:

spark——RDD常見的轉化和行動操作

對比一下,有沒有注意到差别?

是了,map執行的結果是一個array的array,因為每一個string split之後就是一個array,我們把array拼接到一起自然是一個array的array。而flatMap會把這些array攤平之後放在一起,這也是兩者最大的差别。

針對集合的轉化操作

上面介紹了針對元素的轉化操作,下面來看看針對集合的轉化操作。

針對集合的操作大概有union,distinct,intersection和subtract這幾種。我們可以先看下下圖有一個直覺地感受,之後我們再一一分析:

spark——RDD常見的轉化和行動操作

首先來看distinct,這個顧名思義,就是去除重複。和SQL當中的distinct是一樣的,這個操作的輸入是兩個集合RDD,執行之後會生成一個新的RDD,這個RDD當中的所有元素都是unique的。有一點需要注意,執行distinct的開銷很大,因為它會執行shuffle操作将所有的資料進行亂序,以確定每個元素隻有一份。如果你不明白shuffle操作是什麼意思,沒有關系,我們在後序的文章當中會着重講解。隻需要記住它的開銷很大就行了。

第二種操作是union,這個也很好了解,就是把兩個RDD當中的所有元素合并。你可以把它當成是Python list當中的extend操作,同樣和extend一樣,它并不會做重複元素的檢測,是以如果合并的兩個集合當中有相同的元素并不會被過濾,而是會被保留。

第三個操作是intersection,它的意思是交集,也就是兩個集合重疊的部分。這個應該蠻好了解的,我們看下下圖:

spark——RDD常見的轉化和行動操作

下圖當中藍色的部分,也就是A和B兩個集合的交集部分就是A.intersection(B)的結果,也就是兩個集合當中共有的元素。同樣,這個操作也會執行shuffle,是以開銷一樣很大,并且這個操作會去掉重複的元素。

最後一個是subtract,也就是差集,就是屬于A不屬于B的元素,同樣我們可以用圖來表示:

spark——RDD常見的轉化和行動操作

上圖當中灰色陰影部分就是A和B兩個集合的差集,同樣,這個操作也會執行shuffle,非常耗時。

除了以上幾種之外,還有cartesian,即笛卡爾積,sample抽樣等集合操作,不過相對而言用的稍微少一些,這裡就不過多介紹了,感興趣的同學可以了解一下,也并不複雜。

行動操作

RDD中最常用的行動操作應該就是擷取結果的操作了,畢竟我們算了半天就是為了拿結果,隻擷取RDD顯然不是我們的目的。擷取結果的RDD主要是take,top和collect,這三種沒什麼特别的用法,簡單介紹一下。

其中collect是擷取所有結果,會傳回所有的元素。take和top都需要傳入一個參數指定條數,take是從RDD中傳回指定條數的結果,top是從RDD中傳回最前面的若幹條結果,top和take的用法完全一樣,唯一的差別就是拿到的結果是否是最前面的。

除了這幾個之外,還有一個很常用的action是count,這個應該也不用多說,計算資料條數的操作,count一下就可以知道有多少條資料了。

reduce

除了這些比較簡單的之外,再介紹另外兩個比較有意思的,首先,先來介紹reduce。reduce顧名思義就是MapReduce當中的reduce,它的用法和Python當中的reduce幾乎完全一樣,它接受一個函數來進行合并操作。我們來看個例子:

spark——RDD常見的轉化和行動操作

在這個例子當中,我們的reduce函數是将兩個int執行加和,reduce機制會重複執行這個操作将所有的資料合并,是以最終得到的結果就是1 + 3 + 4 + 7 = 15.

fold

除了reduce之外還有一個叫做fold的action,它和reduce完全一樣,唯一不同的是它可以自定義一個初始值,并且是針對分區的,我們還拿上面的例子舉例:

spark——RDD常見的轉化和行動操作

直接看這個例子可能有點懵逼,簡單解釋一下就明白了,其實不複雜。我們注意到我們在使用parallelize創造資料的時候多加了一個參數2,這個2表示分區數。簡單可以了解成數組[1, 3, 4, 7]會被分成兩部分,但是我們直接collect的話還是原值。

現在我們使用fold,傳入了兩個參數,除了一個函數之外還傳入了一個初始值2。是以整個計算過程是這樣的:

對于第一個分區的答案是1 + 3 + 2 = 6,對于第二個分區的答案是4 + 7 + 2 = 13,最後将兩個分區合并:6 + 13 + 2 = 21。

也就是說我們對于每個分區的結果賦予了一個起始值,并且對分區合并之後的結果又賦予了一個起始值。

aggregate

老實講這個action是最難了解的,因為它比較反常。首先,對于reduce和fold來說都有一個要求就是傳回值的類型必須和rdd的資料類型相同。比如資料的類型是int,那麼傳回的結果也要是int。

但是對于有些場景這個是不适用的,比如我們想求平均,我們需要知道term的和,也需要知道term出現的次數,是以我們需要傳回兩個值。這個時候我們初始化的值應該是0, 0,也就是對于加和與計數而言都是從0開始的,接着我們需要傳入兩個函數,比如寫成這樣:

nums.aggregate((0, 0), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))           

複制

看到這行代碼會懵逼是必然的,不用擔心,我們一點一點解釋。

首先是第一個lambda函數,這裡的x不是一個值而是兩個值,或者說是一個二進制組,也就是我們最後傳回的結果,在我們的傳回預期裡,第一個傳回的數是nums的和,第二個傳回的數是nums當中數的個數。而這裡的y則是nums輸入的結果,顯然nums輸入的結果隻有一個int,是以這裡的y是一維的。那麼我們要求和當然是用x[0] + y,也就是說把y的值加在第一維上,第二維自然是加一,因為我們每讀取一個數就應該加一。

這點還比較容易了解,第二個函數可能有些費勁,第二個函數和第一個不同,它不是用在處理nums的資料的,而是用來處理分區的。當我們執行aggregate的時候,spark并不是單線程執行的,它會将nums中的資料拆分成許多分區,每個分區得到結果之後需要合并,合并的時候會調用這個函數。

和第一個函數類似,第一個x是最終結果,而y則是其他分區運算結束需要合并進來的值。是以這裡的y是二維的,第一維是某個分區的和,第二維是某個分區當中元素的數量,那麼我們當然要把它都加在x上。

spark——RDD常見的轉化和行動操作

上圖展示了兩個分區的時候的計算過程,其中lambda1就是我們傳入的第一個匿名函數,同理,lambda2就是我們傳入的第二個匿名函數。我想結合圖應該很容易看明白。

行動操作除了這幾個之外還有一些,由于篇幅原因我們先不贅述了,在後序的文章當中如果有出現,我們會再進行詳細解釋的。初學者學習spark比較抗拒的一個主要原因就是覺得太過複雜,就連操作還區分什麼轉化操作和行動操作。其實這一切都是為了惰性求值進而優化性能。這樣我們就可以把若幹個操作合并在一起執行,進而減少消耗的計算資源,對于分布式計算架構而言,性能是非常重要的名額,了解了這一點,spark為什麼會做出這樣的設計也就很容易了解了。

不僅spark如此,TensorFlow等深度學習架構也是如此,本質上許多看似反直覺的設計都是有更深層的原因的,了解了之後其實也很容易猜到,凡是拿到最終結果的操作往往都是行動操作,如果隻是一些計算,那麼十有八九是轉化操作。

持久化操作

Spark當中的RDD是惰性求值的,有的時候我們會希望多次使用同一個RDD。如果我們隻是簡單地調用行動操作,那麼spark會多次重複計算RDD和它對應的所有資料以及其他依賴,這顯然會帶來大量開銷。我們很自然地會希望對于我們經常使用的RDD可以緩存起來,在我們需要的時候随時拿來用,而不是每次用到的時候都需要重新跑。

為了解決這個問題,spark當中提供了持久化的操作。所謂的持久化可以簡單了解成緩存起來。用法也很簡單,我們隻需要對RDD進行persist即可:

texts = sc.parallelize(['now test', 'hello world'])
split = texts.split(lambda x: x.split(' '))
split.persist()           

複制

調用完持久化之後,RDD會被緩存進記憶體或磁盤當中,我們需要的時候可以随時調出來使用,就不用把前面的整個流程全部跑一遍了。并且spark當中支援多種級别的持久化操作,我們可以通過StorageLevel的變量來控制。我們來看下這個StorageLevel的取值:

spark——RDD常見的轉化和行動操作

我們根據需要選擇對應的緩存級别即可。當然既然有持久化自然就有反持久化,對于一些已經不再需要緩存的RDD,我們可以調用unpersist将它們從緩存當中去除。

今天的内容雖然看起來各種操作五花八門,但是有些并不是經常用到,我們隻需要大概有個印象,具體操作的細節可以等用到的時候再做仔細的研究。希望大家都能忽略這些并不重要的細節,抓住核心的本質。