天天看點

Learning Spark筆記7-聚合

聚合

當資料集中的資料是key/value時,通常是使用相同的鍵彙總所有元素的統計資訊。

reduceByKey()類似于reduce();他們都是使用函數來合并值。reduceByKey()運作幾個并行的reduce操作,每個操作合并相同的鍵的值。因為資料集有大量的key,reduceByKey()沒有實作一個動作傳回一個值。相反的,它傳回一個新的RDD,包含鍵和reduce後的值。

foldByKey()與fold()類似;

我們可以使用reduceByKey()和mapValues()來計算每個關鍵字的平均值,方式與如何使用fold()和map()來計算整個RDD平均值相似。

Example 4-7. Per-key average with reduceByKey() and mapValues() in Python

rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

Example 4-8. Per-key average with reduceByKey() and mapValues() in Scala

rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

我們可以在示例4-9到4-11中使用類似的方法來實作經典的分布式單詞計數問題。 我們将使用上一章中的flatMap(),以便我們可以生成一對RDD單詞和數字1,然後使用reduceByKey()将所有單詞合并在一起,如執行個體4-7和4-8所示。

Example 4-9. Word count in Python

rdd = sc.textFile("s3://...")

words = rdd.flatMap(lambda x: x.split(" "))

result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

Example 4-10. Word count in Scala

val input = sc.textFile("s3://...")

val words = input.flatMap(x => x.split(" "))

val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)

Example 4-11. Word count in Java

JavaRDD<String> input = sc.textFile("s3://...")

JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {

 public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }

});

JavaPairRDD<String, Integer> result = words.mapToPair(

 new PairFunction<String, String, Integer>() {

 public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); }

}).reduceByKey(

 new Function2<Integer, Integer, Integer>() {

 public Integer call(Integer a, Integer b) { return a + b; }

});

實際上我們可以直接在第一個RDD上使用input.flatMap(x =>x.split(" ")).countByValue()來計算單詞計數。

combineByKey()是最常用的每鍵聚合函數。大多數的每鍵combiner都使用它來實作。像是aggregate(),combineByKey()允許使用者傳回與輸入資料不同的值。

通過key來組合資料我們的選擇有很多。大多數都是在combineByKey()之上實作的。在任何情況下,使用Spark指定的聚合函數都比分組然後合并操作更快。

調整并行度

到現在為止,我們讨論的所有轉換都是分布式的,但是我們沒有真正看到Spark怎麼樣切分工作。當在RDD上執行操作的時候,每個RDD都有固定的分區數量來決定并行度。

當執行聚合或分組操作時,我們可以讓Spark使用指定數量的分區。Spark總是會根據您的群集大小來推測一個合理的預設值,但在某些情況下,您将需要調整并行級别以獲得更好的性能。

這章讨論的大多數操作,當建立一個分組的或聚合的RDD時,都接受第二個參數用來指定分區個數

Example 4-15. reduceByKey() with custom parallelism in Python

data = [("a", 3), ("b", 4), ("a", 1)]

sc.parallelize(data).reduceByKey(lambda x, y: x + y) # Default parallelism

sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # Custom parallelism

Example 4-16. reduceByKey() with custom parallelism in Scala

val data = Seq(("a", 3), ("b", 4), ("a", 1))

sc.parallelize(data).reduceByKey((x, y) => x + y) // Default parallelism

sc.parallelize(data).reduceByKey((x, y) => x + y) // Custom parallelism

有時候我們想要在分組和聚合操作之外來改變RDD的分區數量,Spark提供了repartition()函數,該函數可以在網際網路上混洗資料然後建立一個新的分區集合。要記住的是重新分區資料的操作是非常昂貴的。Spark有一個優化版本coalesce(),當減少RDD分區數量時,避免移動資料。想要知道你是否可以安全的調用coalesce(),可以檢查RDD的大小在java/scala中使用rdd.partitions.size()或在Python使用rdd.getNumPartitions(),確定您将它合并到比目前擁有的分區更少的分區上。

繼續閱讀