天天看點

PySpark與GraphFrames的安裝與使用

PySpark環境搭建

配置hadoop

spark通路本地檔案并執行運算時,可能會遇到權限問題或是dll錯誤。這是因為spark需要使用到Hadoop的winutils和hadoop.dll,首先我們必須配置好Hadoop相關的環境。可以到github下載下傳:​​https://github.com/4ttty/winutils​​

gitcode提供了鏡像加速:​​https://gitcode.net/mirrors/4ttty/winutils​​

我選擇了使用這個倉庫提供的最高的Hadoop版本3.0.0将其解壓到D:\deploy\hadoop-3.0.0目錄下,然後配置環境變量:

PySpark與GraphFrames的安裝與使用

我們還需要将對應的hadoop.dll複制到系統中,用指令表達就是:

copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32      

不過這步需要擁有管理者權限才可以操作。

為了能夠在任何地方使用winutils指令工具,将​

​%HADOOP_HOME%\bin​

​目錄加入環境變量中:

PySpark與GraphFrames的安裝與使用

安裝pyspark與Java

首先,我們安裝spark目前(2022-2-17)的最新版本:

pip install pyspark==3.2.1      

需要注意pyspark的版本決定了jdk的最高版本,例如假如安裝2.4.5版本的pyspark就隻能安裝1.8版本的jdk,否則會報出​

​java.lang.IllegalArgumentException: Unsupported class file major version 55​

​的錯誤。

這是因為pyspark内置了Scala,而Scala是基于jvm的程式設計語言,Scala與jdk的版本存在相容性問題,JDK與scala的版本相容性表:

JDK version Minimum Scala versions Recommended Scala versions
17 2.13.6, 2.12.15 (forthcoming) 2.13.6, 2.12.15 (forthcoming)
16 2.13.5, 2.12.14 2.13.6, 2.12.14
13, 14, 15 2.13.2, 2.12.11 2.13.6, 2.12.14
12 2.13.1, 2.12.9 2.13.6, 2.12.14
11 2.13.0, 2.12.4, 2.11.12 2.13.6, 2.12.14, 2.11.12
8 2.13.0, 2.12.0, 2.11.0, 2.10.2 2.13.6, 2.12.14, 2.11.12, 2.10.7
6, 7 2.11.0, 2.10.0 2.11.12, 2.10.7

目前3.2.1版本的pyspark内置的Scala版本為2.12.15,意味着jdk17與其以下的所有版本都支援。

這裡我依然選擇安裝jdk8的版本:

PySpark與GraphFrames的安裝與使用

測試一下:

>java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)      

jdk11的詳細安裝教程(jdk1.8在官網隻有安裝包,無zip綠化壓縮包):

Java11的環境配置與Python調用Java

graphframes安裝

pip安裝目前最新的graphframes:

pip install graphframes==0.6      

然後在官網下載下傳graphframes的jar包。

下載下傳位址:​​https://spark-packages.org/package/graphframes/graphframes​​

由于安裝的pyspark版本是3.2,是以這裡我選擇了這個jar包:

PySpark與GraphFrames的安裝與使用

然後将該jar包放入pyspark安裝目錄的jars目錄下:

PySpark與GraphFrames的安裝與使用

pyspark安裝位置可以通過pip檢視:

C:\Users\ASUS>pip show pyspark
Name: pyspark
Version: 3.2.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: [email protected]
License: http://www.apache.org/licenses/LICENSE-2.0
Location: d:\miniconda3\lib\site-packages
Requires: py4j
Required-by:      

使用方法

學習pyspark的最佳路徑是官網:​​https://spark.apache.org/docs/latest/quick-start.html​​

在下面的網頁,官方提供了線上jupyter:

​​https://spark.apache.org/docs/latest/api/python/getting_started/index.html​​

PySpark與GraphFrames的安裝與使用

啟動spark并讀取資料

本地模式啟動spark:

from pyspark.sql import SparkSession, Row

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext
spark      
PySpark與GraphFrames的安裝與使用

SparkSession輸出的内容中包含了spark的web頁面,新标簽頁打開頁面後大緻效果如上。

點選Environment頁籤可以檢視目前環境中的變量:

PySpark與GraphFrames的安裝與使用

啟動hive支援

找到pyspark的安裝位置,例如我的電腦在D:\Miniconda3\Lib\site-packages\pyspark

手動建立conf目錄并将hive-site.xml配置檔案複制到其中。如果hive使用了MySQL作為原資料庫,則還需要将MySQL對應的驅動jar包放入spark的jars目錄下。

建立spark會話對象時可通過​

​enableHiveSupport()​

​開啟hive支援:

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
spark      

spark通路hive自己建立的表有可能會出現如下的權限報錯:

Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------      

是因為目前使用者不具備對\tmp\hive的操作權限:

>winutils ls \tmp\hive
drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive      

把\tmp\hive目錄的權限改為777即可順利通路:

>winutils chmod 777 \tmp\hive

>winutils ls \tmp\hive
drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive      

Spark的DataFrame與RDD

從spark2.x開始将RDD和DataFrame的API統一抽象成dataset,DataFrame就是Dataset[Row],RDD則是Dataset.rdd。可以将DataFrame了解為包含結構化資訊的RDD。

将含row的RDD轉換為DataFrame隻需要調用toDF方法或SparkSession的createDataFrame方法即可,也可以傳入schema覆寫類型或名稱設定。

DataFrame的基礎api

DataFrame預設支援DSL風格文法,例如:

//檢視DataFrame中的内容
df.show()

//檢視DataFrame部分列中的内容
df.select(df['name'], df['age'] + 1).show()
df.select("name").show()

//列印DataFrame的Schema資訊
df.printSchema()

//過濾age大于等于 21 的
df.filter(df['age'] > 21).show()

//按年齡進行分組并統計相同年齡的人數
personDF.groupBy("age").count().show()      

将DataFrame注冊成表或視圖之後即可進行純SQL操作:

df.createOrReplaceTempView("people")
//df.createTempView("t_person")

//查詢年齡最大的前兩名
spark.sql("select * from t_person order by age desc limit 2").show()
//顯示表的Schema資訊
spark.sql("desc t_person").show()      

Pyspark可以直接很友善的注冊udf并直接使用:

strlen = spark.udf.register("len", lambda x: len(x))
print(spark.sql("SELECT len('test') length").collect())
print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())      

執行結果:

[Row(length='4')]
[Row(length='3')]      

RDD的簡介

DataFrame的本質是對RDD的包裝,可以了解為DataFrame=RDD[Row]+schema。

RDD(A Resilient Distributed Dataset)叫做彈性可伸縮分布式資料集,是Spark中最基本的資料抽象。它代表一個不可變、自動容錯、可伸縮性、可分區、裡面的元素可并行計算的集合。

在每一個RDD内部具有五大屬性:

  1. 具有一系列的分區
  2. 一個計算函數操作于每一個切片
  3. 具有一個對其他RDD的依賴清單
  4. 對于 key-value RDDs具有一個Partitioner分區器
  5. 存儲每一個切片最佳計算位置

一組分片(Partition),即資料集的基本組成機關。對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會采用預設值。預設值就是程式所配置設定到的CPU Core的數目。

**一個計算每個分區的函數。**Spark中RDD的計算是以分片為機關的,每個RDD都會實作compute函數以達到這個目的。compute函數會對疊代器進行複合,不需要儲存每次計算的結果。

**RDD之間的依賴關系。**RDD的每次轉換都會生成一個新的RDD,是以RDD之間就會形成類似于流水線一樣的前後依賴關系。在部分分區資料丢失時,Spark可以通過這個依賴關系重新計算丢失的分區資料,而不是對RDD的所有分區進行重新計算。

**一個Partitioner,即RDD的分片函數。**目前Spark中實作了兩種類型的分片函數,一個是基于哈希的HashPartitioner,另外一個是基于範圍的RangePartitioner。隻有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

**一個清單,存儲存取每個Partition的優先位置(preferred location)。**對于一個HDFS檔案來說,這個清單儲存的就是每個Partition所在的塊的位置。按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地将計算任務配置設定到其所要處理資料塊的存儲位置。

RDD的API概覽

RDD包含Transformation API和 Action API,Transformation API都是延遲加載的隻是記住這些應用到基礎資料集上的轉換動作,隻有當執行Action API時這些轉換才會真正運作。

Transformation API産生的兩類RDD最重要,分别是MapPartitionsRDD和ShuffledRDD。

産生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是map和flatMap,但任何産生MapPartitionsRDD的算子都可以直接使用mapPartitions或mapPartitionsWithIndex實作。

産生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。

combineByKey到groupByKey 底層均是調用combineByKeyWithClassTag方法:

@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners
       ,defaultPartitioner(self))
}

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}      

三個重要參數的含義:

  • createCombiner:根據每個分區的第一個元素操作産生一個初始值
  • mergeValue:對每個分區内部的元素進行疊代合并
  • mergeCombiners:對所有分區的合并結果進行合并

groupByKey的partitioner未指定時會傳入預設的defaultPartitioner。例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length)
a.groupByKey.collect

res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))      

aggregateByKey:每個分區使用zeroValue作為初始值,疊代每一個元素用seqOp進行合并,對所有分區的結果用combOp進行合并。例如:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))      

reduceByKey :每個分區疊代每一個元素用func進行合并,對所有分區的結果用func再進行合并,例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect      

Action API有:

動作 含義
reduce(func) 通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可并聯的
collect() 在驅動程式中,以數組的形式傳回資料集的所有元素
count() 傳回RDD的元素個數
first() 傳回RDD的第一個元素(類似于take(1))
take(n) 傳回一個由資料集的前n個元素組成的數組
takeSample(withReplacement*,*num, [seed]) 傳回一個數組,該數組由從資料集中随機采樣的num個元素組成,可以選擇是否用随機數替換不足的部分,seed用于指定随機數生成器種子
takeOrdered(n, [ordering]) 排序并取前N個元素
saveAsTextFile(path) 将資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對于每個元素,Spark将會調用toString方法,将它裝換為檔案中的文本
saveAsSequenceFile(path) 将資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。
saveAsObjectFile(path) 将RDD中的元素用NullWritable作為key,實際元素作為value儲存為sequencefile格式
countByKey() 針對(K,V)類型的RDD,傳回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func) 在資料集的每一個元素上,運作函數func進行更新。

spark模拟實作mapreduce版wordcount:

object MapreduceWordcount {
  def main(args: Array[String]): Unit = {
    import org.apache.spark._
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    sc.setLogLevel("WARN")
    
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.rdd.HadoopRDD

    import scala.collection.mutable.ArrayBuffer
    
    def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
      for (word <- v.toString.split("\\s+"))
        collect += ((word, 1))
    }
    def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
      collect += ((key, value.sum))
    }
    val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((split, it) =>{
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        it.foreach(kv => map(kv._1, kv._2, collect))
        collect.toIterator
      })
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitions(it => {
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        var lastKey: String = ""
        var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
        for ((currKey, value) <- it) {
          if (!currKey.equals(lastKey)) {
            if (values.length != 0)
              reduce(lastKey, values.toIterator, collect)
            values.clear()
          }
          values += value
          lastKey = currKey
        }
        if (values.length != 0) reduce(lastKey, values.toIterator, collect)
        collect.toIterator
      })
    rdd.foreach(println)
  }
}      

各類RDD

PySpark與GraphFrames的安裝與使用
  • ShuffledRDD :表示需要走Shuffle過程的網絡傳輸
  • CoalescedRDD :用于将一台機器的多個分區合并成一個分區
  • CartesianRDD :對兩個RDD的所有元素産生笛卡爾積
  • MapPartitionsRDD :用于對每個分區的資料進行特定的處理
  • CoGroupedRDD :用于将2~4個rdd,按照key進行連接配接聚合
  • SubtractedRDD :用于對2個RDD求差集
  • UnionRDD和PartitionerAwareUnionRDD :用于對2個RDD求并集
  • ZippedPartitionsRDD2:zip拉鍊操作産生的RDD
  • ZippedWithIndexRDD:給每一個元素标記一個自增編号
  • PartitionwiseSampledRDD:用于對rdd的元素按照指定的百分比進行随機采樣

當我們需要給Datafream添加自增列時,可以使用zipWithUniqueId方法:

from pyspark.sql.types import StructType, LongType

schema = data.schema.add(StructField("id", LongType()))
rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1]))
data = rowRDD.toDF(schema)
data.show()      

API用法詳情可參考:​​https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD​​

cache&checkpoint

RDD通過persist方法或cache方法可以将前面的計算結果緩存,但是并不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD将會被緩存在計算節點的記憶體中,并供後面重用。

rdd.persist()      

checkpoint的源碼注釋可以看到:

  • 标記該RDD作為檢查點。
  • 它将被儲存在通過SparkContext#setCheckpointDir方法設定的檢查點目錄中
  • 它所引用的所有父RDD引用将全部被移除
  • 這個方法在這個RDD上必須在所有job執行前運作。
  • 強烈建議将這個RDD緩存在記憶體中,否則這個儲存檔案的計算任務将重新計算。

從中我們得知,在執行checkpoint方法時,最好同時,将該RDD緩存起來,否則,checkpoint也會産生一個計算任務。

sc.setCheckpointDir("checkpoint")
rdd.cache()
rdd.checkpoint()      

graphframes 的用法

GraphFrame是将Spark中的Graph算法統一到DataFrame接口的Graph操作接口,為Scala、Java和Python提供了統一的圖處理API。

Graphframes是開源項目,源碼工程如下:​​https://github.com/graphframes/graphframes​​

可以參考:

  • 官網:​​https://graphframes.github.io/graphframes/docs/_site/index.html​​
  • GraphFrames使用者指南-Python — Databricks文檔:​​https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html​​

在GraphFrames中圖的頂點(Vertex)和邊(edge)都是以DataFrame形式存儲的:

  • 頂點DataFrame:必須包含列名為“id”的列,用于作為頂點的唯一辨別
  • 邊DataFrame:必須包含列名為“src”和“dst”的列,根據唯一辨別id辨別關系

建立圖的示例:

from graphframes import GraphFrame

vertices = spark.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60)], ["id", "name", "age"])

edges = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend")
], ["src", "dst", "relationship"])

# 生成圖
g = GraphFrame(vertices, edges)      

GraphFrame提供三種視圖:

print("頂點表視圖:")
graph.vertices.show() # graph.vertices 就是原始的vertices
print("邊表視圖:")
graph.edges.show() # graph.edges 就是原始的 edges
print("三元組視圖:")
graph.triplets.show()      

擷取頂點的度、入度和出度:

# 頂點的度
graph.degrees.show()
# 頂點的入度
graph.inDegrees.show()
# 頂點的出度
graph.outDegrees.show()      

Motif finding (模式發現)

示例:

# 多個路徑條件
motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
# 在搜尋的結果上進行過濾
motif.filter("b.age > 30")
# 不需要傳回路徑中的元素時,可以使用匿名頂點和邊
motif = graph.find("(start)-[]->()")
# 設定路徑不存在的條件
motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")      

假設我們要想給使用者推薦關注的人,可以找出這樣的關系:A關注B,B關注C,但是A未關注C。找出這樣的關系就可以把C推薦給A:

# Motif: A->B->C but not A->C
results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
# 排除自己
results = results.filter("A.id != C.id")
# 選擇需要的列
results = results.select(results.A.id.alias("A"), results.C.id.alias("C"))
results.show()      

結果:

+---+---+
|  A|  C|
+---+---+
|  e|  c|
|  e|  a|
|  d|  b|
|  a|  d|
|  f|  b|
|  d|  e|
|  a|  f|
|  a|  c|
+---+---+      

Motif在查找路徑過程的過程中,還可以沿着路徑攜帶狀态。例如我們想要找出關系鍊有4個頂點,而且其中3條邊全部都是"friend"關系:

from pyspark.sql.functions import col, lit, when
from functools import reduce

chain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")


def sumFriends(cnt, relationship):
    "定義下一個頂點更新狀态的條件:如果關系為friend則cnt+1"
    return when(relationship == "friend", cnt+1).otherwise(cnt)


# 将更新方法應用到整個鍊的,鍊上每有一個關系是 friend 就加一,鍊上共三個關系。
condition = reduce(lambda cnt, e: sumFriends(
    cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))

chainWith2Friends2 = chain4.where(condition >= 3)
chainWith2Friends2.show()      

結果:

+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
|              a|            ab|              b|            bc|              c|            cd|              d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|   {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+      

Subgraphs 子圖

可以直接過濾其頂點或邊,​

​dropIsolatedVertices()​

​方法用于删除孤立沒有連接配接的點:

graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()      

還可以基于模式發現擷取到的邊建立Subgraphs :

paths = graph.find("(a)-[e]->(b)")\
    .filter("e.relationship = 'follow'")\
    .filter("a.age < b.age")
# 抽取邊資訊e2 = paths.select("e.src", "e.dst", "e.relationship")
e2 = paths.select("e.*")
# 建立Subgraphs
g2 = GraphFrame(graph.vertices, e2)      

GraphFrames支援的GraphX算法

  • PageRank:查找圖中的重要頂點。
  • 廣度優先搜尋(BFS):查找從一組頂點到另一組頂點的最短路徑
  • 連通元件(ConnectedComponents):為具備連接配接關系的頂點配置設定相同的元件ID
  • 強連通元件(StronglyConnectedConponents):根據每個頂點的強連通分量配置設定SCC。
  • 最短路徑(Shortest paths):查找從每個頂點到目标頂點集的最短路徑。
  • 三角形計數(TriangleCount):計算每個頂點所屬的三角形的數量,經常用于确定組的穩定性(互相連接配接的數量代表了穩定性)或作為其他網絡度量(如聚類系數)的一部分,在社交網絡分析中用來檢測社群。
  • 标簽傳播算法(LPA):檢測圖中的社群。

pageRank算法:

results = graph.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.sort("pagerank", ascending=False).show()      

結果:

+---+-------+---+-------------------+
| id|   name|age|           pagerank|
+---+-------+---+-------------------+
|  b|    Bob| 36| 2.7025217677349773|
|  c|Charlie| 30| 2.6667877057849627|
|  a|  Alice| 34| 0.4485115093698443|
|  e| Esther| 32| 0.3613490987992571|
|  f|  Fanny| 36|0.32504910549694244|
|  d|  David| 29|0.32504910549694244|
|  g|  Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+      

可以設定起始頂點:

graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")
graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)      

廣度優先搜尋BFS:

搜尋從姓名叫Esther到年齡小于32的最小路徑:

paths = graph.bfs("name = 'Esther'", "age < 32")
paths.show()      
+--------------+--------------+---------------+
|          from|            e0|             to|
+--------------+--------------+---------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+--------------+--------------+---------------+      

可以指定隻能在指定的邊搜尋:

graph.bfs("name = 'Esther'",
          "age < 32",
          edgeFilter="relationship != 'friend'",
          maxPathLength=4
          ).show()      
+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+      

Connected components 連通元件:

必須先設定檢查點:

sc.setCheckpointDir("checkpoint")

graph.connectedComponents().show()      

結果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
|  g|  Gabby| 60|146028888064|
+---+-------+---+------------+      

可以看到僅g點在一個連通區域内,可以調用​

​dropIsolatedVertices()​

​方法,删除這種孤立的沒有連接配接的點:

graph.dropIsolatedVertices().connectedComponents().show()      

結果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
+---+-------+---+------------+      

Strongly connected components 強連通元件:

graph.stronglyConnectedComponents(maxIter=10).show()      

Shortest paths 最短路徑:

每個頂點到a或d的最短路徑:

graph.shortestPaths(landmarks=["a", "d"]).show()      
+---+-------+---+----------------+
| id|   name|age|       distances|
+---+-------+---+----------------+
|  g|  Gabby| 60|              {}|
|  f|  Fanny| 36|              {}|
|  e| Esther| 32|{a -> 2, d -> 1}|
|  d|  David| 29|{a -> 1, d -> 0}|
|  c|Charlie| 30|              {}|
|  b|    Bob| 36|              {}|
|  a|  Alice| 34|{a -> 0, d -> 2}|
+---+-------+---+----------------+      

Triangle count 三角形計數:

graph.triangleCount().show()      
+-----+---+-------+---+
|count| id|   name|age|
+-----+---+-------+---+
|    1|  a|  Alice| 34|
|    0|  b|    Bob| 36|
|    0|  c|Charlie| 30|
|    1|  d|  David| 29|
|    1|  e| Esther| 32|
|    0|  g|  Gabby| 60|
|    0|  f|  Fanny| 36|
+-----+---+-------+---+      

說明頂點a/e/d構成三角形。

标簽傳播算法(LPA):

graph.labelPropagation(maxIter=5).orderBy("label").show()      
+---+-------+---+-------------+
| id|   name|age|        label|
+---+-------+---+-------------+
|  g|  Gabby| 60| 146028888064|
|  f|  Fanny| 36|1047972020224|
|  b|    Bob| 36|1047972020224|
|  a|  Alice| 34|1382979469312|
|  c|Charlie| 30|1382979469312|
|  e| Esther| 32|1460288880640|
|  d|  David| 29|1460288880640|
+---+-------+---+-------------+      

PySpark3.X與pandas融合

Pyspark從3.0版本開始出現了pandas_udf裝飾器、applyInPandas和mapInPandas,基于這些方法,我們就可以使用熟悉的pandas的文法處理spark對象的資料。

首先建立幾條測試資料,并啟動 Apache Arrow:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
df.show()      

自定義UDF和UDAF

pyspark暫不支援自定義UDTF。

使用pandas_udf裝飾器我們可以建立出基于pandas的udf自定義函數,在DSL的文法中可以被直接使用:

from pyspark.sql.functions import pandas_udf
import pandas as pd


@pandas_udf("double")
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b


df.select(multiply_func("id", "v").alias("product")).show()      

注冊函數和視圖後,可以直接在SQL中使用:

df.createOrReplaceTempView("t")
spark.udf.register("multiply", multiply_func)
spark.sql('select multiply(id, v) product from t').show()      

結果均為:

+-------+
|product|
+-------+
|    1.0|
|    2.0|
|    6.0|
|   10.0|
|   20.0|
+-------+      

還支援聚合函數和視窗函數:

from pyspark.sql import Window


@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()


# 對字段'v'進行求均值
df.select(mean_udf('v').alias("mean_v")).show()
# 按照‘id’分組,求'v'的均值
df.groupby("id").agg(mean_udf('v').alias("mean_v")).show()
# 按照‘id’分組,求'v'的均值,并指派給新的一列
df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()      

注冊到udf之後同樣可以直接使用SQL實作:

spark.udf.register("mean2", mean_udf)
spark.sql('select mean2(v) mean_v from t').show()
spark.sql('select id,mean2(v) mean_v from t group by id').show()
spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()      

結果均為:

+--------+
| mean_v |
+--------+
|     4.2|
+--------+

+---+--------+
| id| mean_v |
+---+--------+
|  1|     1.5|
|  2|     6.0|
+---+--------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+      

分組聚合與JOIN

applyInPandas需要在datafream調用groupby之後才能使用:

def subtract_mean(pdf):
    v = pdf.v
    pdf['v1'] = v - v.mean()
    pdf['v2'] = v + v.mean()
    return pdf


t = df.groupby("id")
t.applyInPandas(
    subtract_mean, schema="id long, v double, v1 double, v2 double").show()      

結果:

+---+----+----+----+
| id|   v|  v1|  v2|
+---+----+----+----+
|  1| 1.0|-0.5| 2.5|
|  1| 2.0| 0.5| 3.5|
|  2| 3.0|-3.0| 9.0|
|  2| 5.0|-1.0|11.0|
|  2|10.0| 4.0|16.0|
+---+----+----+----+      

subtract_mean函數接收的是對應id的dataframe資料,schema指定了傳回值的名稱和類型清單。

通過以下代碼我們可以知道,applyInPandas可以借助cogroup進行表連接配接:

val a = sc.parallelize(List(1, 2, 1, 3))
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
val e = a.map((_, "e"))
scala> b.cogroup(c).foreach(println)
(3,(CompactBuffer(b),CompactBuffer(c)))
(1,(CompactBuffer(b, b),CompactBuffer(c, c)))
(2,(CompactBuffer(b),CompactBuffer(c)))      

示例:

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    # l、r is a pandas.DataFrame
    # 這裡是按照id分組
    # 那麼,l和r分别是對應id的df1和df2資料
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+      

map疊代

執行以下代碼:

def filter_func(iterator):
    for i, pdf in enumerate(iterator):
        print(i, pdf.values.tolist())
        yield pdf


df.mapInPandas(filter_func, schema=df.schema).show()      

背景看到執行結果為:

0 [[2.0, 5.0]]                                                     
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]      

前台結果幾乎保持原樣。可以知道iterator是一個分區疊代器,疊代出目前分區的每一行資料都被封裝成一個pandas對象。

Pyspark與Pandas的互動

将spark的Datafream對象轉換為原生的pandas對象隻需調用toPandas()方法即可:

sdf.toPandas()      

将原生的pandas對象轉換為spark對象可以使用spark的頂級方法:

spark.createDataFrame(pdf)      

習慣使用pandas的童鞋,還可以直接使用pandas-on-Spark,在spark3.2.0版本時已經比對到pandas 1.3版本的API。通過pandas-on-Spark,我們可以完全用pandas的api操作資料,而底層執行卻是spark的并行化。

使用pandas-on-Spark最好設定一下環境變量:

import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"      

将spark對象轉換為pandas-on-Spark對象:

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

pdf = df.to_pandas_on_spark()
print(type(pdf))
pdf      
PySpark與GraphFrames的安裝與使用

pandas-on-Spark對象也可以還原成spark對象:

pdf.to_spark()      

另外spark提供直接将檔案讀取成pandas-on-Spark對象的api,例如:

import pyspark.pandas as ps

pdf = ps.read_csv("example_csv.csv")      

ps對象與原生pandas對象的API幾乎完全一緻。

ps.sql("SELECT count(*) as num FROM {pdf}")      

繼續閱讀