天天看点

PySpark学习笔记-RDD(键值对RDD)

1.pairRDD

pairRDD,就是键值对RDD。pairRDD是很多程序的构成要素,因为pairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口。用户也可以通过控制pairRDD在各个节点上的分布情况,大大减少应用的通信开销。

pairRDD是一种特殊的RDD,所以普通RDD支持的算子都适用于pairRDD.

2.ByKey类操作

from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("word_count")
    sc=SparkContext(conf=conf)
    pair_rdd=sc.parallelize([('a',1),('b',10),('c',4),('b',7),('c',9),('b',10)])
    pairrdd_1=pair_rdd.reduceByKey(lambda x,y:x+y)
    print(pairrdd_1.collect())
    pairrdd_2=pair_rdd.groupByKey().map(lambda row:(row[0],list(row[1])))
    print(pairrdd_2.collect())
    pairrdd_3=pair_rdd.sortByKey()
    print(pairrdd_3.collect())
    pairrdd_4=pair_rdd.sortByKey(ascending=False)
    print(pairrdd_4.collect())
    result=pair_rdd.countByKey()
    print(result)
    pairrdd_6=pair_rdd.sampleByKey(fractions={'a':0.1,'b':0.1,'c':0},withReplacement=False)
    print(pairrdd_6.collect())
    rdd_2= sc.parallelize([('a',1),('b',3),('d',4)])
    pairrdd_7=pair_rdd.subtractByKey(rdd_2)
    print(pairrdd_7.collect())
           

实验结果如下:

pairrdd_1 [('a', 1), ('b', 27), ('c', 13)]
pairrdd_2 [('a', [1]), ('b', [10, 7, 10]), ('c', [4, 9])]
pairrdd_3 [('a', 1), ('b', 10), ('b', 7), ('b', 10), ('c', 4), ('c', 9)]
pairrdd_4 [('c', 4), ('c', 9), ('b', 10), ('b', 7), ('b', 10), ('a', 1)]
result defaultdict(<class 'int'>, {'a': 1, 'b': 3, 'c': 2})
pairrdd_6 [('a', 1), ('b', 10)]
pairrdd_7 [('c', 4), ('c', 9)]

reduceByKey()会合并具有相同键的值。groupByKey()会将具有相同键的值合并到一起,在本实验中,之后groupByKey()之后做了一步map操作,是为了将groupByKey()操作之后的结果显示出来。sortByKey()是按照键值进行排序,可以利用keyfunc参数规定排序方法,ascending指定是否升序排序。countByKey()按键统计数据量,这个一个行动算子,返回dict类型。sampleByKey ()与sample()类似,不同的地方在于sampleyByKey()中fractions的参数类型为dic类型,并且原RDD中每个key都必须在该dict中出现。substrctByKey()主要原RDD中的键出现在otherRDD中,则从原RDD中删除该键值对,这个算子只比较key,不对value进行比较。

combineByKey()是最为常用的基于键进行聚合的函数。和aggregate()一样,combineByKey()也可以让用户返回与输入数据的类型不同的返回值。combineByKey()在遍历分区中的所有元素时,如果当前的元素的键从未出现过,则combineByKey()会调用一个叫作creatCombiner()的函数创建该键对应的累加器的初始值。但需要注意的是,这一过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD中第一次出现一个键时发生;如果当前的键以前出现过,则会使用mergerValue()方法将该键的累加器对应的当前值与这个新的值进行合并。combineByKey()对于每个分区时独立处理的,因此对于同一个键会有多个累加器,如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用mergeCombiners()方法将各个分区的结果进行合并。reduceByKey()和reduce()作用差不多,只不过该方法是合并具有相同键值的值。而groupByKey()则是对具有相同键的值进行分组。sortByKey()会返回一个根据键排序的RDD,该函数中的ascending可以设定是否逆序排序。

from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("word_count")
    sc=SparkContext(conf=conf)
    pair_rdd=sc.parallelize([('a',1),('b',10),('c',4),('b',7),('c',9),('a',4),('d',7)],1)
    print(pair_rdd.glom().collect())
    rdd_1=pair_rdd.combineByKey(lambda x:(x,0),\
                                lambda x,y:(x[0]+y,x[1]+1),\
                                lambda x,y:(x[0]+y[0],x[1]+y[1]))
    rdd_2=pair_rdd.combineByKey(lambda x:(x,1),\
                                lambda x,y:(x[0]+y,x[1]+1),\
                                lambda x,y:(x[0]+y[0],x[1]+y[1]))
    rdd_3=pair_rdd.combineByKey(lambda x:(x,2),\
                                lambda x,y:(x[0]+y,x[1]+1),\
                                lambda x,y:(x[0]+y[0],x[1]+y[1]))
    print(rdd_1.collect())
    print(rdd_2.collect())
    print(rdd_3.collect())
    pair_rdd_1=sc.parallelize([('a',1),('b',10),('c',4),('b',7),('c',9),('a',4),('d',7)],2)
    print(pair_rdd_1.glom().collect())
    rdd_4=pair_rdd_1.combineByKey(lambda x:(x,0),\
                                lambda x,y:(x[0]+y,x[1]+1),\
                                lambda x,y:(x[0]+y[0],x[1]+y[1]))
    rdd_5=pair_rdd_1.combineByKey(lambda x:(x,1),\
                                lambda x,y:(x[0]+y,x[1]+1),\
                                lambda x,y:(x[0]+y[0],x[1]+y[1]))
    rdd_6=pair_rdd_1.combineByKey(lambda x:(x,2),\
                                lambda x,y:(x[0]+y,x[1]+1),\
                                lambda x,y:(x[0]+y[0],x[1]+y[1]))
    print(rdd_4.collect())
    print(rdd_5.collect())
    print(rdd_6.collect())
           

结果如下:

pair_rdd分区 [[('a', 1), ('b', 10), ('c', 4), ('b', 7), ('c', 9), ('a', 4), ('d', 7)]]
rdd_1 [('a', (5, 1)), ('b', (17, 1)), ('c', (13, 1)), ('d', (7, 0))]
rdd_2 [('a', (5, 2)), ('b', (17, 2)), ('c', (13, 2)), ('d', (7, 1))]
rdd_3 [('a', (5, 3)), ('b', (17, 3)), ('c', (13, 3)), ('d', (7, 2))]
pair_rdd_1分区 [[('a', 1), ('b', 10), ('c', 4)], [('b', 7), ('c', 9), ('a', 4), ('d', 7)]]
rdd_4 [('b', (17, 0)), ('c', (13, 0)), ('d', (7, 0)), ('a', (5, 0))]
rdd_5 [('b', (17, 2)), ('c', (13, 2)), ('d', (7, 1)), ('a', (5, 2))]
rdd_6 [('b', (17, 4)), ('c', (13, 4)), ('d', (7, 2)), ('a', (5, 4))]

分析以上结果可以更加具体地明白combineByKey()的执行过程。当遇到一个新的键时,该键对应的值传入createCombiner()函数,这个与aggregate()不同。

from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("word_count")
    sc=SparkContext(conf=conf)
    pair_rdd=sc.parallelize([('a',1),('b',10),('c',4),('b',7),('c',9),('a',4),('d',7)],2)
    rdd_1=pair_rdd.combineByKey(lambda x:(x,1),\
                                lambda x,y:(x[0]+y,x[1]+1),\
                                lambda x,y:(x[0]+y[0],x[1]+y[1]))
    print(rdd_1.collect())
    rdd_2=pair_rdd.foldByKey(0,lambda x,y:x+y)
    print(rdd_2.collect())
    rdd_3=pair_rdd.aggregateByKey((0,0),\
                                  lambda x,y:(x[0]+y,x[1]+1),\
                                  lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))
    print(rdd_3.collect())
           

结果如下:

rdd_1 [('b', (17, 2)), ('c', (13, 2)), ('d', (7, 1)), ('a', (5, 2))]
rdd_2 [('b', 17), ('c', 13), ('d', 7), ('a', 5)]
rdd_3 [('b', (17, 2)), ('c', (13, 2)), ('d', (7, 1)), ('a', (5, 2))]

foldByKey()和aggregateByKey()也是聚合函数,其底层都是使用combineByKey()函数实现的。foldByKey()、aggregateByKey()中要用到的creatCombiner()函数由这两个函数中的seqFunc(zeroValues,v)来实现。但是foldByKey()返回结果与原pairRDD类型相同。

注意:ByKey()类的算子并不是只能用在pariRDD上,这些算子一般都有一个函数参数(func,f,keyfunc等),传入函数之后可以自己对RDD进行分组。(下面以groupByKey为例)

from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("word_count")
    sc=SparkContext(conf=conf)
    rdd=sc.parallelize(range(1,10))
    rdd_1=rdd.groupBy(f=lambda x:x%2).mapValues(lambda x:list(x))
    print(rdd_1.collect())
           

结果如下:

rdd_1 [(1, [1, 3, 5, 7, 9]), (0, [2, 4, 6, 8])]

3.mapValues()、flatMapValues()

mapValues()和map()区别在于mapValues()只是将对应键的值传入参数中,而map()中则是将整个元素传入都参数中。flatMapValues()会将mapValues()得到的结果重新一对一的组合成键值对。

from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("word_count")
    sc=SparkContext(conf=conf)
    pair_rdd = sc.parallelize([('A', (1, 2, 3)), ('B', (4, 5)),('A',[4,5])])
    rdd_1 = pair_rdd.mapValues(lambda x: [i ** 2 for i in x])
    print(rdd_1.collect())
    rdd_2=pair_rdd.flatMapValues(lambda x:[i**2 for i in x])
    print(rdd_2.collect())
           

结果如下:

rdd_1 [('A', [1, 4, 9]), ('B', [16, 25]), ('A', [16, 25])]
rdd_2 [('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25), ('A', 16), ('A', 25)]

4.Join操作

所有的join操作要实现的功能与SQL查询语言中的相应的功能相同。下面以leftOuterJoin()为例。

from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("word_count")
    sc=SparkContext(conf=conf)
    rdd_1=sc.parallelize([('a',1),('b',4),('c',19)])
    rdd_2=sc.parallelize([('a',4),('a',1),('b','6'),('d',15)])
    rdd_3=rdd_1.leftOuterJoin(rdd_2)
    print(rdd_3.collect())
    rdd_4=sc.parallelize([(6,'a')])
    rdd_5=rdd_1.leftOuterJoin(rdd_4)
    print(rdd_5.collect())
           

实验结果如下:

rdd_3 [('b', (4, '6')), ('c', (19, None)), ('a', (1, 4)), ('a', (1, 1))]
rdd_4 [('b', (4, None)), ('c', (19, None)), ('a', (1, None))]
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("word_count")
    sc=SparkContext(conf=conf)
    rdd_1=sc.parallelize([('a',1),('b',4),('c',19)])
    rdd_2=sc.parallelize([('a',4),('a',1),('b','6'),('d',15)])
    rdd_3=rdd_1.cogroup(rdd_2)
    print(rdd_3.collect())
    rdd_4=rdd_2.cogroup(rdd_1)
    print(rdd_4.collect())
           

结果如下:

rdd_3 [('b', [[4], ['6']]), ('c', [[19], []]), ('d', [[], [15]]), ('a', [[1], [4, 1]])]
rdd_4 [('b', [['6'], [4]]), ('d', [[15], []]), ('c', [[], [19]]), ('a', [[4, 1], [1]])]

注意上述rdd中‘c'和‘d'对应的结果。这也可以说明cogroup()有点类似与Hive中的full outer join。

5.keys()、values()

这两个算子与内置类型dict中同名方法意思差不多。

from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("word_count")
    sc=SparkContext(conf=conf)
    rdd=sc.parallelize(range(1,10))
    rdd_1=rdd.groupBy(f=lambda x:x%2).mapValues(lambda x:list(x))
    rdd_2=rdd_1.keys()
    print(rdd_2.collect())
    rdd_3=rdd_1.values()
    print(rdd_3.collect())
           

结果如下:

rdd_2 [1, 0]
rdd_3 [[1, 3, 5, 7, 9], [2, 4, 6, 8]]

6. collectAsMap()、lookup()

from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
    conf = SparkConf().setMaster("local").setAppName("word_count")
    sc=SparkContext(conf=conf)
    rdd=sc.parallelize([('a',1),('d',7),('b',4),('a',5),('b',3)])
    result_1=rdd.collectAsMap()
    print(type(result_1))
    print(result_1)
    result_2=rdd.lookup('a')
    print(result_2)
           

结果如下:

result_1的type: <class 'dict'>
result_1 {'a': 5, 'd': 7, 'b': 3}
result_2 [1, 5]

collectAsMap()返回的结果为dict。Python中的dict是由hash表实现的,所以对于结果中键‘a’对应的值为原RDD中'a'最后一次出现时的value值。

其他类型的RDD算子:https://blog.csdn.net/yeshang_lady/article/details/87373280