天天看點

Spark GraphX圖算法應用【分區政策、PageRank、ConnectedComponents,TriangleCount】

一.分區政策

  

Spark GraphX圖算法應用【分區政策、PageRank、ConnectedComponents,TriangleCount】

  GraphX采用頂點分割的方式進行分布式圖分區。GraphX不會沿着邊劃分圖形,而是沿着頂點劃分圖形,這可以減少通信和存儲的開銷。從邏輯上講,這對應于為機器配置設定邊并允許頂點跨越多台機器。配置設定邊的方法取決于分區政策PartitionStrategy并且對各種啟發式方法進行了一些折中。使用者可以使用Graph.partitionBy運算符重新劃分圖【可以使用不同分區政策】。預設的分區政策是使用圖形構造中提供的邊的初始分區。但是,使用者可以輕松切換到GraphX中包含的2D分區或其他啟發式方法。

Spark GraphX圖算法應用【分區政策、PageRank、ConnectedComponents,TriangleCount】

  一旦對邊進行了劃分,高效圖并行計算的關鍵挑戰就是将頂點屬性和邊有效結合。由于現實世界中的圖通常具有比頂點更多的邊,是以我們将頂點屬性移到邊上。由于并非所有分區都包含與所有頂點相鄰的邊,是以我們在内部維護一個路由表,該路由表在實作諸如triplets操作所需要的連接配接時,标示在哪裡廣播頂點aggregateMessages。

二.測試資料

  1.users.txt

    

Spark GraphX圖算法應用【分區政策、PageRank、ConnectedComponents,TriangleCount】

  2.followers.txt

Spark GraphX圖算法應用【分區政策、PageRank、ConnectedComponents,TriangleCount】

  3.資料可視化

Spark GraphX圖算法應用【分區政策、PageRank、ConnectedComponents,TriangleCount】

三.PageRank網頁排名

  1.簡介

    使用PageRank測量圖中每個頂點的重要性,假設從邊u到v表示的認可度x。例如,如果一個Twitter使用者被許多其他使用者關注,則該使用者将獲得很高的排名。GraphX帶有PageRank的靜态和動态實作,作為PageRank對象上的方法。靜态PageRant運作固定的疊代次數,而動态PageRank運作直到排名收斂【變化小于指定的門檻值】。GraphOps運作直接方法調用這些算法。

  2.代碼實作

1 package graphx
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.graphx.GraphLoader
 5 import org.apache.spark.sql.SparkSession
 6 
 7 /**
 8   * Created by Administrator on 2019/11/27.
 9   */
10 object PageRank {
11   Logger.getLogger("org").setLevel(Level.WARN)
12   def main(args: Array[String]) {
13     val spark = SparkSession.builder()
14         .master("local[2]")
15         .appName(s"${this.getClass.getSimpleName}")
16         .getOrCreate()
17       val sc = spark.sparkContext
18     val graph = GraphLoader.edgeListFile(sc, "D:\\software\\spark-2.4.4\\data\\graphx\\followers.txt")
19     // 調用PageRank圖計算算法
20     val ranks = graph.pageRank(0.0001).vertices
21     // join
22     val users = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\users.txt").map(line => {
23       val fields = line.split(",")
24       (fields(0).toLong, fields(1))
25     })
26     // join
27     val ranksByUsername = users.join(ranks).map{
28       case (id, (username, rank)) => (username, rank)
29     }
30     // print
31     ranksByUsername.foreach(println)
32   }
33 }      

  3.執行結果

Spark GraphX圖算法應用【分區政策、PageRank、ConnectedComponents,TriangleCount】

四.ConnectedComponents連通體算法

    連通體算法實作把圖劃分為多個子圖【不進行節點切分】,清除孤島子圖【隻要一個節點的子圖】。其使用子圖中編号最小的頂點ID标記子圖。

1 package graphx
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.graphx.GraphLoader
 5 import org.apache.spark.sql.SparkSession
 6 
 7 /**
 8   * Created by Administrator on 2019/11/27.
 9   */
10 object ConnectedComponents {
11   Logger.getLogger("org").setLevel(Level.WARN)
12   def main(args: Array[String]) {
13     val spark = SparkSession.builder()
14       .master("local[2]")
15       .appName(s"${this.getClass.getSimpleName}")
16       .getOrCreate()
17     val sc = spark.sparkContext
18     val graph = GraphLoader.edgeListFile(sc, "D:\\software\\spark-2.4.4\\data\\graphx\\followers.txt")
19     // 調用connectedComponents連通體算法
20     val cc = graph.connectedComponents().vertices
21     // join
22     val users = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\users.txt").map(line => {
23       val fields = line.split(",")
24       (fields(0).toLong, fields(1))
25     })
26     // join
27     val ranksByUsername = users.join(cc).map {
28       case (id, (username, rank)) => (username, rank)
29     }
30     val count = ranksByUsername.count().toInt
31     // print
32     ranksByUsername.map(_.swap).takeOrdered(count).foreach(println)
33   }
34 }      
Spark GraphX圖算法應用【分區政策、PageRank、ConnectedComponents,TriangleCount】

五.TriangleCount三角計數算法

  1.簡介  

    當頂點有兩個相鄰的頂點且它們之間存在邊時,該頂點是三角形的一部分。GraphX在TriangleCount對象中實作三角計數算法,該算法通過确定經過每個頂點的三角形的數量,進而提供聚類的度量。注意,TriangleCount要求邊定義必須為規範方向【srcId < dstId】,并且必須使用Graph.partitionBy對圖進行分區。

1 package graphx
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.graphx.{PartitionStrategy, GraphLoader}
 5 import org.apache.spark.sql.SparkSession
 6 
 7 /**
 8   * Created by Administrator on 2019/11/27.
 9   */
10 object TriangleCount {
11   Logger.getLogger("org").setLevel(Level.WARN)
12   def main(args: Array[String]) {
13     val spark = SparkSession.builder()
14       .master("local[2]")
15       .appName(s"${this.getClass.getSimpleName}")
16       .getOrCreate()
17     val sc = spark.sparkContext
18     val graph = GraphLoader.edgeListFile(sc, "D:\\software\\spark-2.4.4\\data\\graphx\\followers.txt", true)
19       .partitionBy(PartitionStrategy.RandomVertexCut)
20 
21     // 調用triangleCount三角計數算法
22     val triCounts = graph.triangleCount().vertices
23     // map
24     val users = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\users.txt").map(line => {
25       val fields = line.split(",")
26       (fields(0).toLong, fields(1))
27     })
28     // join
29     val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
30       (username, tc)
31     }
32     val count = triCountByUsername.count().toInt
33     // print
34     triCountByUsername.map(_.swap).takeOrdered(count).foreach(println)
35   }
36 }      
Spark GraphX圖算法應用【分區政策、PageRank、ConnectedComponents,TriangleCount】

繼續閱讀