天天看點

Pyspark實戰(三)wordcount算子分析

Pyspark的本質還是調用scala的jar包,我們以上篇文章wordcount為例,其中一段代碼為:

rdd.flatMap(lambda x:x.split( )).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).foreach(lambda x:print(x))

其中:flatMap,map為轉換算子。

reduceByKey,foreach為執行算子,當rdd添加轉換算子的時候,rdd本身不會做任何操作,當執行算子添加時才會執行轉換算子。

我們把代碼定位到rdd.py的map,flatMap,源代碼如下:

def map(self, f, preservesPartitioning=False):

    """

    Return a new RDD by applying a function to each element of this RDD.



    >>> rdd = sc.parallelize(["b", "a", "c"])

    >>> sorted(rdd.map(lambda x: (x, 1)).collect())

    [('a', 1), ('b', 1), ('c', 1)]

    """

    def func(_, iterator):

        return map(f, iterator)

    return self.mapPartitionsWithIndex(func, preservesPartitioning)      
def flatMap(self, f, preservesPartitioning=False):

    """

    Return a new RDD by first applying a function to all elements of this

    RDD, and then flattening the results.



    >>> rdd = sc.parallelize([2, 3, 4])

    >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())

    [1, 1, 1, 2, 2, 3]

    >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())

    [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]

    """

    def func(s, iterator):

        return chain.from_iterable(map(f, iterator))

    return self.mapPartitionsWithIndex(func, preservesPartitioning)      

map需要兩個參數,第一個參數為f,第二個從字面意思是分片數量。那麼f是什麼類型呢?我們從scala源代碼看可能更清楚一些:

/**

 * Return a new RDD by applying a function to all elements of this RDD.

 */

def map[U: ClassTag](f: T => U): RDD[U] = withScope {

  val cleanF = sc.clean(f)

  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))

}      

map是一個泛型方法,這裡的U類型實際上可以是所有類型,這裡清楚的标明f的類型:f: T => U,f是一個參數為T, U為傳回值的匿名函數,算子最後傳回一個新的rdd

/**

 *  Return a new RDD by first applying a function to all elements of this

 *  RDD, and then flattening the results.

 */

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {

  val cleanF = sc.clean(f)

  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))

}      

flat相比map,多了一步處理,就是将傳回的結果U進行TraversableOnce處理,意思是将U類型的集合分散并合并為一個新的集合。

是以,我們再回頭看看代碼:

rdd=sc.textFile(txtfile)

rdd是一個集合,集合的要素是文本檔案的一行資料,類似于Array[line]。

rdd.flatMap(lambda x:x.split( )).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).foreach(lambda x:print(x))

rdd.flatMap(lambda x:x.split( )).的意思是先将每個line通過空格分開,這時候line傳回的是Array[char],最後通過TraversableOnce處理,多個Array[char]傳回一個Array[char]

map(lambda x:(x,1))的意思是将每一個值轉換成key,value對象,x為Array[char]的char值。

reduceByKey(lambda x,y:x+y)根據key值計算,相同k值相加運算。

Foreach周遊因子。

繼續閱讀