天天看點

sparkSQL的sqlContext和hHiveContext的彙總(二)

sqlContext總的一個過程如下:

1.SQL語句經過SqlParse解析成UnresolvedLogicalPlan;

2.使用analyzer結合資料資料字典(catalog)進行綁定,生成resolvedLogicalPlan;

3.使用optimizer對resolvedLogicalPlan進行優化,生成optimizedLogicalPlan;

4.使用SparkPlan将LogicalPlan轉換成PhysicalPlan;

5.使用prepareForExecution()将PhysicalPlan轉換成可執行實體計劃;

6.使用execute()執行可執行實體計劃;

7.生成SchemaRDD。

在整個運作過程中涉及到多個SparkSQL的元件,如SqlParse、analyzer、optimizer、SparkPlan等等

eg:

import org.apache.spark.sql._

val sparkConf = new SparkConf().setAppName("new_proj")

implicit val sc = new SparkContext(sparkConf)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val projects = sqlContext.read.json("/part-m-00000.json")

hiveContext總的一個過程如下:

1.SQL語句經過HiveQl.parseSql解析成Unresolved LogicalPlan,在這個解析過程中對hiveql語句使用getAst()擷取AST樹,然後再進行解析;

2.使用analyzer結合資料hive源資料Metastore(新的catalog)進行綁定,生成resolved LogicalPlan;

3.使用optimizer對resolved LogicalPlan進行優化,生成optimized LogicalPlan,優化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))進行預處理;

4.使用hivePlanner将LogicalPlan轉換成PhysicalPlan;

5.使用prepareForExecution()将PhysicalPlan轉換成可執行實體計劃;

6.使用execute()執行可執行實體計劃;

7.執行後,使用map(_.copy)将結果導入SchemaRDD。

eg:

Object hiveContextDemo{

 def main(args:Array[String]):Unit={

 if(args.length==0){

 println("Args:<runLocal><kuduMaster>"+"<kuduAccountMartTableName>")

return

}

val runLocal =args(0).equalsIgnoreCase("1")

val kuduMaster=args(1)

val kuduAccountMartTableName=args(2)

val sc:SparkContext=if(runLocal){

val sparkConfig=new SparkConf()

sparkConfig.set("spark.broadcast.compress","false")

sparkConfig.set("spark.shuffle.compress","false")

sparkConfig.set("spark.shuffle.spill.compress","false")

new SparkContext("local","TableStatsSinglePathMain",sparkConfig)

}else{

val sparkconfig=new SparkConf().setAppName("TableStatsSinglePathMain")

new SparkContext(sparkConfig)

}

val hiveContext=new HiveContext(sc);

val kuduOptions=Map(

"kudu.table"->kuduAccountMasterTableName,

"kudu.master"->kuduMaster)

hiveContext.read.options(kuduOptions).format("org.kududb.spark.kudu").load.registerTempTable("account_mart_tmp")

println("---------------")

val values=hiveContext.sql("select account_id,sum(win_count) from account_mart_tmp group by account_id").take(100)

println("--------")

values.foreach(println)

println("--------")

sc.stop

}

}

}

eg2:

Object Example{

def splitArry(args:Array[String]){

args.map((arg:String)=>println("one arg is:"+arg+"!"))

}

def main(args:Array[String]){

implicit val sc=new SparkContext(new SparkConf().setAppName("Exaple").setMaster("yarn-client"))

implicit val hc=new HiveContext(sc)

val df=hc.parquetFile("/apps/risk/det/madmen20/bre/sorce=live")

df.printSchema()

df.take(1).foreach(println)

splitArray(args)

}

}

備注:下面HiveContext的執行個體來源git上面的demo,便于進一步了解HiveContext。

繼續閱讀