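The Spark execution model can be divided into three phases: creating a logical plan, translating it into a physical plan, and executing tasks on the cluster.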
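You can view information about Spark jobs in the web UI at http://<driver-node>:4040. For Spark applications that have already finished, the information is available from the Spark History Server at http://<server-url>:18080.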
Let's walk through these three phases.
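Logical Execution Plan
In the first phase, the logical execution plan is created. This plan shows which steps will be executed. Recall that applying a transformation to a Dataset creates a new Dataset. The new Dataset points to its parent Dataset, and these references ultimately form a directed acyclic graph (DAG).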
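The parent-pointer chain described above can be modeled in a few lines of plain Scala. This is a toy sketch, not Spark's Dataset class: the hypothetical `Node` type stands in for a Dataset, each transformation returns a new node that references its parent, and nothing is computed until `collect()` (standing in for an action) walks the chain back to the source. For simplicity it only builds a linear chain; a real DAG can also contain nodes with several parents, for example after a join.

```scala
object LineageSketch {
  // Hypothetical stand-in for a Dataset: it only records how to compute itself.
  sealed trait Node {
    // Transformations compute nothing; they wrap `this` as the new node's parent.
    def map(f: Int => Int): Node = MapNode(this, f)
    def filter(p: Int => Boolean): Node = FilterNode(this, p)

    // Stand-in for an action: walks the lineage back to the source and evaluates it.
    def collect(): Seq[Int] = this match {
      case Source(data)          => data
      case MapNode(parent, f)    => parent.collect().map(f)
      case FilterNode(parent, p) => parent.collect().filter(p)
    }

    // Lists the chain of nodes from this one back to the source.
    def lineage: List[String] = this match {
      case Source(_)             => List("Source")
      case MapNode(parent, _)    => "Map" :: parent.lineage
      case FilterNode(parent, _) => "Filter" :: parent.lineage
    }
  }
  final case class Source(data: Seq[Int]) extends Node
  final case class MapNode(parent: Node, f: Int => Int) extends Node
  final case class FilterNode(parent: Node, p: Int => Boolean) extends Node
}
```

Building `Source(Seq(1, 2, 3, 4)).map(_ * 10).filter(_ > 15)` only creates three linked nodes; the data is touched for the first time when `collect()` is called.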
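Physical Execution Plan
An action triggers the translation of the logical DAG into a physical execution plan. The Spark Catalyst query optimizer creates the physical plan for DataFrames, as shown in the figure below: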
The physical plan identifies the resources that will carry out the plan, such as in-memory partitions and compute tasks.
Viewing the Logical and Physical Execution Plans
You can call the explain(true) method to view both the logical and the physical plans, as in the following example:
The physical plan consists of FileScan, Filter, Project, HashAggregate, Exchange, and HashAggregate.
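The Exchange is the shuffle caused by the groupBy transformation. Before the shuffle, Spark performs a partial hash aggregation on the data in each partition; after the shuffle, it performs a second hash aggregation that merges those partial aggregates.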
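The two-phase aggregation around the Exchange can be illustrated with plain Scala collections. This is a simplified model, not Spark's HashAggregate operator: each inner `Seq` plays the role of one input partition, `sumPerKey` performs the per-partition hash aggregation, and the hypothetical `exchange` function stands in for the shuffle by routing each key to a partition chosen from its hash.

```scala
object TwoPhaseAggSketch {
  type Row = (String, Long)

  // Sums values per key within one partition. Used both for the partial
  // aggregation before the Exchange and for the final merge after it.
  def sumPerKey(partition: Seq[Row]): Seq[Row] =
    partition.groupBy(_._1).map { case (k, rows) => (k, rows.map(_._2).sum) }.toSeq

  // Stand-in for the Exchange: sends every row to the partition chosen by its
  // key's hash, so all rows with the same key land in the same partition.
  def exchange(partitions: Seq[Seq[Row]], numPartitions: Int): Seq[Seq[Row]] = {
    val rows = partitions.flatten
    (0 until numPartitions).map { p =>
      rows.filter { case (k, _) => Math.floorMod(k.hashCode, numPartitions) == p }
    }
  }

  // Partial aggregation per input partition, shuffle, then final aggregation.
  def sumByKey(input: Seq[Seq[Row]], numPartitions: Int): Map[String, Long] = {
    val partial = input.map(sumPerKey)
    exchange(partial, numPartitions).flatMap(sumPerKey).toMap
  }
}
```

The point of the partial step is that fewer rows cross the shuffle: in the input `Seq(Seq(("a", 1L), ("a", 2L), ("b", 5L)), Seq(("a", 4L), ("b", 1L)))`, five rows shrink to four partial sums before the exchange, and the final result is `a -> 7, b -> 6`.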
Executing Tasks on the Cluster
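In the third phase, the tasks are scheduled and executed on the cluster. The scheduler divides the DAG into stages according to the transformations: narrow transformations (transformations that involve no data movement) are grouped into a single stage, while a wide transformation, which requires a shuffle, starts a new stage.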
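Here is a summary of the components of execution:
- Task: a unit of execution that runs on a single machine.
- Stage: a group of tasks, one per partition, that perform the same computation in parallel.
- Job: made up of one or more stages.
- Pipelining: collapsing Datasets into a single stage when their transformations involve no data movement.
- DAG: the logical graph of the operations on Datasets.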
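How a lineage is cut into stages can be sketched as follows. This is a simplified model of the scheduler's behavior for a linear chain only; the `Op` type and its `wide` flag are hypothetical. Consecutive narrow operations are pipelined into the current stage, and each wide operation, which consumes shuffled data, begins a new stage.

```scala
object StageSketch {
  // A hypothetical operation in a linear lineage; `wide` means it needs a shuffle.
  final case class Op(name: String, wide: Boolean)

  // Splits a linear chain into stages: narrow ops are appended (pipelined) to
  // the current stage, and a wide op opens a new stage after the shuffle boundary.
  def stages(ops: Seq[Op]): Seq[Seq[String]] =
    ops.foldLeft(Vector(Vector.empty[String])) { (acc, op) =>
      if (op.wide && acc.last.nonEmpty) acc :+ Vector(op.name)
      else acc.init :+ (acc.last :+ op.name)
    }
}
```

For the chain FileScan, Filter, Project, groupBy, count, this yields two stages: `[FileScan, Filter, Project]` and `[groupBy, count]`. Within each stage, every partition is processed by one task.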
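The number of tasks depends on the number of partitions: in the first stage, which reads the file, there are 2 partitions; after the shuffle, there are 200 partitions (200 is the default value of spark.sql.shuffle.partitions). You can check how many partitions a Dataset has with rdd.partitions.size.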
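The post-shuffle partition count is a configured number, not something derived from the input. The toy hash partitioner below makes that concrete. It follows the same idea as Spark's RDD HashPartitioner, which takes a non-negative modulo of the key's hashCode. Spark SQL's Exchange uses a Murmur3-based hash instead, so the exact key-to-partition assignment will differ. The `partitionFor` and `shuffle` names are hypothetical.

```scala
object PartitionSketch {
  // Non-negative modulo of the key's hashCode (null keys go to partition 0).
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else Math.floorMod(key.hashCode, numPartitions)

  // Buckets keys into exactly `numPartitions` partitions; many may stay empty.
  def shuffle[K](keys: Seq[K], numPartitions: Int): IndexedSeq[Seq[K]] =
    (0 until numPartitions).map(p => keys.filter(k => partitionFor(k, numPartitions) == p))
}
```

Shuffling only three keys into 200 partitions still produces 200 partitions, and therefore 200 tasks, most of which process no data. This is why small jobs often lower spark.sql.shuffle.partitions.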
Reposted from:
"How Are Spark Tasks Executed?", www.jianshu.com
1. Overall Execution Flow
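The following code is used to analyze the Spark SQL execution flow, showing the different states a LogicalPlan passes through and the overall Spark SQL execution process. It uses the Spark 1.x API (SQLContext, registerTempTable).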
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
(1) View the schema of teenagers
scala> teenagers.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
(2) View the execution flow
scala> teenagers.queryExecution
res3: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None
== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]
Code Generation: true
QueryExecution represents the entire Spark SQL execution flow. As the output above shows, a SQL statement goes through the following steps before it is executed:
== (1) Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None
== (2) Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
== (3) Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
== (4) Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]
// dynamic bytecode generation (code generation, CG) is enabled to improve query performance
Code Generation: true
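Going from the Parsed to the Analyzed plan is essentially name resolution. Unresolved identifiers such as 'age are looked up in the relation's schema and bound to attributes with unique expression IDs such as age#1. The sketch below is a hypothetical miniature of that idea, not Catalyst's Analyzer; the `Unresolved` and `Attr` types and the `resolve` function are invented for illustration.

```scala
object AnalyzerSketch {
  sealed trait Expr
  // An identifier the parser has not bound yet (printed as 'age in Parsed plans).
  final case class Unresolved(name: String) extends Expr
  // A bound attribute with a unique expression ID (printed as age#1).
  final case class Attr(name: String, id: Int) extends Expr {
    override def toString: String = s"$name#$id"
  }

  // Binds every unresolved name to the attribute with the same name in the
  // relation's output; an unknown column makes analysis fail.
  def resolve(exprs: Seq[Expr], output: Seq[Attr]): Seq[Attr] =
    exprs.map {
      case a: Attr => a
      case Unresolved(n) =>
        output.find(_.name == n).getOrElse(
          throw new IllegalArgumentException(s"cannot resolve '$n'"))
    }
}
```

With the people relation's output `Seq(Attr("name", 0), Attr("age", 1))`, resolving `'name` and `'age` gives `name#0` and `age#1`, as in the Analyzed plan above. Resolving a column that does not exist throws an exception.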
2. Execution Flow of a Full-Table Query
Statement:
val all= sqlContext.sql("SELECT * FROM people")
Execution flow:
scala> all.queryExecution
res9: org.apache.spark.sql.SQLContext#QueryExecution =
// note that * is parsed as unresolvedalias(*)
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'UnresolvedRelation [people], None
== Analyzed Logical Plan ==
// unresolvedalias(*) is analyzed into all the fields of the schema
// UnresolvedRelation [people] is analyzed into Subquery people
name: string, age: int
Project [name#0,age#1]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
== Optimized Logical Plan ==
LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
== Physical Plan ==
Scan PhysicalRDD[name#0,age#1]
Code Generation: true
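Two steps are visible in this output. First, the analyzer expands unresolvedalias(*) into every column of the relation. Second, because the resulting Project then outputs exactly its child's columns, the optimizer drops it and only the LogicalRDD remains. Below is a hypothetical miniature of both steps; the plan types and function names are invented for this sketch and are not Catalyst classes.

```scala
object StarSketch {
  sealed trait Plan { def output: Seq[String] }
  final case class Relation(output: Seq[String]) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan {
    def output: Seq[String] = columns
  }

  // Analyzer step: replace "*" with all of the (already analyzed) child's columns.
  def expandStar(plan: Plan): Plan = plan match {
    case Project(cols, child) =>
      val analyzedChild = expandStar(child)
      Project(cols.flatMap(c => if (c == "*") analyzedChild.output else Seq(c)), analyzedChild)
    case r: Relation => r
  }

  // Optimizer step: a Project that outputs exactly its child's columns is a no-op.
  def removeRedundantProject(plan: Plan): Plan = plan match {
    case Project(cols, child) =>
      val optimizedChild = removeRedundantProject(child)
      if (cols == optimizedChild.output) optimizedChild else Project(cols, optimizedChild)
    case r: Relation => r
  }
}
```

A Project that keeps only some columns, such as `SELECT name`, is not a no-op, so the second rule leaves it in place.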
3. Execution Flow of a Filter Query
Statement:
scala> val filterQuery= sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
Execution flow:
scala> filterQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None
== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
// compared with the full-table query, an extra Filter appears here (and in the plans below)
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20
== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]
Code Generation: true
4. Execution Flow of a Join Query
Statement:
val joinQuery= sqlContext.sql("SELECT * FROM people a, people b where a.age=b.age")
View the overall execution flow:
scala> joinQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
// note the Filter
// and the Join Inner
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age = 'b.age)
  'Join Inner, None
   'UnresolvedRelation [people], Some(a)
   'UnresolvedRelation [people], Some(b)
== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#2,age#3]
 Filter (age#1 = age#3)
  Join Inner, None
   Subquery a
    Subquery people
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
   Subquery b
    Subquery people
     LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
== Optimized Logical Plan ==
Project [name#0,age#1,name#2,age#3]
 Join Inner, Some((age#1 = age#3))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4]...
// view its physical plan
scala> joinQuery.queryExecution.sparkPlan
res16: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#2,age#3]
 SortMergeJoin [age#1], [age#3]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#2,age#3]
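Compare the analyzed and optimized plans. The Filter (age#1 = age#3) above a condition-less inner join has been folded into the join itself as Join Inner, Some((age#1 = age#3)). In Catalyst this is the job of predicate-pushdown rules such as PushPredicateThroughJoin. The sketch below is a hypothetical, heavily reduced version of just this rewrite, with invented plan types and conditions kept as strings. Unlike the real rule, it does not split a conjunction or push single-side predicates further down into one input.

```scala
object JoinPushdownSketch {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Join(left: Plan, right: Plan, condition: Option[String]) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan

  // Moves a Filter that sits directly on an inner join into the join condition,
  // AND-ing it with any condition the join already has.
  def pushFilterIntoJoin(plan: Plan): Plan = plan match {
    case Filter(cond, Join(l, r, existing)) =>
      val merged = existing.map(e => s"($e) && ($cond)").getOrElse(cond)
      Join(pushFilterIntoJoin(l), pushFilterIntoJoin(r), Some(merged))
    case Filter(cond, child) => Filter(cond, pushFilterIntoJoin(child))
    case Join(l, r, c)       => Join(pushFilterIntoJoin(l), pushFilterIntoJoin(r), c)
    case s: Scan             => s
  }
}
```

Once the predicate is part of the join condition, the planner can see that it is an equi-join on age, which is what lets it choose SortMergeJoin in the physical plan.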
The previous query is equivalent to the following one, although the two are processed slightly differently. Statement:
scala> val innerQuery= sqlContext.sql("SELECT * FROM people a inner join people b on a.age=b.age")
innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]
View the overall execution flow:
scala> innerQuery.queryExecution
res2: org.apache.spark.sql.SQLContext#QueryExecution =
// note the Join Inner
// and that there is no Filter here
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Join Inner, Some(('a.age = 'b.age))
  'UnresolvedRelation [people], Some(a)
  'UnresolvedRelation [people], Some(b)
== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  Subquery a
   Subquery people
    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
  Subquery b
   Subquery people
    LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
// note that, apart from dropping the Subquery nodes, the Optimized Logical Plan
// applies no special optimization to the Analyzed Logical Plan; this is pointed out
// for comparison with the Analyzed/Optimized difference in the subquery example below
== Optimized Logical Plan ==
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder ...
// view its physical plan
scala> innerQuery.queryExecution.sparkPlan
res14: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#6,age#7]
 SortMergeJoin [age#1], [age#7]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#6,age#7]
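Both forms of the query end up as a SortMergeJoin on age in the physical plan. The algorithm behind that operator can be sketched on plain Scala collections. Sort both inputs by the join key, advance two cursors, and emit the cross product of each run of equal keys. This in-memory sketch (the `sortMergeJoin` function is hypothetical) leaves out what Spark does around it. In particular, Spark first exchanges both sides by the join key so that matching keys meet in the same partition.

```scala
object SortMergeJoinSketch {
  // Inner equi-join of two sequences on an Int key, using sort-merge.
  def sortMergeJoin[A, B](left: Seq[A], right: Seq[B])(lk: A => Int, rk: B => Int): Seq[(A, B)] = {
    val l = left.sortBy(lk).toVector
    val r = right.sortBy(rk).toVector
    val out = Vector.newBuilder[(A, B)]
    var i = 0
    var j = 0
    while (i < l.length && j < r.length) {
      val kl = lk(l(i))
      val kr = rk(r(j))
      if (kl < kr) i += 1       // left key too small: advance left cursor
      else if (kl > kr) j += 1  // right key too small: advance right cursor
      else {
        // Find the run of equal keys on each side, then emit their cross product.
        val iEnd = l.indexWhere(a => lk(a) != kl, i) match { case -1 => l.length; case n => n }
        val jEnd = r.indexWhere(b => rk(b) != kr, j) match { case -1 => r.length; case n => n }
        for (x <- i until iEnd; y <- j until jEnd) out += ((l(x), r(y)))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }
}
```

Once both sides are sorted, each is scanned only once, apart from the cross products of duplicate keys. That is why this strategy suits large equi-joins.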
5. Execution Flow of a Subquery
Statement:
scala> val subQuery=sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19")
subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
View the overall execution flow:
scala> subQuery.queryExecution
res4: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None
== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
// note the difference between the Optimized and Analyzed plans here:
// the two Filters have been merged into a single Filter
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]
Code Generation: true
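The rewrite from the analyzed plan to the optimized plan combines several simplifications. The Subquery wrappers disappear, the no-op Projects are removed, and the two adjacent Filters are merged into one conjunction with the inner condition first (Catalyst has a CombineFilters rule for the last step). Below is a hypothetical single-pass miniature over invented plan types, with conditions kept as strings for readability. It is not Catalyst's Optimizer, which runs batches of rules to a fixed point.

```scala
object FilterMergeSketch {
  sealed trait Plan { def output: Seq[String] }
  final case class Relation(output: Seq[String]) extends Plan
  final case class Subquery(alias: String, child: Plan) extends Plan { def output: Seq[String] = child.output }
  final case class Project(columns: Seq[String], child: Plan) extends Plan { def output: Seq[String] = columns }
  final case class Filter(condition: String, child: Plan) extends Plan { def output: Seq[String] = child.output }

  // Bottom-up pass: drop Subquery aliases and no-op Projects, then merge a Filter
  // sitting directly on another Filter into one conjunction (inner condition first).
  def optimize(plan: Plan): Plan = plan match {
    case Subquery(_, child) => optimize(child)
    case Project(cols, child) =>
      val c = optimize(child)
      if (cols == c.output) c else Project(cols, c)
    case Filter(cond, child) =>
      optimize(child) match {
        case Filter(inner, grandChild) => Filter(s"($inner) && ($cond)", grandChild)
        case c                         => Filter(cond, c)
      }
    case r: Relation => r
  }
}
```

Feeding it the Analyzed Logical Plan above, rebuilt from these types, yields `Filter("(age#1 >= 13) && (age#1 <= 19)", Relation(...))`, which has the same shape as the Optimized Logical Plan.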
6. Execution Flow of an Aggregation Query
Statement:
scala> val aggregateQuery=sqlContext.sql("SELECT a.name,sum(a.age) FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19 group by a.name")
aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]
View the execution flow:
scala> aggregateQuery.queryExecution
res6: org.apache.spark.sql.SQLContext#QueryExecution =
// note 'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
// i.e. group by a.name is parsed into the grouping list ['a.name], while the select list
// a.name, sum(a.age) becomes [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
== Parsed Logical Plan ==
'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None
== Analyzed Logical Plan ==
name: string, _c1: bigint
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
== Optimized Logical Plan ==
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitions...
// view its physical plan
scala> aggregateQuery.queryExecution.sparkPlan
res10: org.apache.spark.sql.execution.SparkPlan =
TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L])
 TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])
  Filter ((age#1 >= 13) && (age#1 <= 19))
   Scan PhysicalRDD[name#0,age#1]
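Notice that sum(a.age) is analyzed as sum(cast(age#1 as bigint)), and the result column _c1 has type bigint even though age is an int. Spark's sum over integral columns produces a 64-bit result. The Partial and Final TungstenAggregate nodes are the same two-phase, before-and-after-shuffle scheme described earlier for HashAggregate. The plain Scala snippet below only illustrates why the widening matters; `sumAsInt` and `sumAsLong` are hypothetical helpers, not Spark functions.

```scala
object SumWideningSketch {
  // Summing in 32-bit arithmetic silently wraps around on overflow.
  def sumAsInt(xs: Seq[Int]): Int = xs.foldLeft(0)(_ + _)

  // Widening each value to 64 bits first, like cast(age as bigint), avoids the wrap-around.
  def sumAsLong(xs: Seq[Int]): Long = xs.foldLeft(0L)(_ + _.toLong)
}
```

For `Seq(Int.MaxValue, 1)`, `sumAsInt` wraps to `Int.MinValue`, while `sumAsLong` returns 2147483648.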
You can use the same method to examine the execution flow of other SQL statements and grasp the basic ideas behind the Spark SQL implementation.
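Reposted from:
"The Way of Spark (Advanced): From Beginner to Expert, Section 9: Analysis of the Spark SQL Execution Flow", Alibaba Cloud Developer Community, developer.aliyun.com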