聚合
當資料集中的資料是key/value時,通常是使用相同的鍵彙總所有元素的統計資訊。
reduceByKey()類似于reduce();他們都是使用函數來合并值。reduceByKey()運作幾個并行的reduce操作,每個操作合并相同的鍵的值。因為資料集有大量的key,reduceByKey()沒有實作一個動作傳回一個值。相反的,它傳回一個新的RDD,包含鍵和reduce後的值。
foldByKey()與fold()類似;
我們可以使用reduceByKey()和mapValues()來計算每個關鍵字的平均值,方式與如何使用fold()和map()來計算整個RDD平均值相似。
Example 4-7. Per-key average with reduceByKey() and mapValues() in Python
rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
Example 4-8. Per-key average with reduceByKey() and mapValues() in Scala
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
我們可以在示例4-9到4-11中使用類似的方法來實作經典的分布式單詞計數問題。 我們将使用上一章中的flatMap(),以便我們可以生成一對RDD單詞和數字1,然後使用reduceByKey()将所有單詞合并在一起,如執行個體4-7和4-8所示。
Example 4-9. Word count in Python
rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
Example 4-10. Word count in Scala
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
Example 4-11. Word count in Java
JavaRDD<String> input = sc.textFile("s3://...")
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }
});
JavaPairRDD<String, Integer> result = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); }
}).reduceByKey(
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
實際上我們可以直接在第一個RDD上使用input.flatMap(x =>x.split(" ")).countByValue()來計算單詞計數。
combineByKey()是最常用的每鍵聚合函數。大多數的每鍵combiner都使用它來實作。像是aggregate(),combineByKey()允許使用者傳回與輸入資料不同的值。
通過key來組合資料我們的選擇有很多。大多數都是在combineByKey()之上實作的。在任何情況下,使用Spark指定的聚合函數都比分組然後合并操作更快。
調整并行度
到現在為止,我們讨論的所有轉換都是分布式的,但是我們沒有真正看到Spark怎麼樣切分工作。當在RDD上執行操作的時候,每個RDD都有固定的分區數量來決定并行度。
當執行聚合或分組操作時,我們可以讓Spark使用指定數量的分區。Spark總是會根據您的群集大小來推測一個合理的預設值,但在某些情況下,您将需要調整并行級别以獲得更好的性能。
這章讨論的大多數操作,當建立一個分組的或聚合的RDD時,都接受第二個參數用來指定分區個數
Example 4-15. reduceByKey() with custom parallelism in Python
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reduceByKey(lambda x, y: x + y) # Default parallelism
sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # Custom parallelism
Example 4-16. reduceByKey() with custom parallelism in Scala
val data = Seq(("a", 3), ("b", 4), ("a", 1))
sc.parallelize(data).reduceByKey((x, y) => x + y) // Default parallelism
sc.parallelize(data).reduceByKey((x, y) => x + y) // Custom parallelism
有時候我們想要在分組和聚合操作之外來改變RDD的分區數量,Spark提供了repartition()函數,該函數可以在網際網路上混洗資料然後建立一個新的分區集合。要記住的是重新分區資料的操作是非常昂貴的。Spark有一個優化版本coalesce(),當減少RDD分區數量時,避免移動資料。想要知道你是否可以安全的調用coalesce(),可以檢查RDD的大小在java/scala中使用rdd.partitions.size()或在Python使用rdd.getNumPartitions(),確定您将它合并到比目前擁有的分區更少的分區上。