Spark2.3.0版本: Spark2.3.0 RDD操作 RDD支援兩種類型的操作:
轉移(transformations):從現有資料集建立一個新資料集
動作(actions):在資料集上進行計算後将值傳回給驅動程式
例如,map是一個轉移操作,傳遞給每個資料集元素一個函數并傳回一個新RDD表示傳回結果。 另一方面,reduce是一個動作操作,使用一些函數聚合RDD的所有元素并将最終結果傳回給驅動程式(盡管還有一個并行的reduceByKey傳回分布式資料集)。
在 Spark 中,所有的轉換操作(transformations)都是惰性(lazy)的,它們不會馬上計算它們的結果。相反的,它們僅僅記錄轉換操作是應用到哪些基礎資料集(例如一個檔案)上的(remember the transformations applied to some base dataset )。隻有當動作(action)操作 需要傳回一個結果給驅動程式的時候, 轉換操作才開始計算。 這個設計能夠讓 Spark 運作得更加高效。例如,我們可以知道:通過 map 建立的新資料集将在 reduce 中使用,并且僅僅傳回 reduce 的結果給驅動程式,而不是将整個大的映射過的資料集傳回。
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#1-%E5%9F%BA%E7%A1%80 1. 基礎
為了說明RDD基礎知識,請考慮以下簡單程式:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
第一行定義了一個來自外部檔案的基本RDD。 這個資料集并未加載到記憶體中或做其他處理:lines 僅僅是一個指向檔案的指針。 第二行将lineLength定義為map轉換函數的結果。 其次,由于轉換函數的惰性(lazy),lineLengths不會立即計算。 最後,我們運作reduce,這是一個動作函數。 此時,Spark 把計算分成多個任務(task),并且讓它們運作在多台機器上。每台機器都運作自己的 map 和本地 reduce。然後僅僅将結果傳回給驅動程式。
如果稍後還會再次使用lineLength,我們可以在運作reduce之前添加:
lineLengths.persist(StorageLevel.MEMORY_ONLY());
這将導緻lineLength在第一次計算之後被儲存在記憶體中。
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#2-%E4%BC%A0%E9%80%92%E5%87%BD%E6%95%B0%E7%BB%99spark 2. 傳遞函數給Spark
Spark的API很大程度上依賴于驅動程式中傳遞過來的函數在叢集上運作。 在Java中,函數由org.apache.spark.api.java.function接口實作。 建立這樣的功能有兩種方法:
(1)在類中實作Function接口,作為匿名内部類或命名的内部類,并将其執行個體傳遞給Spark。
(2)在Java 8中,使用lambda表達式來簡潔地定義一個實作。
匿名内部類
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
或者命名内部類
class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
下表中列出一些基本的函數接口:
函數名 | 實作的方法 | 用途 |
---|---|---|
Function<T,R> | R call(T) | 接收一個輸入值并傳回一個輸出值,用于類似map()和filter()等操作中 |
Function2<T1,T2,R> | R call(T1,T2) | 接收兩個輸入值并傳回一個輸出值,用于類似aggregate()和fold()等操作中 |
FlatMapFunction<T,R> | Iterable<R> call(T) | 接收一個輸入值并傳回任意個輸出,用于類似flatMap()這樣的操作中 |
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#3-%E4%BD%BF%E7%94%A8%E9%94%AE%E5%80%BC%E5%AF%B9 3. 使用鍵值對
雖然大多數Spark操作适用于包含任何類型對象的RDD上,但是幾個特殊操作隻能在鍵值對的RDD上使用。 最常見的是分布式“shuffle”操作,例如按鍵分組或聚合元素。
在Java中,使用Scala标準庫中的scala.Tuple2類來表示鍵值對。 可以如下簡單地調用:
new Tuple2(a,b)
來建立一個元組,然後用tuple._1()和tuple._2()通路它的字段。
鍵值對的RDD由JavaPairRDD類表示。 您可以使用特殊版本的map操作(如mapToPair和flatMapToPair)從JavaRDD來建構JavaPairRDD。 JavaPairRDD将具有标準的RDD的函數以及特殊的鍵值對函數。
例如,以下代碼在鍵值對上使用reduceByKey操作來計算每行文本在檔案中的出現次數:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
例如,我們也可以使用counts.sortByKey()來按字母順序來對鍵值對排序,最後将counts.collect()作為對象數組傳回到驅動程式。
注意:當使用一個自定義對象作為 key 在使用鍵值對操作的時候,你需要確定自定義 equals() 方法和 hashCode() 方法是比對的。更加詳細的内容,檢視 Object.hashCode() 文檔)中的契約概述。
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#4-%E8%BD%AC%E6%8D%A2%E6%93%8D%E4%BD%9Ctransformations 4. 轉換操作(Transformations)
下面列出了Spark支援的一些常見轉換函數。 有關詳細資訊,請參閱RDD API文檔(Scala,Java,Python,R)和RDD函數doc(Scala,Java)。
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#41-mapfunc-%E6%98%A0%E5%B0%84 4.1 map(func) 映射
将函數應用于RDD中的每個元素,将傳回值構成新的RDD。
List<String> aList = Lists.newArrayList("a", "B", "c", "b");
JavaRDD<String> rdd = sc.parallelize(aList);
// 小寫轉大寫
JavaRDD<String> upperLinesRDD = rdd.map(new Function<String, String>() {
@Override
public String call(String str) throws Exception {
if (StringUtils.isBlank(str)) {
return str;
}
return str.toUpperCase();
}
});
// A B C B
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#42-filterfunc-%E8%BF%87%E6%BB%A4 4.2 filter(func) 過濾
傳回通過選擇func傳回true的元素形成的新RDD。
List<String> list = Lists.newArrayList("a", "B", "c", "b");
JavaRDD<String> rdd = sc.parallelize(list);
// 隻傳回以a開頭的字元串
JavaRDD<String> filterRDD = rdd.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String str) throws Exception {
return !str.startsWith("a");
}
});
// B c b
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#43-flatmapfunc-%E4%B8%80%E8%A1%8C%E8%BD%AC%E5%A4%9A%E8%A1%8C 4.3 flatMap(func) 一行轉多行
類似于map函數,但是每個輸入項可以映射為0個輸出項或更多輸出項(是以func應該傳回一個序列而不是一個條目)。
List<String> list = Lists.newArrayList("a 1", "B 2");
JavaRDD<String> rdd = sc.parallelize(list);
// 一行轉多行 以空格分割
JavaRDD<String> resultRDD = rdd.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
if (StringUtils.isBlank(s)) {
return null;
}
String[] array = s.split(" ");
return Arrays.asList(array).iterator();
}
});
// a
// 1
// B
// 2
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#44-distinctnumtasks 4.4 distinct([numTasks]))
去重
List<String> aList = Lists.newArrayList("1", "3", "2", "3");
JavaRDD<String> aRDD = sc.parallelize(aList);
// 去重
JavaRDD<String> rdd = aRDD.distinct(); // 1 2 3
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#45-unionotherdataset-%E5%B9%B6%E9%9B%86 4.5 union(otherDataset) 并集
生成一個包含兩個RDD中所有元素的RDD. 如果輸入的RDD中有重複資料,union()操作也會包含這些重複的資料.
List<String> aList = Lists.newArrayList("1", "2", "3");
List<String> bList = Lists.newArrayList("3", "4", "5");
JavaRDD<String> aRDD = sc.parallelize(aList);
JavaRDD<String> bRDD = sc.parallelize(bList);
// 并集
JavaRDD<String> rdd = aRDD.union(bRDD); // 1 2 3 3 4 5
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#46-intersectionotherdataset-%E4%BA%A4%E9%9B%86 4.6 intersection(otherDataset) 交集
求兩個RDD共同的元素的RDD. intersection()在運作時也會去掉所有重複的元素,盡管intersection()與union()的概念相似,但性能卻差的很多,因為它需要通過網絡混洗資料來發現共同的元素.
List<String> aList = Lists.newArrayList("1", "2", "3");
List<String> bList = Lists.newArrayList("3", "4", "5");
JavaRDD<String> aRDD = sc.parallelize(aList);
JavaRDD<String> bRDD = sc.parallelize(bList);
// 交集
JavaRDD<String> rdd = aRDD.intersection(bRDD); // 3
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#47-subtractotherdataset-%E5%B7%AE%E9%9B%86 4.7 subtract(otherDataset) 差集
subtract接受另一個RDD作為參數,傳回一個由隻存在第一個RDD中而不存在第二個RDD中的所有元素組成的RDD
List<String> aList = Lists.newArrayList("1", "2", "3");
List<String> bList = Lists.newArrayList("3", "4", "5");
JavaRDD<String> aRDD = sc.parallelize(aList);
JavaRDD<String> bRDD = sc.parallelize(bList);
// 差集
JavaRDD<String> rdd = aRDD.subtract(bRDD); // 1 2
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#48-groupbykey-%E5%88%86%E7%BB%84 4.8 groupByKey 分組
根據鍵值對的key進行分組.對(K,V)鍵值對的資料集進行調用時,傳回(K,Iterable <V>)鍵值對的資料集。
注意
如果分組是為了在每個key上執行聚合(如求總和或平均值),則使用reduceByKey或aggregateByKey會有更好的性能。
預設情況下,輸出中的并行級别取決于父RDD的分區數。 可以設定可選參數numTasks來設定任務數量(By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.)。
Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("Banana", 10);
Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("Pear", 5);
Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("Banana", 9);
Tuple2<String, Integer> t4 = new Tuple2<String, Integer>("Apple", 4);
List<Tuple2<String, Integer>> list = Lists.newArrayList();
list.add(t1);
list.add(t2);
list.add(t3);
list.add(t4);
JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(list);
// 分組
JavaPairRDD<String, Iterable<Integer>> groupRDD = rdd.groupByKey();
// Apple --- 4
// Pear --- 5
// Banana --- 10 9
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#49-reducebykeyfunc-numtasks-%E5%88%86%E7%BB%84%E8%81%9A%E5%90%88 4.9 reduceByKey(func, [numTasks]) 分組聚合
當在(K,V)鍵值對的資料集上調用時,傳回(K,V)鍵值對的資料集,其中使用給定的reduce函數func聚合每個鍵的值,該函數類型必須是(V,V)=> V。
類似于groupByKey,可以通過設定可選的第二個參數來配置reduce任務的數量。
Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("Banana", 10);
Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("Pear", 5);
Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("Banana", 9);
Tuple2<String, Integer> t4 = new Tuple2<String, Integer>("Apple", 4);
List<Tuple2<String, Integer>> list = Lists.newArrayList();
list.add(t1);
list.add(t2);
list.add(t3);
list.add(t4);
JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(list);
// 分組計算
JavaPairRDD<String, Integer> reduceRDD = rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// Apple --- 4
// Pear --- 5
// Banana --- 19
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#5-%E5%8A%A8%E4%BD%9C%E6%93%8D%E4%BD%9C-action 5. 動作操作 (Action)
下面列出了Spark支援的一些常見操作。
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#51-reduce 5.1 reduce
接收一個函數作為參數,這個函數要操作兩個相同元素類型的RDD并傳回一個同樣類型的新元素.
List<String> aList = Lists.newArrayList("aa", "bb", "cc", "dd");
JavaRDD<String> rdd = sc.parallelize(aList);
String result = rdd.reduce(new Function2<String, String, String>() {
@Override
public String call(String v1, String v2) throws Exception {
return v1 + "#" + v2;
}
});
System.out.println(result); // aa#bb#cc#dd
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#52-collect 5.2 collect
将整個RDD的内容傳回.
List<String> list = Lists.newArrayList("aa", "bb", "cc", "dd");
JavaRDD<String> rdd = sc.parallelize(list);
List<String> collect = rdd.collect();
System.out.println(collect); // [aa, bb, cc, dd]
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#53-taken 5.3 take(n)
傳回RDD中的n個元素,并且嘗試隻通路盡量少的分區,是以該操作會得到一個不均衡的集合.需要注意的是,這些操作傳回元素的順序與你的預期可能不一樣.
List<String> list = Lists.newArrayList("aa", "bb", "cc", "dd");
JavaRDD<String> rdd = sc.parallelize(list);
List<String> collect = rdd.take(3);
System.out.println(collect); // [aa, bb, cc]
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB7cea32fdbc8832b3da225f87d68e5c3e%3Fmethod%3Ddownload%26read%3Dtrue#55-takesample 5.5 takeSample
有時需要在驅動器程式中對我們的資料進行采樣,takeSample(withReplacement, num, seed)函數可以讓我們從資料中擷取一個采樣,并指定是否替換.
版本
2.1.1
原文:
http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations