天天看點

大資料系列之PySpark配置及RDD操作

PySpark實作了Spark對于Python的API,本文簡要介紹了PySpark的配置,以及通過PySpark對RDD進行Transform和Action操作。

1、PySpark介紹

PySpark實作了Spark對于Python的API,通過它,使用者可以編寫運作在Spark之上的Python程式,進而利用到Spark分布式計算的特點。

PySpark的整體架構圖如下,可以看到Python API的實作依賴于Java的API,Python程式端的SparkContext通過py4j調用JavaSparkContext,後者是對Scala的SparkContext的一個封裝。而對RDD進行轉換和操作的函數由使用者通過Python程式來定義,這些函數會被序列化然後發送到各個worker,然後每一個worker啟動一個Python程序來執行反序列化之後的函數,通過管道拿到執行之後的結果。

大資料系列之PySpark配置及RDD操作
  • Python程式的啟動

和Scala程式一樣,Python程式也是通過SparkSubmit送出得以執行,在SparkSubmit中會判斷送出的程式是否為Python,如果是,則設定mainClass為PythonRunner。在PythonRunner中,會根據配置選項,以及使用者通過指令行提供的–py-files選項,設定好PYTHONPATH,然後啟動一個Java的GatewayServer用來被Python程式調用,然後以使用者配置的PYSPARK_PYTHON選項作為Python解釋器,執行Python檔案,至此使用者的Python程式得以啟動。

  • SparkContext

和在Scala中一樣,SparkContext是調用Spark進行計算的入口。在Python的context.py中定義了類SparkContext,它封裝了一個JavaSparkContext作為它的_jsc屬性。在初始化SparkContext時,首先會調用java_gateway.py中定義的launch_gateway方法來初始化JavaGateWay,在launch_gateway中會引入在Spark中定義的類到SparkContext的屬性_jvm,比如:java_import(gateway.jvm, “org.apache.spark.SparkConf”)。這樣在Python中就可以通過SparkContext._jvm.SparkConf引用在Scala中定義的SparkConf這個類,可以執行個體化這個類的對象,可以調用對象的方法等。在初始化完畢之後,使用者就可以調用SparkContext中的方法了,比如textFile和parallelize。

  • RDD

Python中的RDD對Spark中的RDD進行了一次封裝,每一個RDD都對應了一個反序列化的函數。這是因為,盡管在Spark中RDD的元素可以具有任意類型,提供給JavaSparkContext中生成的RDD的隻具有Array[Byte]類型,也就是說JavaSparkContext的函數傳回值是JavaRDD[Array[Byte]],這樣,Python程式需要把對象先序列化成byte數組,然後把它分布到各個節點進行計算。計算完之後再反序列化成Python的對象。(這其中有一個特殊情況,就是JavaSparkContext傳回的是JavaRDD[String],可以把它當成是不需要序列化和反序列化的對象。)在Spark中不需要知道Array[Byte]反序列化之後是什麼。如何序列化和反序列化、如何對這些Array[Byte]進行轉換和操作都由Python程式來控制,Spark隻是負責資源的排程,負責如何把這些計算配置設定到各個節點上去執行。

2、PySpark環境配置

安裝好spark後,直接輸入pyspark,可調出pyspark工作界面

[[email protected] spark-2.3.0]# pyspark
Python 2.7.5 (default, Aug  4 2017, 00:39:18) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
2018-06-01 16:31:52 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Python version 2.7.5 (default, Aug  4 2017 00:39:18)
SparkSession available as 'spark'.
>>>
           

1)引入Python中pyspark工作子產品

import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#任何Spark程式都是SparkContext開始的,SparkContext的初始化需要一個SparkConf對象,SparkConf包含了Spark叢集配置的各種參數(比如主節點的URL)。初始化後,就可以使用SparkContext對象所包含的各種方法來建立和操作RDD和共享變量。Spark shell會自動初始化一個SparkContext(在Scala和Python下可以,但不支援Java)。
#getOrCreate表明可以視情況建立session或利用已有的session
           

2)Python腳本執行

python腳本中需要在開頭導入spark相關子產品,調用時使用spark-submit送出,如下所示:

spark-submit --master local xxxx.py
spark-submit --master yarn --deploy-mode cluster xxxx.py
           

3、PySpark使用

3.1 初始化Spark

編寫Spark程式的第一件事情就是建立SparkContext對象,SparkContext負責連接配接到叢集。建立SparkContext先要建立SparkConf對象,該對象可以定義我們Spark程式的相關參數。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
           

其中appName是程式名稱,它會顯示在叢集狀态界面上;master是要送出到的叢集的位址

3.2 初始化RDD

RDD(Resilient Distributed Datasets)是Spark中抽象出來的彈性分布式資料集,其本質上是一個隻讀的分區記錄集合。每個RDD可以分成多個分區,每個分區就是一個資料集片段。

建立RDD有兩種方式:一種是将驅動程式中的已有集合平行化;另外一種是引用外部存儲系統的資料集,例如共享檔案系統,HDFS, HBase, 或者其他類似Hadoop的資料源

  • sc.parallelize初始化RDD

在驅動程式中,對已有的可周遊集合執行SparkContext的parallelize函數,可以建立并行化集合。執行Parallelize函數時,集合元素被複制後用來構成可并行操作的分布式資料集。

a) 利用list建立一個RDD;使用sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame轉成Spark RDD

>>> rdd=sc.parallelize([1,2,3,4,5])
>>> rdd
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175
           

b)getNumPartitions()方法檢視list被分成了幾部分

>>> rdd.getNumPartitions()
2
           

c)glom().collect()檢視分區狀況

>>> rdd.glom().collect()
[[1, 2], [3, 4, 5]]
           
  • 外部資料集初始化RDD

PySpark可以從需要hadoop支援的存儲系統建立分布式資料集,包括本地檔案系統、HDFS、 Cassandra、HBase以及Amazon S3等等。Spark支援文本檔案、sequenceFile以及其他的Hadoop輸入格式。 注:sequenceFile是Hadoop中一個由二進制序列化過的key/value的位元組流組成的文本存儲檔案。文本RDD可以通過SparkContext的textFile函數建立。該方法以檔案位址(URI)作為輸入并按行讀取。

a) 記錄目前pyspark工作環境位置

>>> import os
>>> cwd=os.getcwd()
>>> cwd
'/usr/local/spark/spark-2.3.0'
           

b) 要讀入的檔案的全路徑

>>> rdd=sc.textFile("file:"+cwd+"/test-rdd/test-rdd.txt")
>>> rdd
file:/usr/local/spark/spark-2.3.0/test-rdd/test-rdd.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0
           

c) first()方法取讀入的rdd資料第一個item

>>> rdd.first()
u'Mary,F,7065'
           
3.3 RDD操作

RDDs支援兩種類型的操作:一種是轉換(transformations), 該操作從已有資料集建立新的資料集;另外一種是動作(actions),該操作在資料集上執行計算之後傳回一個值給驅動程式。例如, map就是一個轉換,這個操作在資料集的每個元素上執行一個函數并傳回一個處理之後新的RDD結果。另一方面,reduce是一個動作,這個操作按照某個函數規則聚集RDD中的所有元素并且把最終結果傳回給驅動程式。

Spark中的所有轉換操作都是lazy模式的,也就是說,不是立馬做轉換計算結果,而是将這些轉換操作記錄在相應的資料集上,當需要通過動作(action)把結果傳回給驅動程式時才真正執行。這個設計使Spark運作起來更加高效。例如,如果通過map建立的資料集後續會被reduce用到,那麼隻有reduce的結果會傳回給驅動程式,而不是更大的map結果。預設情況下,RDD上的轉換操作在每次做動作時,都會重新執行計算一次。然而,我們可以使用persist(或者cache)函數将RDD存放在記憶體中,友善後續的快速通路。另外,Spark也支援将RDD存放在磁盤上,或者在多個節點備援存儲。

  • 常見Transformations操作
Transformation 含義
map(func) 對每個RDD元素應用func之後,構造成新的RDD
filter(func) 對每個RDD元素應用func, 将func為true的元素構造成新的RDD
flatMap(func) 和map類似,但是flatMap可以将一個輸出元素映射成0個或多個元素。 (也就是說func傳回的是元素序列而不是單個元素).
mapPartitions(func) 和map類似,但是在RDD的不同分區上獨立執行。是以函數func的參數是一個Python疊代器,輸出結果也應該是疊代器【即func作用為Iterator => Iterator】
mapPartitionsWithIndex(func) 和mapPartitions類似, but also provides func with an integer value representing the index of the partition, 但是還為函數func提供了一個正式參數,用來表示分區的編号。【此時func作用為(Int, Iterator) => Iterator 】
sample(withReplacement, fraction, seed) 抽樣: fraction是抽樣的比例0~1之間的浮點數; withRepacement表示是否有放回抽樣, True是有放回, False是無放回;seed是随機種子。
union(otherDataset) 并集操作,重複元素會保留(可以通過distinct操作去重)
intersection(otherDataset) 交集操作,結果不會包含重複元素
distinct([numTasks])) 去重操作
groupByKey([numTasks]) 把Key相同的資料放到一起【(K, V) => (K, Iterable)】,需要注意的問題:1. 如果分組(grouping)操作是為了後續的聚集(aggregation)操作(例如sum/average), 使用reduceByKey或者aggregateByKey更高效。2.預設情況下,并發度取決于分區數量。我們可以傳入參數numTasks來調整并發任務數。
reduceByKey(func, [numTasks]) 首先按Key分組,然後将相同Key對應的所有Value都執行func操作得到一個值。func必須是(V, V) => V’的計算操作。numTasks作用跟上面提到的groupByKey一樣。
aggregateByKey(zeroValue, seqOp, combOp, [numTasks]) 首先按Key分組,然後對同Key的Vaue做聚集操作。When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like ingroupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]) 按Key排序。通過第一個參數True/False指定是升序還是降序。
join(otherDataset, [numTasks]) 類似SQL中的連接配接(内連接配接),即(K, V) and (K, W) => (K, (V, W)),傳回所有連接配接對。外連接配接通過:leftOUterJoin(左出現右無比對為空)、rightOuterJoin(右全出現左無比對為空)、fullOuterJoin實作(左右全出現無比對為空)。
cogroup(otherDataset, [numTasks]) 對兩個RDD做groupBy。即(K, V) and (K, W) => (K, Iterable, Iterable(W))。别名groupWith。
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars]) 将驅動程式中的RDD交給shell處理(外部程序),例如Perl或bash腳本。RDD元素作為标準輸入傳給腳本,腳本處理之後的标準輸出會作為新的RDD傳回給驅動程式。
coalesce(numPartitions) 将RDD的分區數減小到numPartitions。當資料集通過過濾減小規模時,使用這個操作可以提升性能。
repartition(numPartitions) 将資料重新随機分區為numPartitions個。這會導緻整個RDD的資料在叢集網絡中洗牌。
repartitionAndSortWithinPartitions(partitioner) 使用partitioner函數充分去,并在分區内排序。這比先repartition然後在分區内sort高效,原因是這樣迫使排序操作被移到了shuffle階段。
  • 常見Actions操作
Action 含義
reduce(func) 使用func函數聚集RDD中的元素(func接收兩個參數傳回一個值)。這個函數應該滿足結合律和交換律以便能夠正确并行計算。
collect() 将RDD轉為數組傳回給驅動程式。這個在執行filter等操作之後傳回足夠小的資料集是比較有用。
count() 傳回RDD中的元素數量。
first() 傳回RDD中的第一個元素。(通take(1))
take(n) 傳回由RDD的前N個元素組成的數組。
takeSample(withReplacement, num, [seed]) 傳回num個元素的數組,這些元素抽樣自RDD,withReplacement表示是否有放回,seed是随機數生成器的種子)。
takeOrdered(n, [ordering]) 傳回RDD的前N個元素,使用自然順序或者通過ordering函數對将個元素轉換為新的Key.
saveAsTextFile(path) 将RDD元素寫入文本檔案。Spark自動調用元素的toString方法做字元串轉換。
saveAsSequenceFile(path)(Java and Scala) 将RDD儲存為Hadoop SequenceFile.這個過程機制如下:1. Pyrolite用來将序列化的Python RDD轉為Java對象RDD;2. Java RDD中的Key/Value被轉為Writable然後寫到檔案。
countByKey() 統計每個Key出現的次數,隻對(K, V)類型的RDD有效,傳回(K, int)詞典。
foreach(func) 在所有RDD元素上執行函數func。

以下是個基本的例子:

  1. 首先從外部檔案中定義一個基本的RDD
  2. lineLengths為map轉換的結果,由于使用惰性算法,lineLengths不會立刻計算出來
  3. 最後運作reduce action,這個時候spark會把計算分成不同的任務運作在單獨的機器上,每台機器會運作自己部分并傳回結果到driver program
>>> lines=sc.textFile("file:"+cwd+"/test-rdd/test-rdd.txt")
>>> lineLengths=lines.map(lambda s:len(s))
>>> lineLengths
PythonRDD[15] at RDD at PythonRDD.scala:48
>>> totalLength = lineLengths.reduce(lambda a, b: a + b)
>>> totalLength
43
           

如果想下次繼續使用lineLengths,可以使用RDD持久化,在reduce()前執行該操作,可以将lineLengths儲存在記憶體中:

>>> lineLengths.persist()
PythonRDD[12] at RDD at PythonRDD.scala:48
           
3.4 RDD持久化

Spark中最重要的能力之一是将資料持久化到記憶體中友善後續操作。當持久化一個RDD的時候,一旦該RDD在記憶體中計算出來,每個節點儲存RDD的部分分區,在其他動作中就可以重用記憶體中的這個RDD(以及源于它的新RDD)。這種機制使得後續的動作(actions)快很多(通常在10倍以上)。可以通過persist() 或者cache()兩個方法将RDD标記為持久化的。該RDD第一次在動作中計算出來之後,就會被儲存在各節點的記憶體中。

3.5 Sample API

以下是一些sample API可練習使用:

  1. Map、Reduce API:最基本入門的API
from pyspark import SparkContext
 
sc = SparkContext('local')
#第二個參數2代表的是分區數,預設為1
old=sc.parallelize([1,2,3,4,5],2)
newMap = old.map(lambda x:(x,x**2))
newReduce = old.reduce(lambda a,b : a+b)
print(newMap.glom().collect())
print(newReduce)
           

結果如下:

>>> old=sc.parallelize([1,2,3,4,5],2)
>>> newMap = old.map(lambda x:(x,x**2)) 
>>> newReduce = old.reduce(lambda a,b : a+b) 
>>> print(newMap.glom().collect())
[[(1, 1), (2, 4)], [(3, 9), (4, 16), (5, 25)]]
>>> print(newReduce)
15
           
  1. flatMap、filter、distinc API:資料的拆分、過濾和去重
sc = SparkContext('local') 
old=sc.parallelize([1,2,3,4,5])
#新的map裡将原來的每個元素拆成了3個
newFlatPartitions = old.flatMap(lambda x : (x, x+1, x*2))
#過濾,隻保留小于6的元素
newFilterPartitions = newFlatPartitions.filter(lambda x: x<6)
#去重
newDiscinctPartitions = newFilterPartitions.distinct()
print(newFlatPartitions.collect())
print(newFilterPartitions.collect())
print(newDiscinctPartitions.collect())
           

結果如下:

>>> old=sc.parallelize([1,2,3,4,5])
>>> newFlatPartitions = old.flatMap(lambda x : (x, x+1, x*2))
>>> newFlatPartitions
PythonRDD[22] at RDD at PythonRDD.scala:48
>>> newFilterPartitions = newFlatPartitions.filter(lambda x: x<6)
>>> newFilterPartitions
PythonRDD[23] at RDD at PythonRDD.scala:48
>>> newDiscinctPartitions = newFilterPartitions.distinct()
>>> newDiscinctPartitions
PythonRDD[28] at RDD at PythonRDD.scala:48
>>> print(newFlatPartitions.collect())
[1, 2, 2, 2, 3, 4, 3, 4, 6, 4, 5, 8, 5, 6, 10]
>>> print(newFilterPartitions.collect())
[1, 2, 2, 2, 3, 4, 3, 4, 4, 5, 5]
>>> print(newDiscinctPartitions.collect())
[2, 4, 1, 3, 5]
           
  1. Sample、taskSample、sampleByKey API:資料的抽樣
sc = SparkContext('local')
old=sc.parallelize(range(8))
samplePartition = [old.sample(withReplacement=True, fraction=0.5) for i in range(5)]
for num, element in zip(range(len(samplePartition)), samplePartition):
    print('sample: %s y=%s' %(str(num),str(element.collect())))
taskSamplePartition  = [old.takeSample(withReplacement=False, num=4) for i in range(5)]
for num, element in zip(range(len(taskSamplePartition)), taskSamplePartition) :
    #注意因為是action,是以element是集合對象,而不是rdd的分區
    print('taskSample: %s y=%s' %(str(num),str(element)))
mapRdd = sc.parallelize([('B',1),('A',2),('C',3),('D',4),('E',5)])
y = [mapRdd.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2, 'D':0.6, 'E':0.8}) for i in range(5)]
for num, element in zip(range(len(y)), y) :
    #注意因為是action,是以element是集合對象,而不是rdd的分區
    print('y: %s y=%s' %(str(num),str(element.collect())))
           

結果如下:

>>> old=sc.parallelize(range(8))
>>> samplePartition = [old.sample(withReplacement=True, fraction=0.5) for i in range(5)]
>>> for num, element in zip(range(len(samplePartition)), samplePartition):
...     print('sample: %s y=%s' %(str(num),str(element.collect())))
... 
sample: 0 y=[6, 7]
sample: 1 y=[0, 2, 3, 3, 5]
sample: 2 y=[0, 0, 2, 6, 6]
sample: 3 y=[]
sample: 4 y=[1, 5, 6]
>>> taskSamplePartition  = [old.takeSample(withReplacement=False, num=4) for i in range(5)] 
>>> for num, element in zip(range(len(taskSamplePartition)), taskSamplePartition) : 
...     print('taskSample: %s y=%s' %(str(num),str(element)))
... 
taskSample: 0 y=[4, 2, 7, 1]
taskSample: 1 y=[1, 7, 6, 2]
taskSample: 2 y=[0, 1, 6, 5]
taskSample: 3 y=[7, 5, 3, 6]
taskSample: 4 y=[3, 5, 7, 2]
>>> mapRdd = sc.parallelize([('B',1),('A',2),('C',3),('D',4),('E',5)])
>>> y = [mapRdd.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2, 'D':0.6, 'E':0.8}) for i in range(5)]
>>> for num, element in zip(range(len(y)), y):
...     print('y: %s y=%s' %(str(num),str(element.collect())))
... 
y: 0 y=[('B', 1), ('A', 2), ('C', 3), ('D', 4), ('E', 5)]
y: 1 y=[('B', 1), ('A', 2), ('E', 5)]
y: 2 y=[('B', 1), ('A', 2), ('E', 5)]
y: 3 y=[('B', 1), ('A', 2), ('D', 4), ('E', 5)]
y: 4 y=[('B', 1), ('A', 2), ('E', 5)]
>>>
           
  1. 交集intersection、并集union、排序sortBy API
sc = SparkContext('local')
rdd1 = sc.parallelize(['C','A','B','B'])
rdd2 = sc.parallelize(['A','A','D','E','B'])
rdd3 = rdd1.union(rdd2)
rdd4 = rdd1.intersection(rdd2)
print(rdd3.collect())
print(rdd4.collect())
print(rdd3.sortBy(lambda x : x[0]).collect())
           

結果如下:

>>> rdd1 = sc.parallelize(['C','A','B','B'])
>>> rdd2 = sc.parallelize(['A','A','D','E','B'])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd4 = rdd1.intersection(rdd2)
>>> print(rdd3.collect())
['C', 'A', 'B', 'B', 'A', 'A', 'D', 'E', 'B']
>>> print(rdd4.collect())
['A', 'B']
>>> print(rdd3.sortBy(lambda x : x[0]).collect())
['A', 'A', 'A', 'B', 'B', 'B', 'C', 'D', 'E']
           
  1. reduceByKey、 reduceByKeyLocal API

    這兩個要計算的效果是一樣的,但是前者是傳輸,後者是動作,使用時候需要注意

sc = SparkContext('local')
oldRdd=sc.parallelize([('Key1',1),('Key3',2),('Key1',3),('Key2',4),('Key2',5)])
newRdd = oldRdd.reduceByKey(lambda accumulate,ele : accumulate+ele)
newActionResult = oldRdd.reduceByKeyLocally(lambda accumulate,ele : accumulate+ele)
print(newRdd.collect())
print(newActionResult)
           

結果如下:

>>> oldRdd=sc.parallelize([('Key1',1),('Key3',2),('Key1',3),('Key2',4),('Key2',5)])
>>> newRdd = oldRdd.reduceByKey(lambda accumulate,ele : accumulate+ele)
>>> newActionResult = oldRdd.reduceByKeyLocally(lambda accumulate,ele : accumulate+ele)
>>> print(newRdd.collect())
[('Key3', 2), ('Key1', 4), ('Key2', 9)]
>>> print(newActionResult)
{'Key3': 2, 'Key2': 9, 'Key1': 4}
           
參考資料
  1. Spark官網:“http://spark.apache.org/docs/latest/rdd-programming-guide.html”
  2. PySpark官方文檔:https://spark.apache.org/docs/latest/api/python/index.html

轉載請注明原文位址:https://blog.csdn.net/solihawk/article/details/116028907

文章會同步在公衆号“牧羊人的方向”更新,感興趣的可以關注公衆号,謝謝!

大資料系列之PySpark配置及RDD操作