天天看點

spark算子集合

spark算子集合

1.Transformation算子

map/mapToPair

mapPartitions

mapPartitionWithIndex

flatMap/flatMapToPair

filter

sample

reduceByKey

sortByKey/sortBy

join

leftOuterJoin

rightOuterJoin

fullOuterJoin

union

intersection

subtract

distinct

cogroup

zip

zipWithIndex

repartition

coalesce

groupByKey

2.Action算子

count

foreach

take

first

collect

foreachPartition

countByKey

countByValue

reduce

3.持久化算子

cache

persist

checkpoint

checkpoint特點

checkpoint流程

checkpoint的優化

注意

reduce和reduceByKey的差別

reduceByKey()和groupByKey()的差別

算子:Spark中,

RDD的方法就叫算子(也叫函數)如flatMap,map,reduceByKey,foreach

Spark中的算子分為三類:

Transformation轉換算子 [懶執行的,需要Action算子觸發才執行]

map,flatMap,reduceByKey,sortBy(java不存在這個),sortByKey,filter,sample

Action行動算子 [觸發Transformation類算子執行,有一個Action算子,就有一個job(任務)]

foreach,take,first,count,collect

持久化算子

spark算子集合

又叫轉換算子,懶執行,需要Action算子觸發執行

Transformations類算子是一類算子(函數)叫做轉換算子,如map,flatMap,reduceByKey等。Transformations算子是延遲執行,也叫懶加載執行(需要Action算子觸發才能執行)。

将一個RDD中的每個資料項,通過map中的函數映射變為一個新的元素。

特點:輸入一條,輸出一條資料。

Scala隻有map,Java中有map,mapToPair

Java中map輸出的不能輸出kv格式,要想輸出kv格式資料要是用mapToPair

mapToPair會将一個長度為N的、每個元素都是T類型的對象,轉換成另一個長度為N的、每個元素都是<K2,V2>類型的對象

與map類似,周遊的機關是每個partition上的資料。

一個分區一個分區的資料來處理

傳入的參數是一個partition的資料

機關按分區來處理資料節省某些操作需要的時間

類似于mapPartitions,除此之外還會攜帶分區的索引值。可擷取每個分區的索引

先map後flat。與map類似,每個輸入項可以映射為0到多個輸出項。

scala隻有flatMap

java有flatMap和flatMapToPair,flatMapToDouble

Java中flatMap輸出的不能輸出kv格式,要想輸出kv格式資料要是用flatMapToPair

保留符合條件的記錄數,true保留,false過濾掉。

随機抽樣算子,根據傳進去的小數按比例進行又放回或者無放回的抽樣。

第一個參數:是否有放回 第二個參數:抽樣抽多少資料(百分比),第三個參數:種子

對RDD進行抽樣,其中參數withReplacement為true時表示抽樣之後還放回,可以被多次抽樣,false表示不放回;fraction表示抽樣比例;seed為随機數種子,比如目前時間戳

将相同的Key根據相應的邏輯進行處理。

reduceByKey就是對元素為KV對的RDD中Key相同的元素的Value進行binary_function的reduce操作,是以,Key相同的多個元素的值被reduce為一個值,然後與原RDD中的Key組成一個新的KV對。

作用在K,V格式的RDD上,對key進行升序或者降序排序。

作用在K,V格式的RDD上。根據K進行連接配接,對(K,V)join(K,W)傳回(K,(V,W))

必須是兩個k、v格式的RDD進行join,且key必須相同

隻傳回相同key的

join後的分區數預設與父RDD分區數多的那一個相同。

如果join指定了分區數則按照指定的分區進行

注: 不設定的話分區數預設為1.

顯示左邊RDD所有的key和value,右邊RDD對應不上的用optional

顯示右邊RDD所有的key和value,左邊RDD對應不上的用optional

兩邊RDD均顯示,不存在用optional,顯示None

合并兩個資料集。兩個資料集的類型要一緻。

傳回新的RDD的分區數是合并RDD分區數的總和。

邏輯上合并,不會有實際的資料傳輸

取兩個資料集的交集,傳回新的RDD與父RDD分區多的一緻

取兩個資料集的差集,結果RDD的分區數與subtract前面的RDD的分區數一緻。

取左邊RDD中右邊RDD沒有的元素

去重

将兩個RDD中相同key的合在一起

當調用類型(K,V)和(K,W)的資料上時,傳回一個資料集(K,(Iterable<V>,Iterable<W>)),子RDD的分區與父RDD多的一緻。

zip函數用于将兩個RDD組合成Key/Value形式的RDD,這裡預設兩個RDD的partition數量以及元素數量都相同,否則會抛出異常。

将兩個RDD中的元素(KV格式/非KV格式)變成一個KV格式的RDD,**兩個RDD的每個分區元素個數必須相同。

該函數将RDD中的元素和這個元素在RDD中的ID(索引号)組合成鍵/值對。

增加或減少分區。寬依賴的算子,預設産生shuffle。(多個分區分到一個分區不會産生shuffle)

repartition底層其實就是coalesce指定了開啟shuffle,變成寬依賴

repartition()一般用于增加分區

重分區,可以将RDD的分區增大,也可以減少,預設不産生shuffle.

coalesce常用來減少分區,第二個參數是減少分區的過程中是否産生shuffle。true為産生shuffle,false不産生shuffle。預設是false。

Coalesec()一般用于減少分區。Coalesec()方法不産生shuffle的話增加分區就不起作用,但如果指定産生shuffle的話那就是repartition()。

如果coalesce設定的分區數比原來的RDD的分區數多的話,第二個參數設定為false時不會起作用,分區情況不會變化,如果設定成true,效果和repartition一樣。即repartition(numPartitions) = coalesce(numPartitions,true)

作用在K,V格式的RDD上。根據Key進行分組,将相同key的聚合到一起。作用在(K,V),傳回(K,Iterable <V>)。會産生shuffle

對每個key對應的多個value進行操作,但是隻是彙總生成一個sequence,本身不能自定義函數,隻能通過額外通過map(func)來實作。

Action類算子也是一類算子(函數)叫做行動算子,如foreach,collect,count等。

Transformations類算子是延遲執行,Action類算子觸發Transformations類算子執行。

一個application應用程式中有幾個Action類算子執行,就有幾個job運作。

傳回資料集中的元素數。會在結果計算完成後回收到Driver端。

循環周遊資料集中的每個元素,運作相應的邏輯。

take(n)傳回一個包含資料集前n個元素的集合。

first=take(1),傳回資料集中的第一個元素。

将計算結果回收到Driver端。

周遊的資料是每個partition的資料。

以分區為機關進行周遊 可以避免重複建立連接配接等

foreach以每一條資料為機關周遊

沒有傳回值(mapPartitions有傳回RDD)

必須作用到K,V格式的RDD上,根據Key計數相同Key的資料集元素。

根據資料集每個元素相同的内容來計數。傳回相同内容的元素對應的條數。(不必作用在kv格式上)

countByValue()方法将整個元組看做value,不是将逗号後面的内容看做value

根據聚合邏輯聚合資料集中的每個元素。

reduce将RDD中元素前兩個傳給輸入函數,産生一個新的return值,新産生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函數,直到最後隻有一個值為止。

持久化有三個算子:

Cache

Persist

以上算子都可以将RDD持久化,持久化的機關是partition。cache和persist都是懶執行的。必須有一個action類算子觸發執行。checkpoint算子不僅能将RDD持久化到磁盤,還能切斷RDD之間的依賴關系。

預設将資料存于記憶體中,相當于指定了隻是用記憶體存儲政策的persist

預設将RDD的資料持久化到記憶體中。**cache是懶執行。**需要Action算子觸發執行

cache和persist不需要指定目錄,spark叢集會預設為cache,persist建立目錄

cache和persist的目錄在application執行完後會清空

cache()=persist()=persist(StorageLevel.MEMORY_ONLY)

注意:cache和persist算子後不能立即緊跟action算子。(如:lines.cache().collect()是不行的)

可以手動指定持久化級别.最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本數。

持久化級别如下:

常用級别:

MEMORY_ONLY

MEMORY_ONLY_SER

MEMORY_AND_DISK 先往記憶體中放,不夠了再往磁盤(放磁盤的資料一定會序列化)放,以塊為機關

MEMORY_AND_DISK_SER

盡量避免使用"_2"的級别和DISK_ONLY級别

放磁盤的資料一定會序列化

堆外記憶體:不歸JVM管的記憶體就是堆外記憶體

Spark存儲資料可以指定副本個數

當RDD的lineage非常長,計算邏輯複雜時,可以對某個RDD進行checkpoint,會将目前的資料持久化到指定的磁盤目錄上.

checkpoint預設将RDD資料持久化到磁盤,還可以切斷RDD之間的依賴關系。

可以儲存資料到外部存儲系統。(可以解決當機問題,當機時儲存狀态,資料執行到哪裡了的問題),切斷依賴關系。

checkpoint需要自己指定目錄

checkpoint主要用在中繼資料儲存上

checkpoint目錄資料當application執行完之後不會被清除,不手動删除就會一直存在。

第二次再次通路同一個RDD時,就會從上次持久化的checkpoint持久化資料中擷取。

checkpoint相比cache和persist多了一個功能:存儲中繼資料.checkpoint真正用在持久化資料上用的其實并不多

可以對RDD進行checkpoint,将資料存儲在外部的檔案存儲系統,當spark application執行完成之後,資料不會被清除.正是這個特點,我們可以使用checkpoint儲存狀态.在sparkstreaming中應用多

作用:

将資料持久化到磁盤

儲存程式運作狀态

設定了checkpoint之後,會将這個RDD向前切斷RDD的依賴關系

一定要避免對RDD進行多次checkPoint。千萬不要經常的對RDD進行checkpoint,因為每次checkpoint一次就會存到磁盤一次,過多存儲就會和mapreduce在磁盤存儲一樣了,這樣會讓性能下降

當RDD的job執行完畢後,會從finalRDD從後往前回溯。

當回溯到某一個RDD調用了checkpoint方法,會對目前的RDD做一個标記。

回溯完成後,Spark架構會自動啟動一個新的job,重新計算這個checkpoint RDD的資料,将資料持久化到HDFS的指定目錄上。

下一步會向前切斷RDD的依賴關系,下次如果checkpoint标記的RDD後的RDD資料丢了後,可以使用checkpoint這個目錄去恢複後面的RDD的資料,不用再向前去找了,節省了時間

比如,要計算打五角星的這個RDD的資料,在這個job執行完成之後,會重新從資料的源頭重新計算一遍之前的一個RDD中的資料并存放于程式中指定的外部存儲目錄。預設是将資料持久化到磁盤。

當下次重新計算五角星的RDD中的資料時,會直接從checkPoint的RDD中恢複資料,而不會從之前的有依賴關系的RDD中計算資料

spark算子集合

當最後一個RDD的action執行完時,會重新開機一個job,從後往前回溯,一直回溯到源頭,找到那些被标記成checkpoint的RDD的資料,重新計算該資料,将該資料存放到程式中指定的外部存儲目錄(一般是HDFS上的某個目錄),下一步會向前切斷RDD的依賴關系,下次如果checkpoint标記的RDD後的RDD資料丢了後,可以基于checkpoint這個目錄去恢複後面的RDD的資料

對RDD執行checkpoint之前,最好對這個RDD先執行cache,這樣新啟動的job隻需要将記憶體中的資料拷貝到HDFS上就可以,省去了重新計算這一步。

對哪個RDD設定了checkPoint就對其進行cache持久化,action類算子觸發cache持久化将資料存入記憶體中。當建立的job進行回溯時發現記憶體中存在資料,就會直接将記憶體中資料持久化到外部存儲目錄中,就省去了重新計算的這一步。

一定要避免對RDD進行多次checkPoint。千萬不要經常的對RDD進行checkpoint,因為每次checkpoint一次就會存到磁盤一次,過多存儲就會和mapreduce在磁盤存儲一樣了,這樣會讓application執行緩慢

cache和persist都是懶執行,必須有一個action類算子觸發執行,最小機關是partition。

cache和persist算子的傳回值可以指派給一個變量,在其他job中直接使用這個變量就是使用持久化的資料了。持久化的機關是partition。

cache和persist算子後不能立即緊跟action算子。(如:lines.cache().collect()是不行的)

cache和persist算子持久化的資料當applilcation執行完成之後會被清除。

錯誤:rdd.cache().count() 傳回的不是持久化的RDD,而是一個數值了。

checkpoint相比cache和persist多了一個功能:存儲中繼資料.checkpoint真正用在持久化資料上用的其實并不多,還可以切斷RDD之間的依賴關系

checkpoint需要指定目錄,cache和persist不需要指定目錄,spark叢集會預設為cache,persist建立目錄

cache和persist的資料在application執行完後會自動清空

checkpoint的資料不會被清空

Checkpoint的資料可以由外部的存儲系統管理。程式運作結束後,目錄仍然不會被删除。

checkpoint由外部的存儲系統管理,persist是由Spark内部管理

checkpoint可以将狀态儲存到外部,Spark可以基于外部的存儲狀态恢複

reduce(binary_function)

具體過程,RDD有1 2 3 4 5 6 7 8 9 10個元素,

1+2=3

3+3=6

6+4=10

10+5=15

15+6=21

21+7=28

28+8=36

36+9=45

45+10=55

reduceByKey(binary_function)

那麼講到這裡,差不多函數功能已經明了了,而reduceByKey的是如何運作的呢?下面這張圖就清楚了揭示了其原理:

spark算子集合

亦即,它會在資料搬移以前,提前進行一步reduce操作。

可以實作同樣功能的還有GroupByKey函數,但是,groupbykey函數并不能提前進行reduce,也就是說,上面的處理過程會翻譯成這樣:

spark算子集合

是以在處理大規模應用的時候,應該使用reduceByKey函數。

reduceByKey()對于每個key對應的多個value進行了merge操作,最重要的是它能夠先在本地進行merge操作。merge可以通過func自定義。

groupByKey()也是對每個key對應的多個value進行操作,但是隻是彙總生成一個sequence,本身不能自定義函數,隻能通過額外通過map(func)來實作。

spark算子集合

使用reduceByKey()的時候,本地的資料先進行merge然後再傳輸到不同節點再進行merge,最終得到最終結果。

而使用groupByKey()的時候,并不進行本地的merge,全部資料傳出,得到全部資料後才會進行聚合成一個sequence,

groupByKey()傳輸速度明顯慢于reduceByKey()。

雖然groupByKey().map(func)也能實作reduceByKey(func)功能,但是,優先使用reduceByKey(func)

繼續閱讀