天天看點

spark筆記(一)--常用spark算子

1.mapPartitions

        映射分區。對rdd中的每個分區進行映射。

2.union

        rdd1.union(rdd2)。該結果的分區數是rdd1和rdd2的分區數之和。

3.intersection

        計算兩個rdd的交集,需要shuffle過程。交集後的rdd分區數是參與計算的兩個分區數的最大值。

4.distinct

        去重。内部通過reduceByKey實作。

        map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

5.groupByKey

        參與計算的rdd需要是一個二進制組。相同key的二進制組組合在一起,value形成一個集合。需要shuffle

6.reduceByKey(func)

       參與計算的rdd需要是一個二進制組。相同key的二進制組組合在一起,根據給定的函數func,對value進行處理

7.aggregateByKey(U)(func1,func2)

        将相同key的二進制組組合在一起。不同與groupByKey和reduceByKey。舉例可以讓此算子更容易被了解

        var conf = new SparkConf().setAppName("evenodd").setMaster("local[4]")
        conf.set("spark.default.parallelism","3")
        var sc = new SparkContext(conf)
        var rdd1 = sc.parallelize(1 to 7,2)
        var rdd2 = rdd1.map(e=>{
            var t = if (e%2 == 0) "even" else "odd"
            (t,e)
        })
        val z:List[Int] = Nil
        def f1(a:List[Int],b:Int):List[Int] = {
              b :: a
        }
        def f2(a:List[Int],b:List[Int]) = {
            a++b
        }
        var rdd3 = rdd2.aggregateByKey(z)(f1,f2)
           

        這段代碼中,aggregateByKey的主要作用是将rdd2以相同key分組,然後将value放入List中。z變量是一個空的List,f1是将一個Int變量加入List,f2是将兩個List合并成為一個List。在每個分區内進行預聚合時,我們需要一個初始值,那就是z,然後,以z為初始value去一一合并分區内的相同key,當分區内聚合完成後,分區之間調用f2進行聚合。

8.join

        類似于資料庫中的連接配接語句。join是内連接配接,另外,還有leftOuterJoin、rightOuterJoin、fullOuterJoin

9.cogroup

        協分組。rdd1.cogroup(rdd2),将兩個rdd進行聯合分組。結果為(k,(List[v1],List[v2])),List[v1]是rdd1中以k為key的value的集合,同理,List[v2]是rdd2中以k為key的value的集合。

10.coalesce

        再分區。可以将rdd重新分區。降低rdd分區數量時,可以選擇是否進行shuffle過程,對于大量資料,推薦不使用shuffle過程。增加rdd分區數量時,必須進行shuffle過程。

11.repartition

        也是再分區,但是此算子必須使用shuffle過程。是以當減少分區數量時,建議使用coalesce算子不進行shuffle過程。當增加分區數量時,這兩種方法是一樣的。

12.repartitionAndSortWithinPartitions

        分區并排序。

13.cartesian

        兩個rdd的笛卡爾積。不需要shuffle。部分源代碼:

for (x <- rdd1.iterator(currSplit.s1, context);
    y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
           

14.比較groupByKey和reduceByKey

        groupByKey不會進行map端combine操作。

reduceByKey預設進行map端combine操作,減少網絡負載。

        若兩種方式均可完成的操作,推薦使用reduceByKey

15.combineByKey

        可以對每一個分區預聚合。當reduceByKey(func)完成不了,但是又想減少網絡帶寬,就要自定義combineByKey的參數。比如将相同key的value結果放入list代碼。舉例說明

        def f1(x:Int):List[Int] = {
            x :: Nil
        }
        def f2(x:List[Int],y:Int) = {
            y :: x
        }
        def f3(x:List[Int],y:List[Int]) = {
            x ++ y
        }

        val rdd5 = rdd1.combineByKey(f1 _,f2 _,f3 _,new HashPartitioner(2),mapSideCombine = true)
           

f1是将x:Int變成一個List,f2是将Int變量加入一個List,f3将兩個List合并。我們需要初始值,f1的作用就是生成初始值,在map端,f2會發揮作用,生成List,shuffle之後,f3方法會發揮作用,将兩個List合并。若mapSideCombine參數為false,則不會在map端聚合。f3函數不會生效。

繼續閱讀