天天看点

Pair RDD----键值对操作1. 创建Pair RDD2. Pair RDD的转换操作3. Pair RDD的行动操作

Spark 为包含键值对类型的 RDD 提供了一些专有的操作。这些 RDD 被称为 pair RDD。 PairRDD 是很多程序的构成要素, 因为它们提供了并行操作各个键或跨节点重新进行数据分组的操作接口。

1. 创建Pair RDD

很多存储键值对的数据格式会在读取时直接返回由其键值对数据组成的 pair RDD。此外,当需要把一个普通的 RDD 转为 pair RDD 时,可以调用 map() 函数来实现,传递的函数需要返回键值对。

1.1. 文本文件

当我们将一个文本文件读取为 RDD 时,输入的每一行都会成为 RDD 的一个元素。也可以将多个完整的文本文件一次性读取为一个 pair RDD,其中键是文件名,值是文件内容。

input = sc.textFile("file:///home/holden/repos/spark/README.md")
           

如果文件足够小,那么可以使用 SparkContext.wholeTextFiles() 方法,该方法会返回一个 pair RDD,其中键是输入文件的文件名。

result.saveAsTextFile(outputFile)
           

saveAsTextFile() 方法接收一个路径,并将RDD 中的内容都输入到路径对应的文件中。 

1.2. JSON

读取 JSON 数据的最简单的方式是将数据作为文本文件读取, 然后使用 JSON 解析器来对 RDD 中的值进行映射操作。

import json
data = input.map(lambda x: json.loads(x))
           

写出 JSON 文件比读取它要简单得多,由结构化数据组成的 RDD 转为字符串 RDD,然后使用 Spark 的文本文件 API 写出去。

(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile))
           

1.3. CSV

读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,再对数据进行处理。 

import csv
import StringIO
...
def loadRecord(line):
"""解析一行CSV记录"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
           

保存CSV和 JSON 数据一样,写出 CSV/TSV 数据相当简单,同样可以通过重用输出编码器来加速。

def writeRecords(records):
    """写出一些CSV记录"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
           

1.4. SequenceFile

SequenceFile 是由没有相对关系结构的键值对文件组成的常用 Hadoop 格式。 SequenceFile文件有同步标记, Spark 可以用它来定位到文件中的某个点,然后再与记录的边界对齐。这可以让 Spark 使用多个节点高效地并行读取 SequenceFile 文件。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile(path, keyClass, valueClass, minPartitions)。

data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
           

1.5. 对象文件

对象文件看起来就像是对 SequenceFile 的简单封装,它允许存储只包含值的 RDD。和SequenceFile 不一样的是,对象文件是使用 Java 序列化写出的。

2. Pair RDD的转换操作

(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)

reduceByKey(func) 合并具有相同键的值 rdd.reduceByKey((x, y) => x + y) {(1,2), (3,10)}

groupByKey() 对具有相同键的值进行分组 rdd.groupByKey() {(1,[2]),(3, [4,6])}

combineByKey( createCombiner,mergeValue,mergeCombiners,partitioner) 使用不同的返回类型合并具有相同键的值

mapValues(func) 对 pair RDD 中的每个值应用一个函数而不改变键 rdd.mapValues(x => x+1) {(1,3), (3,5), (3,7)}

flatMapValues(func) 对 pair RDD 中的每个值应用一个返回迭代器的函数, 然后对返回的每个元素都生成一个对应原键的键值对记录。 通常用于符号化 rdd.flatMapValues(x => (x to 5)) {(1,2), (1,3), (1,4), (1,5), (3,4), (3,5)}

keys() 返回一个仅包含键的 RDD rdd.keys() {1, 3, 3}

values() 返回一个仅包含值的 RDD rdd.values() {2, 4, 6}

sortByKey() 返回一个根据键排序的 RDD rdd.sortByKey() {(1,2), (3,4), (3,6)}

针对两个pair RDD的转化操作( rdd = {(1, 2), (3, 4), (3, 6)}  other = {(3, 9)})

subtractByKey 删掉 RDD 中键与 other RDD 中的键相同的元素 rdd.subtractByKey(other) {(1, 2)}

join 对两个 RDD 进行内连接 rdd.join(other) {(3, (4, 9)), (3,(6, 9))}

rightOuterJoin 对两个 RDD 进行连接操作,确保第一个 RDD 的键必须存在(右外连接)rdd.rightOuterJoin(other) {(3,(Some(4),9)),(3,(Some(6),9))}

leftOuterJoin 对两个 RDD 进行连接操作,确保第二个 RDD 的键必须存在(左外连接)rdd.leftOuterJoin(other) {(1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))}

cogroup 将两个 RDD 中拥有相同键的数据分组到一起 rdd.cogroup(other) {(1,([2],[])), (3,([4, 6],[9]))}

3. Pair RDD的行动操作

(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)

countByKey() 对每个键对应的元素分别计数 rdd.countByKey() {(1, 1), (3, 2)}

collectAsMap() 将结果以映射表的形式返回,以便查询 rdd.collectAsMap() Map{(1, 2), (3, 4), (3, 6)}

lookup(key) 返回给定键对应的所有值 rdd.lookup(3) [4, 6]

继续阅读