l 主機作業系統:Windows 64位,雙核4線程,主頻2.2G,10G記憶體
l 虛拟軟體:VMware® Workstation 9.0.0 build-812388
l 虛拟機作業系統:CentOS 64位,單核
l 虛拟機運作環境:
Ø JDK:1.7.0_55 64位
Ø Hadoop:2.2.0(需要編譯為64位)
Ø Scala:2.10.4
Ø Spark:1.1.0(需要編譯)
Ø Hive:0.13.1
叢集包含三個節點,節點之間可以免密碼SSH通路,節點IP位址和主機名分布如下:
序号
IP位址
機器名
類型
核數/記憶體
使用者名
目錄
1
192.168.0.61
hadoop1
NN/DN/RM
Master/Worker
1核/3G
hadoop
/app 程式所在路徑
/app/scala-...
/app/hadoop
/app/complied
2
192.168.0.62
hadoop2
DN/NM/Worker
1核/2G
3
192.168.0.63
hadoop3
SparkSQL引入了一種新的RDD——SchemaRDD,SchemaRDD由行對象(Row)以及描述行對象中每列資料類型的Schema組成;SchemaRDD很象傳統資料庫中的表。SchemaRDD可以通過RDD、Parquet檔案、JSON檔案、或者通過使用hiveql查詢hive資料來建立。SchemaRDD除了可以和RDD一樣操作外,還可以通過registerTempTable注冊成臨時表,然後通過SQL語句進行操作。
值得注意的是:
lSpark1.1使用registerTempTable代替1.0版本的registerAsTable
lSpark1.1在hiveContext中,hql()将被棄用,sql()将代替hql()來送出查詢語句,統一了接口。
l使用registerTempTable系統資料庫是一個臨時表,生命周期隻在所定義的sqlContext或hiveContext執行個體之中。換而言之,在一個sqlontext(或hiveContext)中registerTempTable的表不能在另一個sqlContext(或hiveContext)中使用。
另外,Spark1.1提供了文法解析器選項spark.sql.dialect,就目前而言,Spark1.1提供了兩種文法解析器:sql文法解析器和hiveql文法解析器。
lsqlContext現在隻支援sql文法解析器(SQL-92文法)
lhiveContext現在支援sql文法解析器和hivesql文法解析器,預設為hivesql文法解析器,使用者可以通過配置切換成sql文法解析器,來運作hiveql不支援的文法,如select 1。
l切換可以通過下列方式完成:
l在sqlContexet中使用setconf配置spark.sql.dialect
l在hiveContexet中使用setconf配置spark.sql.dialect
l在sql指令中使用 set spark.sql.dialect=value
SparkSQL1.1對資料的查詢分成了2個分支:sqlContext 和 hiveContext。至于兩者之間的關系,hiveSQL繼承了sqlContext,是以擁有sqlontext的特性之外,還擁有自身的特性(最大的特性就是支援hive)。
使用如下指令打開/etc/profile檔案:
sudo vi /etc/profile

設定如下參數:
export SPARK_HOME=/app/hadoop/spark-1.1.0
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export HIVE_HOME=/app/hadoop/hive-0.13.1
export PATH=$PATH:$HIVE_HOME/bin
export CLASSPATH=$CLASSPATH:$HIVE_HOME/bin
$cd /app/hadoop/hadoop-2.2.0/sbin
$./start-dfs.sh
$cd /app/hadoop/spark-1.1.0/sbin
$./start-all.sh
在spark用戶端(在hadoop1節點),使用spark-shell連接配接叢集
$cd /app/hadoop/spark-1.1.0/bin
$./spark-shell --master spark://hadoop1:7077 --executor-memory 1g
啟動後檢視啟動情況,如下圖所示:
Spark1.1.0開始提供了兩種方式将RDD轉換成SchemaRDD:
l通過定義Case Class,使用反射推斷Schema(case class方式)
l通過可程式設計接口,定義Schema,并應用到RDD上(applySchema 方式)
前者使用簡單、代碼簡潔,适用于已知Schema的源資料上;後者使用較為複雜,但可以在程式運作過程中實行,适用于未知Schema的RDD上。
對于Case Class方式,首先要定義Case Class,在RDD的Transform過程中使用Case Class可以隐式轉化成SchemaRDD,然後再使用registerTempTable注冊成表。注冊成表後就可以在sqlContext對表進行操作,如select 、insert、join等。注意,case class可以是嵌套的,也可以使用類似Sequences 或 Arrays之類複雜的資料類型。
下面的例子是定義一個符合資料檔案/sparksql/people.txt類型的case clase(Person),然後将資料檔案讀入後隐式轉換成SchemaRDD:people,并将people在sqlContext中注冊成表rddTable,最後對表進行查詢,找出年紀在13-19歲之間的人名。
第一步 上傳測試資料
在HDFS中建立/class6目錄,把配套資源/data/class5/people.txt上傳到該目錄上
$hadoop fs -mkdir /class6
$hadoop fs -copyFromLocal /home/hadoop/upload/class6/people.* /class6
$hadoop fs -ls /
第二步 定義sqlContext并引入包
//sqlContext示範
scala>val sqlContext=new org.apache.spark.sql.SQLContext(sc)
scala>import sqlContext.createSchemaRDD
第三步 定義Person類,讀入資料并注冊為臨時表
//RDD1示範
scala>case class Person(name:String,age:Int)
scala>val rddpeople=sc.textFile("hdfs://hadoop1:9000/class6/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))
scala>rddpeople.registerTempTable("rddTable")
第四步 在查詢年紀在13-19歲之間的人員
scala>sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
上面步驟均為trnsform未觸發action動作,在該步驟中查詢資料并列印觸發了action動作,如下圖所示:
通過監控頁面,檢視任務運作情況:
applySchema 方式比較複雜,通常有3步過程:
l從源RDD建立rowRDD
l建立與rowRDD比對的Schema
l将Schema通過applySchema應用到rowRDD
第一步 導入包建立Schema
//導入SparkSQL的資料類型和Row
scala>import org.apache.spark.sql._
//建立于資料結構比對的schema
scala>val schemaString = "name age"
scala>val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
第二步 建立rowRDD并讀入資料
//建立rowRDD
scala>val rowRDD = sc.textFile("hdfs://hadoop1:9000/class6/people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim))
//用applySchema将schema應用到rowRDD
scala>val rddpeople2 = sqlContext.applySchema(rowRDD, schema)
scala>rddpeople2.registerTempTable("rddTable2")
第三步 查詢擷取資料
scala>sqlContext.sql("SELECT name FROM rddTable2 WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
同樣得,sqlContext可以讀取parquet檔案,由于parquet檔案中保留了schema的資訊,是以不需要使用case class來隐式轉換。sqlContext讀入parquet檔案後直接轉換成SchemaRDD,也可以将SchemaRDD儲存成parquet檔案格式。
第一步 儲存成parquest格式檔案
// 把上面步驟中的rddpeople儲存為parquet格式檔案到hdfs中
scala>rddpeople.saveAsParquetFile("hdfs://hadoop1:9000/class6/people.parquet")
第二步 讀入parquest格式檔案,系統資料庫parquetTable
//parquet示範
scala>val parquetpeople = sqlContext.parquetFile("hdfs://hadoop1:9000/class6/people.parquet")
scala>parquetpeople.registerTempTable("parquetTable")
第三步 查詢年齡大于等于25歲的人名
scala>sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
sparkSQL1.1.0開始提供對json檔案格式的支援,這意味着開發者可以使用更多的資料源,如鼎鼎大名的NOSQL資料庫MongDB等。sqlContext可以從jsonFile或jsonRDD擷取schema資訊,來建構SchemaRDD,注冊成表後就可以使用。
ljsonFile - 加載JSON檔案目錄中的資料,檔案的每一行是一個JSON對象
ljsonRdd - 從現有的RDD加載資料,其中RDD的每個元素包含一個JSON對象的字元串
第二步 讀取資料并注冊jsonTable表
//json示範
scala>val jsonpeople = sqlContext.jsonFile("hdfs://hadoop1:9000/class6/people.json")
jsonpeople.registerTempTable("jsonTable")
第三步 查詢年齡大于等于25的人名
scala>sqlContext.sql("SELECT name FROM jsonTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
在sqlContext或hiveContext中來源于不同資料源的表在各自生命周期中可以混用,即sqlContext與hiveContext之間表不能混合使用
//sqlContext中來自rdd的表rddTable和來自parquet檔案的表parquetTable混合使用
scala>sqlContext.sql("select a.name,a.age,b.age from rddTable a join parquetTable b on a.name=b.name").collect().foreach(println)
使用hiveContext之前首先要确認以下兩點:
l使用的Spark是支援hive
lHive的配置檔案hive-site.xml已經存在conf目錄中
前者可以檢視lib目錄下是否存在以datanucleus開頭的3個JAR來确定,後者注意是否在hive-site.xml裡配置了uris來通路Hive Metastore。
在hadoop1節點中使用如下指令啟動Hive
$nohup hive --service metastore > metastore.log 2>&1 &
在SPARK_HOME/conf目錄下建立hive-site.xml檔案,修改配置後需要重新啟動Spark-Shell
【注】如果在第6課《SparkSQL(二)--SparkSQL簡介》配置,
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop1:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
</configuration>
要使用hiveContext,需要先建構hiveContext:
scala>val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
然後就可以對Hive資料進行操作了,下面我們将使用Hive中的銷售資料,首先切換資料庫到hive并檢視有幾個表:
//銷售資料示範
scala>hiveContext.sql("use hive")
scala>hiveContext.sql("show tables").collect().foreach(println)
//所有訂單中每年的銷售單數、銷售總額
//三個表連接配接後以count(distinct a.ordernumber)計銷售單數,sum(b.amount)計銷售總額
scala>hiveContext.sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear").collect().foreach(println)
結果如下:
[2004,1094,3265696]
[2005,3828,13247234]
[2006,3772,13670416]
[2007,4885,16711974]
[2008,4861,14670698]
[2009,2619,6322137]
[2010,94,210924]
第一步 實作分析
所有訂單每年最大金額訂單的銷售額:
1、先求出每份訂單的銷售額以其發生時間
select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber
2、以第一步的查詢作為子表,和表tbDate 連接配接,求出每年最大金額訂單的銷售額
select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear
第二步 實作SQL語句
scala>hiveContext.sql("select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear").collect().foreach(println)
[2010,13063]
[2004,23612]
[2005,38180]
[2006,36124]
[2007,159126]
[2008,55828]
[2009,25810]
第三步 監控任務運作情況
所有訂單中每年最暢銷貨品:
1、求出每年每個貨品的銷售金額
scala>select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid
2、求出每年單品銷售的最大金額
scala>select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear
3、求出每年與銷售額最大相符的貨品就是最暢銷貨品
scala>select distinct e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear
scala>hiveContext.sql("select distinct e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear").collect().foreach(println)
[2004,JY424420810101,53374]
[2005,24124118880102,56569]
[2006,JY425468460101,113684]
[2007,JY425468460101,70226]
[2008,E2628204040101,97981]
[2009,YL327439080102,30029]
[2010,SQ429425090101,4494]
第一步 建立hiveTable從本地檔案系統加載資料
//建立一個hiveTable并将資料加載,注意people.txt第二列有空格,是以age取string類型
scala>hiveContext.sql("CREATE TABLE hiveTable(name string,age string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ")
scala>hiveContext.sql("LOAD DATA LOCAL INPATH '/home/hadoop/upload/class6/people.txt' INTO TABLE hiveTable")
第二步 建立parquet表,從HDFS加載資料
//建立一個源自parquet檔案的表parquetTable2,然後和hiveTable混合使用
scala>hiveContext.parquetFile("hdfs://hadoop1:9000/class6/people.parquet").registerTempTable("parquetTable2")
第三步 兩個表混合使用
scala>hiveContext.sql("select a.name,a.age,b.age from hiveTable a join parquetTable2 b on a.name=b.name").collect().foreach(println)
sparkSQL的cache可以使用兩種方法來實作:
lCacheTable()方法
lCACHE TABLE指令
千萬不要先使用cache SchemaRDD,然後registerAsTable;使用RDD的cache()将使用原生态的cache,而不是針對SQL優化後的記憶體列存儲。
第一步 對rddTable表進行緩存
//cache使用
scala>sqlContext.cacheTable("rddTable")
在監控界面上看到該表資料已經緩存
第二步 對parquetTable表進行緩存
scala>sqlContext.sql("CACHE TABLE parquetTable")
scala>sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
第三步 解除緩存
//uncache使用
scala>sqlContext.uncacheTable("rddTable")
scala>sqlContext.sql("UNCACHE TABLE parquetTable")
SparkSQL除了支援HiveQL和SQL-92文法外,還支援DSL(Domain Specific Language)。在DSL中,使用Scala符号'+标示符表示基礎表中的列,Spark的execution engine會将這些标示符隐式轉換成表達式。另外可以在API中找到很多DSL相關的方法,如where()、select()、limit()等等,詳細資料可以檢視Catalyst子產品中的DSL子子產品,下面為其中定義幾種常用方法:
//DSL示範
scala>import sqlContext._
scala>val teenagers_dsl = rddpeople.where('age >= 10).where('age <= 19).select('name)
scala>teenagers_dsl.map(t => "Name: " + t(0)).collect().foreach(println)
Spark之是以萬人矚目,除了記憶體計算還有其ALL-IN-ONE的特性,實作了One stack rule them all。下面簡單模拟了幾個綜合應用場景,不僅使用了sparkSQL,還使用了其他Spark元件:
lSQL On Spark:使用sqlContext查詢年紀大于等于10歲的人名
lHive On Spark:使用了hiveContext計算每年銷售額
l店鋪分類,根據銷售額對店鋪分類,使用sparkSQL和MLLib聚類算法
lPageRank,計算最有價值的網頁,使用sparkSQL和GraphX的PageRank算法
以下實驗采用IntelliJ IDEA調試代碼,最後生成LearnSpark.jar,然後使用spark-submit送出給叢集運作。
在src->main->scala下建立class6包,在該包中添加SQLOnSpark對象檔案,具體代碼如下:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
case class Person(name: String, age: Int)
object SQLOnSpark {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SQLOnSpark")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext._
val people: RDD[Person] = sc.textFile("hdfs://hadoop1:9000/class6/people.txt")
.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 10 and age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
sc.stop()
}
}
先對該代碼進行編譯,然後運作該程式,需要注意的是在IDEA中需要在SparkConf添加setMaster("local")設定為本地運作。運作時可以通過運作視窗進行觀察:
列印運作結果
【注】可以參見第3課《Spark程式設計模型(下)--IDEA搭建及實戰》進行打包
第一步 配置打包資訊
在項目結構界面中選擇"Artifacts",在右邊操作界面選擇綠色"+"号,選擇添加JAR包的"From modules with dependencies"方式,出現如下界面,在該界面中選擇主函數入口為SQLOnSpark:
第二步 填寫該JAR包名稱和調整輸出内容
打包路徑為/home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
【注意】的是預設情況下"Output Layout"會附帶Scala相關的類包,由于運作環境已經有Scala相關類包,是以在這裡去除這些包隻保留項目的輸出内容
第三步 輸出打封包件
點選菜單Build->Build Artifacts,彈出選擇動作,選擇Build或者Rebuild動作
第四步 複制打封包件到Spark根目錄下
cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
cp LearnSpark.jar /app/hadoop/spark-1.1.0/
ll /app/hadoop/spark-1.1.0/
通過如下指令調用打包中的SQLOnSpark方法,運作結果如下:
cd /app/hadoop/spark-1.1.0
bin/spark-submit --master spark://hadoop1:7077 --class class6.SQLOnSpark --executor-memory 1g LearnSpark.jar
在class6包中添加HiveOnSpark對象檔案,具體代碼如下:
import org.apache.spark.sql.hive.HiveContext
object HiveOnSpark {
case class Record(key: Int, value: String)
val sparkConf = new SparkConf().setAppName("HiveOnSpark")
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
import hiveContext._
sql("use hive")
sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear")
.collect().foreach(println)
按照3.1.3SQL On Spark方法進行打包
【注】需要啟動Hive服務,參見2.3.1
bin/spark-submit --master spark://hadoop1:7077 --class class6.HiveOnSpark --executor-memory 1g LearnSpark.jar
通過監控頁面看到名為HiveOnSpark的作業運作情況:
分類在實際應用中非常普遍,比如對客戶進行分類、對店鋪進行分類等等,對不同類别采取不同的政策,可以有效的降低企業的營運成本、增加收入。機器學習中的聚類就是一種根據不同的特征資料,結合使用者指定的類别數量,将資料分成幾個類的方法。下面舉個簡單的例子,按照銷售數量和銷售金額這兩個特征資料,進行聚類,分出3個等級的店鋪。
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
object SQLMLlib {
//屏蔽不必要的日志顯示在終端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//設定運作環境
val sparkConf = new SparkConf().setAppName("SQLMLlib")
//使用sparksql查出每個店的銷售數量和金額
hiveContext.sql("use hive")
hiveContext.sql("SET spark.sql.shuffle.partitions=20")
val sqldata = hiveContext.sql("select a.locationid, sum(b.qty) totalqty,sum(b.amount) totalamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.locationid")
//将查詢資料轉換成向量
val parsedData = sqldata.map {
case Row(_, totalqty, totalamount) =>
val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)
Vectors.dense(features)
}
//對資料集聚類,3個類,20次疊代,形成資料模型
//注意這裡會使用設定的partition數20
val numClusters = 3
val numIterations = 20
val model = KMeans.train(parsedData, numClusters, numIterations)
//用模型對讀入的資料進行分類,并輸出
//由于partition沒設定,輸出為200個小檔案,可以使用bin/hdfs dfs -getmerge 合并下載下傳到本地
val result2 = sqldata.map {
case Row(locationid, totalqty, totalamount) =>
val linevectore = Vectors.dense(features)
val prediction = model.predict(linevectore)
locationid + " " + totalqty + " " + totalamount + " " + prediction
}.saveAsTextFile(args(0))
通過如下指令調用打包中的SQLOnSpark方法:
bin/spark-submit --master spark://hadoop1:7077 --class class6.SQLMLlib --executor-memory 1g LearnSpark.jar /class6/output1
運作過程,可以發現聚類過程都是使用20個partition:
檢視運作結果,分為20個檔案存放在HDFS中
使用getmerge将結果轉到本地檔案,并檢視結果:
cd /home/hadoop/upload
hdfs dfs -getmerge /class6/output1 result.txt
最後使用R做示意圖,用3種不同的顔色表示不同的類别。
PageRank,即網頁排名,又稱網頁級别、Google左側排名或佩奇排名,是Google創始人拉裡·佩奇和謝爾蓋·布林于1997年建構早期的搜尋系統原型時提出的連結分析算法。目前很多重要的連結分析算法都是在PageRank算法基礎上衍生出來的。PageRank是Google用于用來辨別網頁的等級/重要性的一種方法,是Google用來衡量一個網站的好壞的唯一标準。在揉合了諸如Title辨別和Keywords辨別等所有其它因素之後,Google通過PageRank來調整結果,使那些更具“等級/重要性”的網頁在搜尋結果中令網站排名獲得提升,進而提高搜尋結果的相關性和品質。
Spark GraphX引入了google公司的圖處理引擎pregel,可以友善的實作PageRank的計算。
下面執行個體采用的資料是wiki資料中含有Berkeley标題的網頁之間連接配接關系,資料為兩個檔案:graphx-wiki-vertices.txt和graphx-wiki-edges.txt ,可以分别用于圖計算的頂點和邊。把這兩個檔案上傳到本地檔案系統/home/hadoop/upload/class6目錄中(注:這兩個檔案可以從該系列附屬資源/data/class6中擷取)
第一步 上傳資料
第二步 啟動SparkSQL
參見第6課《SparkSQL(一)--SparkSQL簡介》3.2.3啟動SparkSQL
$cd /app/hadoop/spark-1.1.0
$bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g
第三步 定義表并加載資料
建立vertices和edges兩個表并加載資料:
spark-sql>show databases;
spark-sql>use hive;
spark-sql>CREATE TABLE vertices(ID BigInt,Title String) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'; LOAD DATA LOCAL INPATH '/home/hadoop/upload/class6/graphx-wiki-vertices.txt' INTO TABLE vertices;
spark-sql>CREATE TABLE edges(SRCID BigInt,DISTID BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'; LOAD DATA LOCAL INPATH '/home/hadoop/upload/class6/graphx-wiki-edges.txt' INTO TABLE edges;
檢視建立結果
spark-sql>show tables;
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx._
object SQLGraphX {
//屏蔽日志
val sparkConf = new SparkConf().setAppName("PageRank")
val verticesdata = hiveContext.sql("select id, title from vertices")
val edgesdata = hiveContext.sql("select srcid,distid from edges")
//裝載頂點和邊
val vertices = verticesdata.map { case Row(id, title) => (id.toString.toLong, title.toString)}
val edges = edgesdata.map { case Row(srcid, distid) => Edge(srcid.toString.toLong, distid.toString.toLong, 0)}
//建構圖
val graph = Graph(vertices, edges, "").persist()
//pageRank算法裡面的時候使用了cache(),故前面persist的時候隻能使用MEMORY_ONLY
println("**********************************************************")
println("PageRank計算,擷取最有價值的資料")
val prGraph = graph.pageRank(0.001).cache()
val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
(v, title, rank) => (rank.getOrElse(0.0), title)
titleAndPrGraph.vertices.top(10) {
Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
}.foreach(t => println(t._2._2 + ": " + t._2._1))
bin/spark-submit --master spark://hadoop1:7077 --class class6.SQLGraphX --executor-memory 1g LearnSpark.jar
運作結果:
在現實資料處理過程中,這種涉及多個系統處理的場景很多。通常各個系統之間的資料通過磁盤落地再交給下一個處理系統進行處理。對于Spark來說,通過多個元件的配合,可以以流水線的方式來處理資料。從上面的代碼可以看出,程式除了最後有磁盤落地外,都是在記憶體中計算的。避免了多個系統中互動資料的落地過程,提高了效率。這才是spark生态系統真正強大之處:One stack rule them all。另外sparkSQL+sparkStreaming可以架構目前非常熱門的Lambda架構體系,為CEP提供解決方案。也正是如此強大,才吸引了廣大開源愛好者的目光,促進了Spark生态的高速發展。
作者:石山園 出處:http://www.cnblogs.com/shishanyuan/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。如果覺得還有幫助的話,可以點一下右下角的【推薦】,希望能夠持續的為大家帶來好的技術文章!想跟我一起進步麼?那就【關注】我吧。