mapPartitions/mapPartitionsWithIndex
這兩個transform中:mapPartitions與map的差別是map中是對每個partition中的iterator執行map操作,對map過程中的每一條record進行傳入的function的處理,而mapPartitions是把partition中整個iterator傳給function進行處理.如果是map操作,你并不能知道這個iterator什麼時候結束,但mapPartitions時給你的是一個iterator,是以你的函數中知道這個iterator什麼時候會結束.而mapPartitionsWithIndex的函數是在mapPartitions的基礎上,多了一個傳入參數,這個傳入參數就是對應的partition的index.
mapPartitions的函數定義:
def mapPartitions[U: ClassTag](
這個函數的定義部分為一個參數,是對應partition的資料的iterator,
f: Iterator[T] => Iterator[U],
這個參數false表示在生成的MapPartitionsRDD中不包含partitioner算子.
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
直接生成一個MapPartitionsRDD,在生成這個rdd時,通過傳入的function,這個function與map對應的function不同的是,function直接拿到一個iterator進行操作.
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
mapPartitionsWithIndex的函數定義:
這個函數的處理方法可以看到與mapPartitions基本上相同,不同的地方是f(function)的定義部分,此部分多出一個int類型的參數,這個參數是對應的iterator所在的partition的index.
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}