1.mapPartitions
映射分區。對rdd中的每個分區進行映射。
2.union
rdd1.union(rdd2)。該結果的分區數是rdd1和rdd2的分區數之和。
3.intersection
計算兩個rdd的交集,需要shuffle過程。交集後的rdd分區數是參與計算的兩個分區數的最大值。
4.distinct
去重。内部通過reduceByKey實作。
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
5.groupByKey
參與計算的rdd需要是一個二進制組。相同key的二進制組組合在一起,value形成一個集合。需要shuffle
6.reduceByKey(func)
參與計算的rdd需要是一個二進制組。相同key的二進制組組合在一起,根據給定的函數func,對value進行處理
7.aggregateByKey(U)(func1,func2)
将相同key的二進制組組合在一起。不同與groupByKey和reduceByKey。舉例可以讓此算子更容易被了解
var conf = new SparkConf().setAppName("evenodd").setMaster("local[4]")
conf.set("spark.default.parallelism","3")
var sc = new SparkContext(conf)
var rdd1 = sc.parallelize(1 to 7,2)
var rdd2 = rdd1.map(e=>{
var t = if (e%2 == 0) "even" else "odd"
(t,e)
})
val z:List[Int] = Nil
def f1(a:List[Int],b:Int):List[Int] = {
b :: a
}
def f2(a:List[Int],b:List[Int]) = {
a++b
}
var rdd3 = rdd2.aggregateByKey(z)(f1,f2)
這段代碼中,aggregateByKey的主要作用是将rdd2以相同key分組,然後将value放入List中。z變量是一個空的List,f1是将一個Int變量加入List,f2是将兩個List合并成為一個List。在每個分區内進行預聚合時,我們需要一個初始值,那就是z,然後,以z為初始value去一一合并分區内的相同key,當分區内聚合完成後,分區之間調用f2進行聚合。
8.join
類似于資料庫中的連接配接語句。join是内連接配接,另外,還有leftOuterJoin、rightOuterJoin、fullOuterJoin
9.cogroup
協分組。rdd1.cogroup(rdd2),将兩個rdd進行聯合分組。結果為(k,(List[v1],List[v2])),List[v1]是rdd1中以k為key的value的集合,同理,List[v2]是rdd2中以k為key的value的集合。
10.coalesce
再分區。可以将rdd重新分區。降低rdd分區數量時,可以選擇是否進行shuffle過程,對于大量資料,推薦不使用shuffle過程。增加rdd分區數量時,必須進行shuffle過程。
11.repartition
也是再分區,但是此算子必須使用shuffle過程。是以當減少分區數量時,建議使用coalesce算子不進行shuffle過程。當增加分區數量時,這兩種方法是一樣的。
12.repartitionAndSortWithinPartitions
分區并排序。
13.cartesian
兩個rdd的笛卡爾積。不需要shuffle。部分源代碼:
for (x <- rdd1.iterator(currSplit.s1, context);
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
14.比較groupByKey和reduceByKey
groupByKey不會進行map端combine操作。
reduceByKey預設進行map端combine操作,減少網絡負載。
若兩種方式均可完成的操作,推薦使用reduceByKey
15.combineByKey
可以對每一個分區預聚合。當reduceByKey(func)完成不了,但是又想減少網絡帶寬,就要自定義combineByKey的參數。比如将相同key的value結果放入list代碼。舉例說明
def f1(x:Int):List[Int] = {
x :: Nil
}
def f2(x:List[Int],y:Int) = {
y :: x
}
def f3(x:List[Int],y:List[Int]) = {
x ++ y
}
val rdd5 = rdd1.combineByKey(f1 _,f2 _,f3 _,new HashPartitioner(2),mapSideCombine = true)
f1是将x:Int變成一個List,f2是将Int變量加入一個List,f3将兩個List合并。我們需要初始值,f1的作用就是生成初始值,在map端,f2會發揮作用,生成List,shuffle之後,f3方法會發揮作用,将兩個List合并。若mapSideCombine參數為false,則不會在map端聚合。f3函數不會生效。