天天看點

Spark中RDD的算子1、算子的簡單介紹2、算子的使用

1、算子的簡單介紹

Transformation(轉換)算子:根據資料集建立一個新的資料集,計算後傳回一個新RDD,例如一個rdd進行map操作後生了一個新的rdd。

Action(動作)算子:對rdd結果計算後傳回一個數值value給驅動程式(driver),例如collect算子将資料集的所有元素收集完成傳回給驅動程式。

控制算子:對資料集進行特殊操作,例如cache算子将對于重複使用的算子,進行cache做緩存使用,資料隻儲存在記憶體中,性能提升。

懶執行:Spark中轉化算子和控制算子是懶執行的,需要Action算子觸發才能執行。

懶執行就是延遲計算的意思,就像是建立了一個視圖,他并不是把查詢好的資料放入視圖了,而是當你需要這些資料時,檢視視圖時,他才執行定義視圖時候的SQL語句。

注意:

Driver即運作Application的main()函數并且建立SparkContext。

Application使用者編寫的Spark應用程式。

SparkContext整個應用的上下文、控制應用的生命周期。

job即在每一個application中,有幾個action,就會産生幾個job。

2、算子的使用

2.1、常用的Transformation算子

val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val pairRdd = sc.parallelize(List((1,1), (5,10), (5,9), (2,4), (3,5), (3,6),(4,7), (4,8),(2,3), (1,2)),4)
    //map(函數),一一映射,分區數量不變,有多少條資料,就被會運作多少次。
    val value1: RDD[(Int, Int)] = pairRdd.map(x => (x._1, x._2 + 1))

    //定義一個疊代函數
    val f1 = (iter:Iterator[(Int, Int)]) => iter.map(x=>(x._1,x._2*2))
    //mapPartitions函數可以認為是Map的變種,可以對分區進行并行處理,兩者的差別是調用的顆粒度不一樣,map的輸入函數是應用于RDD的每個元素,而mapPartition的輸入函數是應用于RDD的每個分區。
    val value2: RDD[(Int, Int)] = pairRdd.mapPartitions(f1)

    //定義一個疊代函數
    val f2 = (index:Int,iter:Iterator[(Int, Int)]) => iter.map((index,_))
    //mapPartitionsWithIndex函數類似于mapPartitions,但func帶有一個整數參數表示分片的索引值,是以在類型為T的RDD上運作時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]
    val value3: RDD[(Int, (Int, Int))] = pairRdd.mapPartitionsWithIndex(f2)

    //filter函數,傳回一個新的RDD,該RDD由經過func函數計算後傳回值為true的輸入元素組成,fitler并不會改變分區的數量,之前有幾個,現在仍然有幾個分區。
    val value4: RDD[(Int, Int)] = pairRdd.filter(x => x._2 % x._1 == 0)

    //flatMap函數: map之後,再flatten。每一個輸入元素可以被映射為0或多個輸出元素(是以func應該傳回一個序列,而不是單一進制素)
    val value5: RDD[Char] = pairRdd.flatMap(x => x._1.toString + x._2.toString)

    //groupByKey函數:在一個(K,V)的RDD上調用,傳回一個(K, Iterator[V])的RDD,可重新指定分區個數
    val value6: RDD[(Int, Iterable[Int])] = pairRdd.groupByKey(1)

    //groupBy函數與groupByKey類似,底層調用groupByKey,傳回一個(K, Iterator[V])的RDD,可重新指定分區個數
    val value7: RDD[(Int, Iterable[(Int, Int)])] = pairRdd.groupBy(_._1, 1)
    //groupBy後要搭配MapValues()使用

    //reduceByKey函數:在一個(K,V)的RDD上調用,傳回一個(K,V)的RDD,使用指定的reduce函數,将相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設定
    val value8: RDD[(Int, Int)] = pairRdd.reduceByKey(_ + _, 1)

    //aggregateByKey 聚合類算子:初始化在每一個分區聚合中參數運作,但是在全局聚合中,不參與,類似于reduceByKey,但多個初始值
    val value11: RDD[(Int, Int)] = pairRdd.aggregateByKey(100)(_ + _, _ + _)

    //sortByKey函數:在一個(K,V)的RDD上調用,K必須實作Ordered接口,傳回一個按照key進行排序的(K,V)的RDD,是全局排序,可指定分區數量
    //可設定排序規則,預設是從小到大排序,是true, 如果想從大到小排,設定為false
    val value12: RDD[(Int, Int)] = pairRdd.sortByKey(false,2)

    //sortBy函數:與sortByKey類似,但是更靈活,可設定排序的字段
    val value13: RDD[(Int, Int)] = pairRdd.sortBy(_._1, false, 2)

    //union函數:對源RDD和參數RDD求并集後傳回一個新的RDD
    val value14: RDD[(Int, Int)] = value1.union(value2)

    //intersection函數:對源RDD和參數RDD求交集後傳回一個新的RDD
    val value15: RDD[(Int, Int)] = value1.intersection(value2)

    //subtract函數:對源RDD和參數RDD求差集後傳回一個新的RDD
    val value16: RDD[(Int, Int)] = value1.subtract(value2)

    //join函數:對源RDD和參數RDD進行join傳回一個新的RDD
    val value17: RDD[(Int, (Int, Int))] = value1.join(value2)

    //cogroup函數:對源RDD和參數RDD進行全外關聯傳回一個新的RDD,相當于SQL中的全外關聯full outer join,傳回左右RDD中的記錄,關聯不上的為空。
    val value18: RDD[(Int, (Iterable[Int], Iterable[Int]))] = value1.cogroup(value2)

    //cartesian函數:對源RDD和參數RDD進行笛卡爾積傳回一個新的RDD
    val value19: RDD[((Int, Int), (Int, Int))] = value1.cartesian(value2)

    //coalesce:對RDD重新進行分區
    val value20: RDD[((Int, Int), (Int, Int))] = value19.coalesce(10)

    //repartition:類似于coalesce方法,底層就是用的coalesce方法,對RDD重新進行分區
    val value21: RDD[((Int, Int), (Int, Int))] = value20.repartition(5)

    //去重:distinct(分區數量)。
    val value22: RDD[((Int, Int), (Int, Int))] = value19.distinct(12)
    //等價于
    val value23: RDD[((Int, Int), (Int, Int))] =value19.map(x=>(x,null)).reduceByKey((x,y) => x).map(_._1)
           

2.2、常用的ACtion算子

//reduce函數與reduceByKey類似,傳回一個結果的RDD,+ ,++ 取決rdd的元素類型,string類型使用++
    val value9: Int = pairRdd.map(_._2).reduce(_ + _)

    //aggregate 聚合類算子:存在兩次聚合。局部聚合和全局聚合。
    //先局部計算(100+分區一的内容)  (100+分區二的内容)+...
    //再全局計算100+(100+分區一的内容)+ (100+分區二的内容)+...
    val value10: Int = pairRdd.map(_._2).aggregate(100)(_ + _, _ + _)

    //collect 在驅動程式中,以數組的形式傳回資料集的所有元素
    val tuples: Array[((Int, Int), (Int, Int))] = value21.collect()

    //foreach 在資料集的每一個元素上,運作函數func
    val unit: Unit = value11.foreach(println(_))

    //foreach 疊代的是每一個分區的資料
    //如果我們需要去擷取mysql的連接配接,RDD中有10000 條資料,有10個分區。
    //foreach:擷取10000次連接配接,性能低下。
    //foreapartition: 隻需要擷取10次連接配接,每一個分區中的資料,共用一個連接配接。 性能優越。
    value21.foreachPartition(println(_))

    //saveAsTextFile 将資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對于每個元素,Spark将會調用toString方法,将它裝換為檔案中的文本
    value21.saveAsTextFile("路徑")

    //count()	傳回RDD的元素個數
    val tuples1: Array[((Int, Int), (Int, Int))] = value19.collect()

    //first()	傳回RDD的第一個元素(類似于take(1))
    val tuple: ((Int, Int), (Int, Int)) = value19.first()

    //take(n)	傳回一個由資料集的前n個元素組成的數組
    val tuples2: Array[((Int, Int), (Int, Int))] = value19.take(10)
           

注意:

sortBy 是按照 RangePartitioner作為分區器

groupByKey  和 reduceByKey 中優先使用reduceByKey,因為reduceByKey會有一個局部的聚合,性能更好。

ByKey的都是轉換算子,沒有Key的都是action類算子。aggregateByKey和reduceByKey是轉化算子,aggregate和reduce是執行算子

2.3、控制算子

cache 對于重複使用的算子,進行cache做緩存使用,資料隻儲存在記憶體中,性能提升
persist 性能提升
checkPoint 資料容錯,當資料計算的時候,機器挂了,重新追溯到checkPoint的目錄下checkPoint是将RDD持久化到磁盤中,還可以切斷RDD之間的依賴關系

繼續閱讀