天天看點

[Spark進階]-- spark RDD中foreachPartition和foreach說明

主題:RDD的foreachPartition/foreach的操作

說明:這兩個action主要用于對每個partition中的iterator時行疊代的處理.通過使用者傳入的function對iterator進行内容的處理.

一、foreach的操作

foreach中,傳入一個function,這個函數的傳入參數就是每個partition中,每次的foreach得到的一個rdd的kv執行個體,也就是具體的内容,

這種處理你并不知道這個iterator的foreach什麼時候結果,隻能是foreach的過程中,你得到一條資料,就處理一條資料.

由下面的紅色部分可以看出,foreach操作是直接調用了partition中資料的foreach操作:

def foreach(f: T => Unit): Unit = withScope {
   val cleanF = sc.clean(f)
   sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
 }      

示例說明:

val list = new ArrayBuffer()
Rdd.foreach(record => {
  list += record
  If (list.size >= 10000) {
    list.flush
  }
})      

上面這段示例代碼中,如果會存在一個問題,疊代的最後,list的結果可能還沒有達到10000條,這個時候,

你在内部的處理的flush部分就不會執行,也就是疊代的最後如果沒有達到10000的資料就會丢失.

是以在foreach中,一般就是拿到一條資料進行下處理Rdd.foreach(record => {record._1 == a return})

二、foreachPartition操作

這個函數也是根據傳入的function進行處理,但不同之處在于,這裡function的傳入參數是一個partition對應資料的iterator.

而不是直接使用iterator的foreach,這種情況下,如果是上面foreach的示例代碼中list這個片段在這個action中就能夠正常的去處理.

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
   val cleanF = sc.clean(f)
   sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
 }      

示例代碼:

Val list = new ArrayBuffer
rdd.foreachPartition(it => {
  It.foreach(r => {
List += r
If (list.size > 10000) flush
  })
  If (list.size > 0) flush
})      

最後說下這兩個action的差別:

Foreach與ForeachPartition都是在每個partition中對iterator進行操作,

不同的是,foreach是直接在每個partition中直接對iterator執行foreach操作,而傳入的function隻是在foreach内部使用,

而foreachPartition是在每個partition中把iterator給傳入的function,讓function自己對iterator進行處理(可以避免記憶體溢出).

繼續閱讀