天天看點

spark transform系列__mapPartitions

mapPartitions/mapPartitionsWithIndex

這兩個transform中:mapPartitions與map的差別是map中是對每個partition中的iterator執行map操作,對map過程中的每一條record進行傳入的function的處理,而mapPartitions是把partition中整個iterator傳給function進行處理.如果是map操作,你并不能知道這個iterator什麼時候結束,但mapPartitions時給你的是一個iterator,是以你的函數中知道這個iterator什麼時候會結束.而mapPartitionsWithIndex的函數是在mapPartitions的基礎上,多了一個傳入參數,這個傳入參數就是對應的partition的index.

mapPartitions的函數定義:

def mapPartitions[U: ClassTag](

這個函數的定義部分為一個參數,是對應partition的資料的iterator,

    f: Iterator[T] => Iterator[U],

這個參數false表示在生成的MapPartitionsRDD中不包含partitioner算子.

    preservesPartitioning: Boolean = false): RDD[U] = withScope {

  val cleanedF = sc.clean(f)

直接生成一個MapPartitionsRDD,在生成這個rdd時,通過傳入的function,這個function與map對應的function不同的是,function直接拿到一個iterator進行操作.

  new MapPartitionsRDD(

    this,

    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),

    preservesPartitioning)

}

mapPartitionsWithIndex的函數定義:

這個函數的處理方法可以看到與mapPartitions基本上相同,不同的地方是f(function)的定義部分,此部分多出一個int類型的參數,這個參數是對應的iterator所在的partition的index.

def mapPartitionsWithIndex[U: ClassTag](

    f: (Int, Iterator[T]) => Iterator[U],

    preservesPartitioning: Boolean = false): RDD[U] = withScope {

  val cleanedF = sc.clean(f)

  new MapPartitionsRDD(

    this,

    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),

    preservesPartitioning)

}

繼續閱讀