天天看點

RDD Transformation和Action源碼剖析

wordcount.todebugstring檢視rdd的繼承鍊條

是以廣義的講,對任何函數進行某一項操作都可以認為是一個算子,甚至包括求幂次,開方都可以認為是一個算子,隻是有的算子我們用了一個符号來代替他所要進行的運算罷了,是以大家看到算子就不要糾結,他和f(x)的f沒差別,它甚至和加減乘除的基本運算符号都沒有差別,隻是他可以對單對象操作罷了(有的符号比如大于、小于号要對多對象操作)。又比如取機率p{x<x},機率是集合{x<x}(他是屬于實數集的子集)對[0,1]區間的一個映射,我們知道實數域和[0,1]區間是可以一一映射的(這個後面再說),是以取機率符号p,我們認為也是一個算子,和微分,積分算子算子沒差別。

總而言之,算子就是映射,就是關系,就是**變換**!

**mappartitions(f)**

f函數的輸入輸出都是每個分區集合的疊代器iterator

def mappartitions[u](f: (iterator[t]) => iterator[u], preservespartitioning: boolean = false)(implicit arg0: classtag[u]): rdd[u]

該函數和map函數類似,隻不過映射函數的參數由rdd中的每一個元素變成了rdd中每一個分區的疊代器。如果在映射的過程中需要頻繁建立額外的對象,使用mappartitions要比map高效的過。

比如,将rdd中的所有資料通過jdbc連接配接寫入資料庫,如果使用map函數,可能要為每一個元素都建立一個connection,這樣開銷很大,如果使用mappartitions,那麼隻需要針對每一個分區建立一個connection。

參數preservespartitioning表示是否保留父rdd的partitioner分區資訊。

參考文章:

<a href="http://lxw1234.com/archives/2015/07/348.htm" target="_blank">http://lxw1234.com/archives/2015/07/348.htm</a>

union(other: rdd[t])操作不去重,去重需要distinct()

subtract取兩個rdd中非公共的元素

sample傳回rdd,takesample直接傳回數組(數組裡面的元素為rdd中元素,類似于collect)

keyvalue之類的操作都在**pairrddfunctions.scala**中

mapvalues隻對value進行運算

groupby相同key的元素的value組成集合

cogroup是在groupby的基礎上

cogroup操作多個rdd,是兩個rdd裡相同key的兩個value集合組成的元組

RDD Transformation和Action源碼剖析

<a href="http://www.iteblog.com/archives/1280%5d(http://www.iteblog.com/archives/1280)" target="_blank">http://www.iteblog.com/archives/1280</a>

**combinebykey和reducebykey,groupbykey(内部都是通過combinebykey)**

源碼分析:

    reducebykey  mapsidecombine: boolean = true

    groupbykey  mapsidecombine=false

**join操作**

本質是先cogroup再笛卡爾積

      def join[w](other: rdd[(k, w)], partitioner: partitioner): rdd[(k, (v, w))] = {

    this.cogroup(other, partitioner).flatmapvalues( pair =&gt;

      for (v &lt;- pair._1.iterator; w &lt;- pair._2.iterator) yield (v, w)

    )

      }

RDD Transformation和Action源碼剖析

**yield** 關鍵字的簡短總結:

    針對每一次 for 循環的疊代, yield 會産生一個值,被循環記錄下來 (内部實作上,像是一個緩沖區).

    當循環結束後, 會傳回所有 yield 的值組成的集合.

    傳回集合的類型與被周遊的集合類型是一緻的.

<a href="http://unmi.cc/scala-yield-samples-for-loop/" target="_blank">http://unmi.cc/scala-yield-samples-for-loop/</a>

cache persist也是lazy級别的

action本質sc.runjob

foreach

collect()相當于toarray傳回一個數組

collectasmap()對keyvalue類型的rdd操作傳回一個hashmap,key重複後面的元素會覆寫前面的元素reduce

源碼解析:先調用collect()再放到hashmap[k, v]中

     def collectasmap(): map[k, v] = {

    val data = self.collect()

    val map = new mutable.hashmap[k, v]

    map.sizehint(data.length)

    data.foreach { pair =&gt; map.put(pair._1, pair._2) }

    map

**reducebykeylocally**相當于reducebykey+collectasmap()

該函數将rdd[k,v]中每個k對應的v值根據映射函數來運算,運算結果映射到一個map[k,v]中,而不是rdd[k,v]。

<a href="http://lxw1234.com/archives/2015/07/360.htm" target="_blank">http://lxw1234.com/archives/2015/07/360.htm</a>

**lookup**也是針對keyvalue傳回指定key對應的value形成的seq

    def lookup(key: k): seq[v] 

**reduce fold(每個分區是串行,有個初始值) aggregate(并行,與fold類似)**

前兩個元素作用的結果與第三元素作用依次類推

**sequencefile**檔案是hadoop用來存儲二進制形式的key-value對而設計的一種平面檔案(flat file)。目前,也有不少人在該檔案的基礎之上提出了一些hdfs中小檔案存儲的解決方案,他們的基本思路就是将小檔案進行合并成一個大檔案,同時對這些小檔案的位置資訊建構索引。不過,這類解決方案還涉及到hadoop的另一種檔案格式——**mapfile**檔案。sequencefile檔案并不保證其存儲的key-value資料是按照key的某個順序存儲的,同時不支援append操作。

**saveastextfile**-&gt;textoutputformat  (key為null,value為元素tostring)

**saveasobjectfile**(二進制)-&gt;saveassequencefile-&gt;sequencefileoutputformat(key為null,value為byteswritable)

cache\persist   

**checkpoint()**機制避免緩存丢失(記憶體不足)要重新計算帶來的性能開銷,會導緻另外一個作業,比緩存更可靠

sparkcontex.setcheckpointdir設定目錄位置