[[email protected] spark-2.3.0]# pyspark
Python 2.7.5 (default, Aug 4 2017, 00:39:18)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
2018-06-01 16:31:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/
Using Python version 2.7.5 (default, Aug 4 2017 00:39:18)
SparkSession available as 'spark'.
>>>
1) Import the pyspark working modules in Python
import pyspark
from pyspark import SparkContext
from pyspark import SparkConf
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
# Every Spark program starts with a SparkContext. Initializing a SparkContext requires a SparkConf object, which holds the various parameters of the Spark cluster configuration (such as the master URL). Once initialized, the SparkContext's methods can be used to create and operate on RDDs and shared variables. The Spark shell initializes a SparkContext automatically (in Scala and Python, but not in Java).
# getOrCreate returns the already-running SparkContext if there is one, and creates a new one otherwise.
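A quick illustrative check of the getOrCreate semantics (this snippet is not part of the original session): a second call hands back the running context instead of building a new one.

# Illustrative: getOrCreate reuses the active SparkContext
sc2 = SparkContext.getOrCreate(conf)
print(sc2 is sc)   # True: the same context object is returned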
>>> import os
>>> cwd=os.getcwd()
>>> cwd
'/usr/local/spark/spark-2.3.0'
b) The full path of the file to be read in
>>> rdd=sc.textFile("file:"+cwd+"/test-rdd/test-rdd.txt")
>>> rdd
file:/usr/local/spark/spark-2.3.0/test-rdd/test-rdd.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0
mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition. [Here func has the signature (Int, Iterator) => Iterator.]
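A minimal sketch (not from the original session) that tags each element with the index of the partition it lives in:

rdd = sc.parallelize(range(8), 4)           # 4 partitions: [0,1] [2,3] [4,5] [6,7]
def tag(index, iterator):                   # func: (Int, Iterator) => Iterator
    return ((index, x) for x in iterator)
print(rdd.mapPartitionsWithIndex(tag).collect())
# e.g. [(0, 0), (0, 1), (1, 2), (1, 3), (2, 4), (2, 5), (3, 6), (3, 7)]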
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
Groups by key, then aggregates the values sharing each key. When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different from the input value type, while avoiding unnecessary allocations. As in groupByKey, the number of reduce tasks is configurable through an optional second argument.
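An illustrative sketch: here the aggregated value type (a (sum, count) pair) differs from the input value type (an int), which is exactly what this operator allows.

pairs = sc.parallelize([('a', 1), ('a', 3), ('b', 2)])
seqOp  = lambda acc, v: (acc[0] + v, acc[1] + 1)    # fold one value into (sum, count)
combOp = lambda p, q: (p[0] + q[0], p[1] + q[1])    # merge partial results across partitions
print(pairs.aggregateByKey((0, 0), seqOp, combOp).collect())
# e.g. [('a', (4, 2)), ('b', (2, 1))] -- key order may vary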
sortByKey([ascending], [numTasks])
Sorts by key. The first argument (True/False) selects ascending or descending order.
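A small illustrative example (not from the original session):

kv = sc.parallelize([('b', 2), ('a', 1), ('c', 3)])
print(kv.sortByKey().collect())                 # ascending:  [('a', 1), ('b', 2), ('c', 3)]
print(kv.sortByKey(ascending=False).collect())  # descending: [('c', 3), ('b', 2), ('a', 1)]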
join(otherDataset, [numTasks])
Like a SQL inner join: (K, V) and (K, W) => (K, (V, W)), returning all pairs of matching keys. Outer joins are provided by leftOuterJoin (every left key appears; unmatched right values are None), rightOuterJoin (every right key appears; unmatched left values are None), and fullOuterJoin (every key appears; unmatched values are None).
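An illustrative sketch of the four join variants (output order is not guaranteed):

x = sc.parallelize([('a', 1), ('b', 2)])
y = sc.parallelize([('a', 3), ('c', 4)])
print(x.join(y).collect())            # e.g. [('a', (1, 3))]
print(x.leftOuterJoin(y).collect())   # e.g. [('a', (1, 3)), ('b', (2, None))]
print(x.rightOuterJoin(y).collect())  # e.g. [('a', (1, 3)), ('c', (None, 4))]
print(x.fullOuterJoin(y).collect())   # e.g. [('a', (1, 3)), ('b', (2, None)), ('c', (None, 4))]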
cogroup(otherDataset, [numTasks])
Performs a groupBy across two RDDs at once: (K, V) and (K, W) => (K, (Iterable<V>, Iterable<W>)). Also known by its alias groupWith.
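An illustrative cogroup sketch; in PySpark the grouped values come back as ResultIterable objects, so we convert them to lists for display:

x = sc.parallelize([('a', 1), ('b', 2)])
y = sc.parallelize([('a', 3), ('a', 4)])
print([(k, (list(v), list(w))) for k, (v, w) in x.cogroup(y).collect()])
# e.g. [('a', ([1], [3, 4])), ('b', ([2], []))]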
Finally we run the reduce action. At this point Spark splits the computation into tasks that run on separate machines; each machine runs its own part and returns its result to the driver program.
>>> lines=sc.textFile("file:"+cwd+"/test-rdd/test-rdd.txt")
>>> lineLengths=lines.map(lambda s:len(s))
>>> lineLengths
PythonRDD[15] at RDD at PythonRDD.scala:48
>>> totalLength = lineLengths.reduce(lambda a, b: a + b)
>>> totalLength
43
The following compares three sampling operators: sample (a transformation), takeSample (an action), and sampleByKey (a keyed transformation):
sc = SparkContext('local')   # standalone script: create a local context (in the pyspark shell, sc already exists)
old = sc.parallelize(range(8))
samplePartition = [old.sample(withReplacement=True, fraction=0.5) for i in range(5)]
for num, element in zip(range(len(samplePartition)), samplePartition):
    print('sample: %s y=%s' %(str(num),str(element.collect())))
taskSamplePartition = [old.takeSample(withReplacement=False, num=4) for i in range(5)]
for num, element in zip(range(len(taskSamplePartition)), taskSamplePartition):
    # Note: takeSample is an action, so element is already a plain Python list, not an RDD
    print('taskSample: %s y=%s' %(str(num),str(element)))
mapRdd = sc.parallelize([('B',1),('A',2),('C',3),('D',4),('E',5)])
y = [mapRdd.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2, 'D':0.6, 'E':0.8}) for i in range(5)]
for num, element in zip(range(len(y)), y):
    # Note: sampleByKey is a transformation, so element is an RDD; collect() materializes it
    print('y: %s y=%s' %(str(num),str(element.collect())))
The results are as follows:
>>> old=sc.parallelize(range(8))
>>> samplePartition = [old.sample(withReplacement=True, fraction=0.5) for i in range(5)]
>>> for num, element in zip(range(len(samplePartition)), samplePartition):
...     print('sample: %s y=%s' %(str(num),str(element.collect())))
...
sample: 0 y=[6, 7]
sample: 1 y=[0, 2, 3, 3, 5]
sample: 2 y=[0, 0, 2, 6, 6]
sample: 3 y=[]
sample: 4 y=[1, 5, 6]
>>> taskSamplePartition = [old.takeSample(withReplacement=False, num=4) for i in range(5)]
>>> for num, element in zip(range(len(taskSamplePartition)), taskSamplePartition) :
...     print('taskSample: %s y=%s' %(str(num),str(element)))
...
taskSample: 0 y=[4, 2, 7, 1]
taskSample: 1 y=[1, 7, 6, 2]
taskSample: 2 y=[0, 1, 6, 5]
taskSample: 3 y=[7, 5, 3, 6]
taskSample: 4 y=[3, 5, 7, 2]
>>> mapRdd = sc.parallelize([('B',1),('A',2),('C',3),('D',4),('E',5)])
>>> y = [mapRdd.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2, 'D':0.6, 'E':0.8}) for i in range(5)]
>>> for num, element in zip(range(len(y)), y):
...     print('y: %s y=%s' %(str(num),str(element.collect())))
...
y: 0 y=[('B', 1), ('A', 2), ('C', 3), ('D', 4), ('E', 5)]
y: 1 y=[('B', 1), ('A', 2), ('E', 5)]
y: 2 y=[('B', 1), ('A', 2), ('E', 5)]
y: 3 y=[('B', 1), ('A', 2), ('D', 4), ('E', 5)]
y: 4 y=[('B', 1), ('A', 2), ('E', 5)]
>>>