PySpark環境搭建
配置hadoop
spark通路本地檔案并執行運算時,可能會遇到權限問題或是dll錯誤。這是因為spark需要使用到Hadoop的winutils和hadoop.dll,首先我們必須配置好Hadoop相關的環境。可以到github下載下傳:https://github.com/4ttty/winutils
gitcode提供了鏡像加速:https://gitcode.net/mirrors/4ttty/winutils
我選擇了使用這個倉庫提供的最高的Hadoop版本3.0.0将其解壓到D:\deploy\hadoop-3.0.0目錄下,然後配置環境變量:
我們還需要将對應的hadoop.dll複制到系統中,用指令表達就是:
copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32
不過這步需要擁有管理者權限才可以操作。
為了能夠在任何地方使用winutils指令工具,将
%HADOOP_HOME%\bin
目錄加入環境變量中:
安裝pyspark與Java
首先,我們安裝spark目前(2022-2-17)的最新版本:
pip install pyspark==3.2.1
需要注意pyspark的版本決定了jdk的最高版本,例如假如安裝2.4.5版本的pyspark就隻能安裝1.8版本的jdk,否則會報出
java.lang.IllegalArgumentException: Unsupported class file major version 55
的錯誤。
這是因為pyspark内置了Scala,而Scala是基于jvm的程式設計語言,Scala與jdk的版本存在相容性問題,JDK與scala的版本相容性表:
JDK version | Minimum Scala versions | Recommended Scala versions |
17 | 2.13.6, 2.12.15 (forthcoming) | 2.13.6, 2.12.15 (forthcoming) |
16 | 2.13.5, 2.12.14 | 2.13.6, 2.12.14 |
13, 14, 15 | 2.13.2, 2.12.11 | 2.13.6, 2.12.14 |
12 | 2.13.1, 2.12.9 | 2.13.6, 2.12.14 |
11 | 2.13.0, 2.12.4, 2.11.12 | 2.13.6, 2.12.14, 2.11.12 |
8 | 2.13.0, 2.12.0, 2.11.0, 2.10.2 | 2.13.6, 2.12.14, 2.11.12, 2.10.7 |
6, 7 | 2.11.0, 2.10.0 | 2.11.12, 2.10.7 |
目前3.2.1版本的pyspark内置的Scala版本為2.12.15,意味着jdk17與其以下的所有版本都支援。
這裡我依然選擇安裝jdk8的版本:
測試一下:
>java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)
jdk11的詳細安裝教程(jdk1.8在官網隻有安裝包,無zip綠化壓縮包):
Java11的環境配置與Python調用Java
graphframes安裝
pip安裝目前最新的graphframes:
pip install graphframes==0.6
然後在官網下載下傳graphframes的jar包。
下載下傳位址:https://spark-packages.org/package/graphframes/graphframes
由于安裝的pyspark版本是3.2,是以這裡我選擇了這個jar包:
然後将該jar包放入pyspark安裝目錄的jars目錄下:
pyspark安裝位置可以通過pip檢視:
C:\Users\ASUS>pip show pyspark
Name: pyspark
Version: 3.2.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: [email protected]
License: http://www.apache.org/licenses/LICENSE-2.0
Location: d:\miniconda3\lib\site-packages
Requires: py4j
Required-by:
使用方法
學習pyspark的最佳路徑是官網:https://spark.apache.org/docs/latest/quick-start.html
在下面的網頁,官方提供了線上jupyter:
https://spark.apache.org/docs/latest/api/python/getting_started/index.html
啟動spark并讀取資料
本地模式啟動spark:
from pyspark.sql import SparkSession, Row
spark = SparkSession \
.builder \
.appName("Python Spark") \
.master("local[*]") \
.getOrCreate()
sc = spark.sparkContext
spark
SparkSession輸出的内容中包含了spark的web頁面,新标簽頁打開頁面後大緻效果如上。
點選Environment頁籤可以檢視目前環境中的變量:
啟動hive支援
找到pyspark的安裝位置,例如我的電腦在D:\Miniconda3\Lib\site-packages\pyspark
手動建立conf目錄并将hive-site.xml配置檔案複制到其中。如果hive使用了MySQL作為原資料庫,則還需要将MySQL對應的驅動jar包放入spark的jars目錄下。
建立spark會話對象時可通過
enableHiveSupport()
開啟hive支援:
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
spark
spark通路hive自己建立的表有可能會出現如下的權限報錯:
Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------
是因為目前使用者不具備對\tmp\hive的操作權限:
>winutils ls \tmp\hive
drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May 4 2020 \tmp\hive
把\tmp\hive目錄的權限改為777即可順利通路:
>winutils chmod 777 \tmp\hive
>winutils ls \tmp\hive
drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May 4 2020 \tmp\hive
Spark的DataFrame與RDD
從spark2.x開始将RDD和DataFrame的API統一抽象成dataset,DataFrame就是Dataset[Row],RDD則是Dataset.rdd。可以将DataFrame了解為包含結構化資訊的RDD。
将含row的RDD轉換為DataFrame隻需要調用toDF方法或SparkSession的createDataFrame方法即可,也可以傳入schema覆寫類型或名稱設定。
DataFrame的基礎api
DataFrame預設支援DSL風格文法,例如:
//檢視DataFrame中的内容
df.show()
//檢視DataFrame部分列中的内容
df.select(df['name'], df['age'] + 1).show()
df.select("name").show()
//列印DataFrame的Schema資訊
df.printSchema()
//過濾age大于等于 21 的
df.filter(df['age'] > 21).show()
//按年齡進行分組并統計相同年齡的人數
personDF.groupBy("age").count().show()
将DataFrame注冊成表或視圖之後即可進行純SQL操作:
df.createOrReplaceTempView("people")
//df.createTempView("t_person")
//查詢年齡最大的前兩名
spark.sql("select * from t_person order by age desc limit 2").show()
//顯示表的Schema資訊
spark.sql("desc t_person").show()
Pyspark可以直接很友善的注冊udf并直接使用:
strlen = spark.udf.register("len", lambda x: len(x))
print(spark.sql("SELECT len('test') length").collect())
print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())
執行結果:
[Row(length='4')]
[Row(length='3')]
RDD的簡介
DataFrame的本質是對RDD的包裝,可以了解為DataFrame=RDD[Row]+schema。
RDD(A Resilient Distributed Dataset)叫做彈性可伸縮分布式資料集,是Spark中最基本的資料抽象。它代表一個不可變、自動容錯、可伸縮性、可分區、裡面的元素可并行計算的集合。
在每一個RDD内部具有五大屬性:
- 具有一系列的分區
- 一個計算函數操作于每一個切片
- 具有一個對其他RDD的依賴清單
- 對于 key-value RDDs具有一個Partitioner分區器
- 存儲每一個切片最佳計算位置
一組分片(Partition),即資料集的基本組成機關。對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會采用預設值。預設值就是程式所配置設定到的CPU Core的數目。
**一個計算每個分區的函數。**Spark中RDD的計算是以分片為機關的,每個RDD都會實作compute函數以達到這個目的。compute函數會對疊代器進行複合,不需要儲存每次計算的結果。
**RDD之間的依賴關系。**RDD的每次轉換都會生成一個新的RDD,是以RDD之間就會形成類似于流水線一樣的前後依賴關系。在部分分區資料丢失時,Spark可以通過這個依賴關系重新計算丢失的分區資料,而不是對RDD的所有分區進行重新計算。
**一個Partitioner,即RDD的分片函數。**目前Spark中實作了兩種類型的分片函數,一個是基于哈希的HashPartitioner,另外一個是基于範圍的RangePartitioner。隻有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
**一個清單,存儲存取每個Partition的優先位置(preferred location)。**對于一個HDFS檔案來說,這個清單儲存的就是每個Partition所在的塊的位置。按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地将計算任務配置設定到其所要處理資料塊的存儲位置。
RDD的API概覽
RDD包含Transformation API和 Action API,Transformation API都是延遲加載的隻是記住這些應用到基礎資料集上的轉換動作,隻有當執行Action API時這些轉換才會真正運作。
Transformation API産生的兩類RDD最重要,分别是MapPartitionsRDD和ShuffledRDD。
産生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是map和flatMap,但任何産生MapPartitionsRDD的算子都可以直接使用mapPartitions或mapPartitionsWithIndex實作。
産生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。
combineByKey到groupByKey 底層均是調用combineByKeyWithClassTag方法:
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners
,defaultPartitioner(self))
}
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
三個重要參數的含義:
- createCombiner:根據每個分區的第一個元素操作産生一個初始值
- mergeValue:對每個分區内部的元素進行疊代合并
- mergeCombiners:對所有分區的合并結果進行合并
groupByKey的partitioner未指定時會傳入預設的defaultPartitioner。例如:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length)
a.groupByKey.collect
res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))
aggregateByKey:每個分區使用zeroValue作為初始值,疊代每一個元素用seqOp進行合并,對所有分區的結果用combOp進行合并。例如:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
reduceByKey :每個分區疊代每一個元素用func進行合并,對所有分區的結果用func再進行合并,例如:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
Action API有:
動作 | 含義 |
reduce(func) | 通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可并聯的 |
collect() | 在驅動程式中,以數組的形式傳回資料集的所有元素 |
count() | 傳回RDD的元素個數 |
first() | 傳回RDD的第一個元素(類似于take(1)) |
take(n) | 傳回一個由資料集的前n個元素組成的數組 |
takeSample(withReplacement*,*num, [seed]) | 傳回一個數組,該數組由從資料集中随機采樣的num個元素組成,可以選擇是否用随機數替換不足的部分,seed用于指定随機數生成器種子 |
takeOrdered(n, [ordering]) | 排序并取前N個元素 |
saveAsTextFile(path) | 将資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對于每個元素,Spark将會調用toString方法,将它裝換為檔案中的文本 |
saveAsSequenceFile(path) | 将資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。 |
saveAsObjectFile(path) | 将RDD中的元素用NullWritable作為key,實際元素作為value儲存為sequencefile格式 |
countByKey() | 針對(K,V)類型的RDD,傳回一個(K,Int)的map,表示每一個key對應的元素個數。 |
foreach(func) | 在資料集的每一個元素上,運作函數func進行更新。 |
spark模拟實作mapreduce版wordcount:
object MapreduceWordcount {
def main(args: Array[String]): Unit = {
import org.apache.spark._
val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
sc.setLogLevel("WARN")
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.rdd.HadoopRDD
import scala.collection.mutable.ArrayBuffer
def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
for (word <- v.toString.split("\\s+"))
collect += ((word, 1))
}
def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
collect += ((key, value.sum))
}
val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
.asInstanceOf[HadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit((split, it) =>{
val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
it.foreach(kv => map(kv._1, kv._2, collect))
collect.toIterator
})
.repartitionAndSortWithinPartitions(new HashPartitioner(2))
.mapPartitions(it => {
val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
var lastKey: String = ""
var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
for ((currKey, value) <- it) {
if (!currKey.equals(lastKey)) {
if (values.length != 0)
reduce(lastKey, values.toIterator, collect)
values.clear()
}
values += value
lastKey = currKey
}
if (values.length != 0) reduce(lastKey, values.toIterator, collect)
collect.toIterator
})
rdd.foreach(println)
}
}
各類RDD
- ShuffledRDD :表示需要走Shuffle過程的網絡傳輸
- CoalescedRDD :用于将一台機器的多個分區合并成一個分區
- CartesianRDD :對兩個RDD的所有元素産生笛卡爾積
- MapPartitionsRDD :用于對每個分區的資料進行特定的處理
- CoGroupedRDD :用于将2~4個rdd,按照key進行連接配接聚合
- SubtractedRDD :用于對2個RDD求差集
- UnionRDD和PartitionerAwareUnionRDD :用于對2個RDD求并集
- ZippedPartitionsRDD2:zip拉鍊操作産生的RDD
- ZippedWithIndexRDD:給每一個元素标記一個自增編号
- PartitionwiseSampledRDD:用于對rdd的元素按照指定的百分比進行随機采樣
當我們需要給Datafream添加自增列時,可以使用zipWithUniqueId方法:
from pyspark.sql.types import StructType, LongType
schema = data.schema.add(StructField("id", LongType()))
rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1]))
data = rowRDD.toDF(schema)
data.show()
API用法詳情可參考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD
cache&checkpoint
RDD通過persist方法或cache方法可以将前面的計算結果緩存,但是并不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD将會被緩存在計算節點的記憶體中,并供後面重用。
rdd.persist()
checkpoint的源碼注釋可以看到:
- 标記該RDD作為檢查點。
- 它将被儲存在通過SparkContext#setCheckpointDir方法設定的檢查點目錄中
- 它所引用的所有父RDD引用将全部被移除
- 這個方法在這個RDD上必須在所有job執行前運作。
- 強烈建議将這個RDD緩存在記憶體中,否則這個儲存檔案的計算任務将重新計算。
從中我們得知,在執行checkpoint方法時,最好同時,将該RDD緩存起來,否則,checkpoint也會産生一個計算任務。
sc.setCheckpointDir("checkpoint")
rdd.cache()
rdd.checkpoint()
graphframes 的用法
GraphFrame是将Spark中的Graph算法統一到DataFrame接口的Graph操作接口,為Scala、Java和Python提供了統一的圖處理API。
Graphframes是開源項目,源碼工程如下:https://github.com/graphframes/graphframes
可以參考:
- 官網:https://graphframes.github.io/graphframes/docs/_site/index.html
- GraphFrames使用者指南-Python — Databricks文檔:https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html
在GraphFrames中圖的頂點(Vertex)和邊(edge)都是以DataFrame形式存儲的:
- 頂點DataFrame:必須包含列名為“id”的列,用于作為頂點的唯一辨別
- 邊DataFrame:必須包含列名為“src”和“dst”的列,根據唯一辨別id辨別關系
建立圖的示例:
from graphframes import GraphFrame
vertices = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)], ["id", "name", "age"])
edges = spark.createDataFrame([
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
], ["src", "dst", "relationship"])
# 生成圖
g = GraphFrame(vertices, edges)
GraphFrame提供三種視圖:
print("頂點表視圖:")
graph.vertices.show() # graph.vertices 就是原始的vertices
print("邊表視圖:")
graph.edges.show() # graph.edges 就是原始的 edges
print("三元組視圖:")
graph.triplets.show()
擷取頂點的度、入度和出度:
# 頂點的度
graph.degrees.show()
# 頂點的入度
graph.inDegrees.show()
# 頂點的出度
graph.outDegrees.show()
Motif finding (模式發現)
示例:
# 多個路徑條件
motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
# 在搜尋的結果上進行過濾
motif.filter("b.age > 30")
# 不需要傳回路徑中的元素時,可以使用匿名頂點和邊
motif = graph.find("(start)-[]->()")
# 設定路徑不存在的條件
motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")
假設我們要想給使用者推薦關注的人,可以找出這樣的關系:A關注B,B關注C,但是A未關注C。找出這樣的關系就可以把C推薦給A:
# Motif: A->B->C but not A->C
results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
# 排除自己
results = results.filter("A.id != C.id")
# 選擇需要的列
results = results.select(results.A.id.alias("A"), results.C.id.alias("C"))
results.show()
結果:
+---+---+
| A| C|
+---+---+
| e| c|
| e| a|
| d| b|
| a| d|
| f| b|
| d| e|
| a| f|
| a| c|
+---+---+
Motif在查找路徑過程的過程中,還可以沿着路徑攜帶狀态。例如我們想要找出關系鍊有4個頂點,而且其中3條邊全部都是"friend"關系:
from pyspark.sql.functions import col, lit, when
from functools import reduce
chain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
def sumFriends(cnt, relationship):
"定義下一個頂點更新狀态的條件:如果關系為friend則cnt+1"
return when(relationship == "friend", cnt+1).otherwise(cnt)
# 将更新方法應用到整個鍊的,鍊上每有一個關系是 friend 就加一,鍊上共三個關系。
condition = reduce(lambda cnt, e: sumFriends(
cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))
chainWith2Friends2 = chain4.where(condition >= 3)
chainWith2Friends2.show()
結果:
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| a| ab| b| bc| c| cd| d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}| {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
Subgraphs 子圖
可以直接過濾其頂點或邊,
dropIsolatedVertices()
方法用于删除孤立沒有連接配接的點:
graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()
還可以基于模式發現擷取到的邊建立Subgraphs :
paths = graph.find("(a)-[e]->(b)")\
.filter("e.relationship = 'follow'")\
.filter("a.age < b.age")
# 抽取邊資訊e2 = paths.select("e.src", "e.dst", "e.relationship")
e2 = paths.select("e.*")
# 建立Subgraphs
g2 = GraphFrame(graph.vertices, e2)
GraphFrames支援的GraphX算法
- PageRank:查找圖中的重要頂點。
- 廣度優先搜尋(BFS):查找從一組頂點到另一組頂點的最短路徑
- 連通元件(ConnectedComponents):為具備連接配接關系的頂點配置設定相同的元件ID
- 強連通元件(StronglyConnectedConponents):根據每個頂點的強連通分量配置設定SCC。
- 最短路徑(Shortest paths):查找從每個頂點到目标頂點集的最短路徑。
- 三角形計數(TriangleCount):計算每個頂點所屬的三角形的數量,經常用于确定組的穩定性(互相連接配接的數量代表了穩定性)或作為其他網絡度量(如聚類系數)的一部分,在社交網絡分析中用來檢測社群。
- 标簽傳播算法(LPA):檢測圖中的社群。
pageRank算法:
results = graph.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.sort("pagerank", ascending=False).show()
結果:
+---+-------+---+-------------------+
| id| name|age| pagerank|
+---+-------+---+-------------------+
| b| Bob| 36| 2.7025217677349773|
| c|Charlie| 30| 2.6667877057849627|
| a| Alice| 34| 0.4485115093698443|
| e| Esther| 32| 0.3613490987992571|
| f| Fanny| 36|0.32504910549694244|
| d| David| 29|0.32504910549694244|
| g| Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+
可以設定起始頂點:
graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")
graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)
廣度優先搜尋BFS:
搜尋從姓名叫Esther到年齡小于32的最小路徑:
paths = graph.bfs("name = 'Esther'", "age < 32")
paths.show()
+--------------+--------------+---------------+
| from| e0| to|
+--------------+--------------+---------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+--------------+--------------+---------------+
可以指定隻能在指定的邊搜尋:
graph.bfs("name = 'Esther'",
"age < 32",
edgeFilter="relationship != 'friend'",
maxPathLength=4
).show()
+---------------+--------------+--------------+--------------+----------------+
| from| e0| v1| e1| to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+
Connected components 連通元件:
必須先設定檢查點:
sc.setCheckpointDir("checkpoint")
graph.connectedComponents().show()
結果:
+---+-------+---+------------+
| id| name|age| component|
+---+-------+---+------------+
| a| Alice| 34|412316860416|
| b| Bob| 36|412316860416|
| c|Charlie| 30|412316860416|
| d| David| 29|412316860416|
| e| Esther| 32|412316860416|
| f| Fanny| 36|412316860416|
| g| Gabby| 60|146028888064|
+---+-------+---+------------+
可以看到僅g點在一個連通區域内,可以調用
dropIsolatedVertices()
方法,删除這種孤立的沒有連接配接的點:
graph.dropIsolatedVertices().connectedComponents().show()
結果:
+---+-------+---+------------+
| id| name|age| component|
+---+-------+---+------------+
| a| Alice| 34|412316860416|
| b| Bob| 36|412316860416|
| c|Charlie| 30|412316860416|
| d| David| 29|412316860416|
| e| Esther| 32|412316860416|
| f| Fanny| 36|412316860416|
+---+-------+---+------------+
Strongly connected components 強連通元件:
graph.stronglyConnectedComponents(maxIter=10).show()
Shortest paths 最短路徑:
每個頂點到a或d的最短路徑:
graph.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+
| id| name|age| distances|
+---+-------+---+----------------+
| g| Gabby| 60| {}|
| f| Fanny| 36| {}|
| e| Esther| 32|{a -> 2, d -> 1}|
| d| David| 29|{a -> 1, d -> 0}|
| c|Charlie| 30| {}|
| b| Bob| 36| {}|
| a| Alice| 34|{a -> 0, d -> 2}|
+---+-------+---+----------------+
Triangle count 三角形計數:
graph.triangleCount().show()
+-----+---+-------+---+
|count| id| name|age|
+-----+---+-------+---+
| 1| a| Alice| 34|
| 0| b| Bob| 36|
| 0| c|Charlie| 30|
| 1| d| David| 29|
| 1| e| Esther| 32|
| 0| g| Gabby| 60|
| 0| f| Fanny| 36|
+-----+---+-------+---+
說明頂點a/e/d構成三角形。
标簽傳播算法(LPA):
graph.labelPropagation(maxIter=5).orderBy("label").show()
+---+-------+---+-------------+
| id| name|age| label|
+---+-------+---+-------------+
| g| Gabby| 60| 146028888064|
| f| Fanny| 36|1047972020224|
| b| Bob| 36|1047972020224|
| a| Alice| 34|1382979469312|
| c|Charlie| 30|1382979469312|
| e| Esther| 32|1460288880640|
| d| David| 29|1460288880640|
+---+-------+---+-------------+
PySpark3.X與pandas融合
Pyspark從3.0版本開始出現了pandas_udf裝飾器、applyInPandas和mapInPandas,基于這些方法,我們就可以使用熟悉的pandas的文法處理spark對象的資料。
首先建立幾條測試資料,并啟動 Apache Arrow:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
df.show()
自定義UDF和UDAF
pyspark暫不支援自定義UDTF。
使用pandas_udf裝飾器我們可以建立出基于pandas的udf自定義函數,在DSL的文法中可以被直接使用:
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf("double")
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
df.select(multiply_func("id", "v").alias("product")).show()
注冊函數和視圖後,可以直接在SQL中使用:
df.createOrReplaceTempView("t")
spark.udf.register("multiply", multiply_func)
spark.sql('select multiply(id, v) product from t').show()
結果均為:
+-------+
|product|
+-------+
| 1.0|
| 2.0|
| 6.0|
| 10.0|
| 20.0|
+-------+
還支援聚合函數和視窗函數:
from pyspark.sql import Window
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
# 對字段'v'進行求均值
df.select(mean_udf('v').alias("mean_v")).show()
# 按照‘id’分組,求'v'的均值
df.groupby("id").agg(mean_udf('v').alias("mean_v")).show()
# 按照‘id’分組,求'v'的均值,并指派給新的一列
df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()
注冊到udf之後同樣可以直接使用SQL實作:
spark.udf.register("mean2", mean_udf)
spark.sql('select mean2(v) mean_v from t').show()
spark.sql('select id,mean2(v) mean_v from t group by id').show()
spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()
結果均為:
+--------+
| mean_v |
+--------+
| 4.2|
+--------+
+---+--------+
| id| mean_v |
+---+--------+
| 1| 1.5|
| 2| 6.0|
+---+--------+
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.5|
| 1| 2.0| 1.5|
| 2| 3.0| 6.0|
| 2| 5.0| 6.0|
| 2|10.0| 6.0|
+---+----+------+
分組聚合與JOIN
applyInPandas需要在datafream調用groupby之後才能使用:
def subtract_mean(pdf):
v = pdf.v
pdf['v1'] = v - v.mean()
pdf['v2'] = v + v.mean()
return pdf
t = df.groupby("id")
t.applyInPandas(
subtract_mean, schema="id long, v double, v1 double, v2 double").show()
結果:
+---+----+----+----+
| id| v| v1| v2|
+---+----+----+----+
| 1| 1.0|-0.5| 2.5|
| 1| 2.0| 0.5| 3.5|
| 2| 3.0|-3.0| 9.0|
| 2| 5.0|-1.0|11.0|
| 2|10.0| 4.0|16.0|
+---+----+----+----+
subtract_mean函數接收的是對應id的dataframe資料,schema指定了傳回值的名稱和類型清單。
通過以下代碼我們可以知道,applyInPandas可以借助cogroup進行表連接配接:
val a = sc.parallelize(List(1, 2, 1, 3))
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
val e = a.map((_, "e"))
scala> b.cogroup(c).foreach(println)
(3,(CompactBuffer(b),CompactBuffer(c)))
(1,(CompactBuffer(b, b),CompactBuffer(c, c)))
(2,(CompactBuffer(b),CompactBuffer(c)))
示例:
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
# l、r is a pandas.DataFrame
# 這裡是按照id分組
# 那麼,l和r分别是對應id的df1和df2資料
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
map疊代
執行以下代碼:
def filter_func(iterator):
for i, pdf in enumerate(iterator):
print(i, pdf.values.tolist())
yield pdf
df.mapInPandas(filter_func, schema=df.schema).show()
背景看到執行結果為:
0 [[2.0, 5.0]]
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]
前台結果幾乎保持原樣。可以知道iterator是一個分區疊代器,疊代出目前分區的每一行資料都被封裝成一個pandas對象。
Pyspark與Pandas的互動
将spark的Datafream對象轉換為原生的pandas對象隻需調用toPandas()方法即可:
sdf.toPandas()
将原生的pandas對象轉換為spark對象可以使用spark的頂級方法:
spark.createDataFrame(pdf)
習慣使用pandas的童鞋,還可以直接使用pandas-on-Spark,在spark3.2.0版本時已經比對到pandas 1.3版本的API。通過pandas-on-Spark,我們可以完全用pandas的api操作資料,而底層執行卻是spark的并行化。
使用pandas-on-Spark最好設定一下環境變量:
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
将spark對象轉換為pandas-on-Spark對象:
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
pdf = df.to_pandas_on_spark()
print(type(pdf))
pdf
pandas-on-Spark對象也可以還原成spark對象:
pdf.to_spark()
另外spark提供直接将檔案讀取成pandas-on-Spark對象的api,例如:
import pyspark.pandas as ps
pdf = ps.read_csv("example_csv.csv")
ps對象與原生pandas對象的API幾乎完全一緻。
ps.sql("SELECT count(*) as num FROM {pdf}")