天天看點

Spark為什麼隻有在調用action時才會觸發任務執行呢(附算子優化和使用示例)?

Spark算子主要劃分為兩類:transformation和action,并且隻有action算子觸發的時候才會真正執行任務。還記得之前的文章《Spark RDD詳解》中提到,Spark RDD的緩存和checkpoint是懶加載操作,隻有action觸發的時候才會真正執行,其實不僅是Spark RDD,在Spark其他元件如SparkStreaming中也是如此,這是Spark的一個特性之一。像我們常用的算子map、flatMap、filter都是transformation算子,而collect、count、saveAsTextFile、countByKey、foreach則為action算子。

Spark為什麼隻有在調用action時才會觸發任務執行呢(附算子優化和使用示例)?

1.導緻map執行完了要立即輸出,資料也必然要落地(記憶體和磁盤)

2.map任務的生成、排程、執行,以及彼此之間的rpc通信等等,當牽扯到大量任務、大資料量時,會很影響性能

看到這兩點是不是很容易聯想到MapReduce的計算模型,MapReduce因為中間結果需要落地,導緻性能相對Spark較低下,這也是MapReduce廣為诟病的原因之一。是以Spark采用隻有調用action算子時才會真正執行任務,這是相對于MapReduce的優化點之一。

但是每個Spark RDD中連續調用多個map類算子,Spark任務是對資料在一次循環周遊中完成還是每個map算子都進行一次循環周遊呢?

答案很确定:不需要對每個map算子都進行循環周遊。Spark會将多個map算子pipeline起來應用到RDD分區的每個資料元素上(後續将要介紹的SparkSQL中的Dataset/DataFrame也是如此)

下面說幾個算子的優化,這也是面試中經常問的問題:

在我們實際的業務場景中經常會使用到根據key進行分組聚合的操作,當然熟悉Spark算子使用的都知道像reduceByKey、groupByKey、aggregateByKey、combineByKey大多都能滿足需求。但是筆者在這裡還是要重點說一下,因為很多人想到分組聚合往往第一個想到的算子就是groupByKey,但是groupByKey相對其他算子性能低并且處理不好的情況下,容易發生資料傾斜。是以我們能用其他算子比如reduceByKey替代groupByKey實作滿足我們業務需求的,就一律不用groupByKey。當然reduceByKey在某些場景下性能會比aggregateByKey低,具體算子的替換要結合實際業務需求場景來定。

這裡主要說明一下reduceByKey和groupByKey的對比,以及幾個算子替代的場景示例:

1.首先這幾個“ByKey”的算子會觸發shullfe,這裡強調一點,對于分布式任務,如果存在聚合操作的話往往都是要進行shuffle的

2.相對于reduceByKey,groupByKey沒有預先聚合,而是直接将相同key的value進行分組然後再聚合造成shuffle耗費嚴重;而reduceByKey會先在map端進行局部聚合,然後再在reduce端再次聚合,這點類似于MapReduce中combiner元件,可以減少磁盤IO和網絡IO,提高性能

3.aggregateByKey替代reduceByKey的場景:當輸出的結果和輸入的結果不同的時候可以被替換。例如,查找同一個key的所有不同的value值,也即是先根據key進行分組,然後去重。假設采用reduceByKey實作的話,需要先用map講單個元素裝到set裡,然後在針對set進行reduceByKey,僞代碼:rdd.map(case(k,v) => (k, Set(v))).reduceByKey(_ ++ _),但是該過程會導緻為每個記錄建立一個set,這是很沒必要的。此時我們可以使用aggregateByKey替代reduceByKey實作該需求,僞代碼:

val zero = mutable.Set[String]()

rdd.aggregateByKey(zero)((set, v) => set += v,(set1, set2) => set1 ++= set2)

具體示例:

1)reduceByKey

val rdd = rowRdd.map { row =>

val id = row.getAs[String]("id")
  val name = row.getAs[String]("name")
  val count = row.getAs[Long]("count")
  (id, (name, count))
}.map { case (id, (name, count)) => (id, Array(count)) }.reduceByKey(_ ++ _)
           

2)aggregateByKey

val zeroValue = mutable.Set[(String, Long)]()

val rdd = df.rdd.map { row =>

val id = row.getAs[String]("id")
  val name = row.getAs[String]("name")
  val count = row.getAs[Long]("count")
  (id, (name, count))           

}.aggregateByKey(zeroValue)(

(set, v) => set += v,
  (set1, set2) => set1 ++= set2)
           

3)combineByKey

val rdd = df.rdd.map { row =>

val id = row.getAs[String]("id")
  val name = row.getAs[String]("name")
  val count = row.getAs[Long]("count")
  (id, (name, count))           

}.combineByKey(

(v: (String, Long)) => List(v),
  (c: List[(String, Long)], v: (String, Long)) => v :: c,
  (c1: List[(String, Long)], c2: List[(String, Long)]) => c1 ::: c2)
           

4.當兩個資料集已經按照key進行分組,此時想對兩個資料集在仍然保持分組的基礎上進行join,則可以使用cgroup,以避免分組展開然後再次分組的開銷

Spark目前提供了80多種算子,想熟練掌握這些算子如何運用,筆者建議學習一下Scala語言,原因除了《Spark通識》中說的那兩點之外,還有就是Spark提供的很多算子跟Scala本身提供的函數功能很相似甚至有些名字都是一樣的,了解了Scala提供的,對于學習Spark算子将事半功倍。這裡舉一些常用的transformation和action使用示例:

transformation

map

map是對RDD中的每個元素都執行一個指定的函數來産生一個新的RDD。任何原RDD中的元素在新RDD中都有且隻有一個元素與之對應。

舉例:

val a = sc.parallelize(1 to 9, 3)

val b = a.map(x => x*2)

a.collect 【Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)】

b.collect 【Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)】

filter

filter是對RDD中的每個元素都執行一個指定的函數來過濾産生一個新的RDD,該RDD由經過函數處理後傳回值為true的輸入元素組成。任何原RDD中的元素在新RDD中都有且隻有一個元素與之對應。

val rdd = sc.parallelize(List(1,2,3,4,5,6))

val filterRdd = rdd.filter(_ > 3)

filterRdd.collect() 【傳回所有大于3的資料的:Array(6,8,10,12)】

flatMap

與map類似,差別是原RDD中的元素經map處理後隻能生成一個元素,而原RDD中的元素經flatmap處理後可生成多個元素來建構新RDD。舉例:對原RDD中的每個元素x産生y個元素(從1到y,y為元素x的值)

val a = sc.parallelize(1 to 4, 2)

val b = a.flatMap(x => 1 to x)

b.collect 【Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)】

reduceByKey和sortByKey

分組聚合與排序,這裡以單詞統計,并按單詞排序為例

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))

val rdd3 = rdd1.union(rdd2)

//按key進行聚合(PairRDDFunctions提供)

val rdd4 = rdd3.reduceByKey(_ + _)

//false降序,預設true(OrderedRDDFunctions提供)

val rdd5 = rdd4.sortByKey(false)

repartition

該函數其實就是coalesce函數第二個參數為true的實作,改變分區數會産生shuffle,repartition之後會傳回一個新的RDD

var data = sc.parallelize(1 to 12, 3) //分區數3

var rdd1 = data.repartition(1) //分區數1

var rdd1 = data.repartition(4) //4

data.partitions.size 還是3

action

first

first傳回RDD中的第一個元素,不排序。

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

rdd1.first 【 (A,1) 】

var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))

rdd1.first 【 10 】

count

count傳回RDD中的元素數量。

rdd1.count 【 3 】

take

take用于擷取RDD中從0到num-1下标的元素,不排序。

rdd1.take(1) 【 Array(10) 】

rdd1.take(2) 【 Array(10, 4) 】

像各種save操作,如saveAsNewAPIHadoopDataset都是action算子,這裡就不一一列舉了。

繼續閱讀