
Spark Execution Model

The Spark execution model can be divided into three parts: creating the logical plan, translating it into a physical plan, and executing tasks on the cluster.

While an application is running, information about its Spark jobs can be viewed at http://<driver-node>:4040. For completed Spark applications, information is available from the History Server at http://<server-url>:18080.
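Note that the History Server at port 18080 only shows applications that wrote event logs. A minimal sketch of the configuration involved (the log directory is a placeholder):

# conf/spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-logs
spark.history.fs.logDirectory    hdfs:///spark-logs

# then start the server, which listens on port 18080 by default
sbin/start-history-server.sh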

Let's walk through these three stages.

Logical Execution Plan

In the first stage, the logical execution plan is created. This plan shows which steps will be executed. Recall that when a transformation is applied to a Dataset, a new Dataset is created. The new Dataset points back to its parent Dataset, which ultimately forms a directed acyclic graph (DAG).
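As a minimal sketch (run in spark-shell, where spark is predefined; the input path and column names are assumptions), each transformation below returns a new Dataset that references its parent, extending the DAG, and nothing runs until the final action:

// each transformation creates a new Dataset pointing to its parent
val ds       = spark.read.json("/tmp/events.json")   // hypothetical input path
val filtered = ds.filter("age >= 13")                // new Dataset, child of ds
val grouped  = filtered.groupBy("name").count()      // child of filtered
grouped.count()                                      // action: triggers a job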

Physical Execution Plan

An action triggers the translation of the logical DAG into a physical execution plan. The Spark Catalyst query optimizer creates the physical execution plan for DataFrames, as illustrated in the figure below:

[Figure: the Spark Catalyst optimizer translates the logical plan into a physical execution plan]

The physical execution plan identifies the resources that will execute the plan, such as memory partitions and compute tasks.

Viewing the Logical and Physical Execution Plans

You can call the explain(true) method to view the logical and physical execution plans, as in the following example:

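A minimal sketch of a query whose physical plan has this shape (run in spark-shell; the Parquet file and its name/age columns are assumptions):

import spark.implicits._

spark.read.parquet("/tmp/people.parquet")   // FileScan
  .filter($"age" >= 13)                     // Filter
  .select($"name", $"age")                  // Project
  .groupBy($"name")                         // HashAggregate (partial)
  .count()                                  // Exchange + HashAggregate (final)
  .explain(true)                            // prints logical and physical plans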

The physical plan consists of a FileScan, a Filter, a Project, a HashAggregate, an Exchange, and another HashAggregate.

Exchange

The Exchange is the shuffle caused by the groupBy transformation. Spark performs a hash aggregation on each partition before the shuffle; after the shuffle, it performs another hash aggregation over those partial aggregations.


Executing Tasks on the Cluster

In the third stage, tasks are scheduled and executed on the cluster. The scheduler splits the DAG into stages based on the transformations: narrow transformations (those that involve no data movement) are grouped into a single stage.
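At the RDD level, the stage boundaries chosen by the scheduler can be inspected with toDebugString, whose indentation marks shuffle boundaries. A minimal sketch (input path assumed):

val counts = sc.textFile("/tmp/words.txt")   // hypothetical input
  .flatMap(_.split(" "))                     // narrow: pipelined into the same stage
  .map(word => (word, 1))                    // narrow: still the same stage
  .reduceByKey(_ + _)                        // shuffle: begins a new stage
println(counts.toDebugString)                // lineage with stage indentation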


Here is a summary of the components of execution:

  • Task: a unit of execution that runs on a single machine.
  • Stage: a group of tasks, based on partitions of the input data, that perform the same computation in parallel.
  • Job: consists of one or more stages.
  • Pipelining: the collapsing of Datasets into a single stage when successive transformations require no data movement.
  • DAG: the logical graph of operations on Datasets.

The number of tasks depends on the number of partitions: in this example, reading the file in the first stage yields 2 partitions, and after the shuffle there are 200 partitions. You can check the number of partitions of a Dataset with rdd.partitions.size.
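The 200 after the shuffle comes from spark.sql.shuffle.partitions, which defaults to 200. A sketch of checking this (input path assumed):

val ds = spark.read.text("/tmp/people.txt")          // hypothetical input
println(ds.rdd.partitions.size)                      // e.g. 2, depends on input splits
val counts = ds.groupBy("value").count()             // groupBy triggers a shuffle
println(counts.rdd.partitions.size)                  // 200 unless the default is changed
spark.conf.set("spark.sql.shuffle.partitions", "8")  // e.g. lower it for small data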

Source: Spark任務如何執行? (www.jianshu.com)


1. Overall Execution Flow

The following code is used to walk through the Spark SQL pipeline, to show the several states a LogicalPlan passes through and to clarify the overall Spark SQL execution flow:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
           

(1) Inspect the schema of teenagers:

scala> teenagers.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
           

(2) Inspect the execution flow:

scala> teenagers.queryExecution
res3: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
           

QueryExecution represents the entire Spark SQL execution flow. From the output above, we can see that executing a SQL statement goes through the following steps:

== (1)Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== (2)Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (3)Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (4)Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

//啟動動态位元組碼生成技術(bytecode generation,CG),提升查詢效率
Code Generation: true
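
Each of these plans can also be retrieved individually through fields of QueryExecution (Spark 1.x API, still present in later versions):

scala> val qe = teenagers.queryExecution
scala> qe.logical        // parsed logical plan
scala> qe.analyzed       // analyzed logical plan
scala> qe.optimizedPlan  // optimized logical plan
scala> qe.sparkPlan      // physical plan
scala> qe.executedPlan   // physical plan after preparations; this is what runs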
           

2. Full Table Query Execution Flow

Statement to execute:

val all= sqlContext.sql("SELECT * FROM people")
           

Execution flow:

scala> all.queryExecution
res9: org.apache.spark.sql.SQLContext#QueryExecution =
//Note that * is parsed as unresolvedalias(*)
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
//unresolvedalias(*) is analyzed into all fields of the schema
//UnresolvedRelation [people] is analyzed into Subquery people
name: string, age: int
Project [name#0,age#1]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Scan PhysicalRDD[name#0,age#1]

Code Generation: true
           

3. Filter Query Execution Flow

Statement to execute:

scala> val filterQuery= sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
           

Execution flow:

scala> filterQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 //An extra Filter appears here (and likewise below)
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
           

4. Join Query Execution Flow

Statement to execute:

val joinQuery= sqlContext.sql("SELECT * FROM people a, people b where a.age=b.age")
           

View the overall execution flow:

scala> joinQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
//Note the Filter
//and the Join Inner
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age = 'b.age)
  'Join Inner, None
   'UnresolvedRelation [people], Some(a)
   'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#2,age#3]
 Filter (age#1 = age#3)
  Join Inner, None
   Subquery a
    Subquery people
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
   Subquery b
    Subquery people
     LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Project [name#0,age#1,name#2,age#3]
 Join Inner, Some((age#1 = age#3))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4]...

//View its physical plan
scala> joinQuery.queryExecution.sparkPlan
res16: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#2,age#3]
 SortMergeJoin [age#1], [age#3]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#2,age#3]
           

The previous example is equivalent to the one below; only the way the join is written differs slightly. Statement to execute:

scala> val innerQuery= sqlContext.sql("SELECT * FROM people a inner join people b on a.age=b.age")
innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]
           

View the overall execution flow:

scala> innerQuery.queryExecution
res2: org.apache.spark.sql.SQLContext#QueryExecution =
//Note the Join Inner
//and that there is no Filter this time
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Join Inner, Some(('a.age = 'b.age))
  'UnresolvedRelation [people], Some(a)
  'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  Subquery a
   Subquery people
    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
  Subquery b
   Subquery people
    LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

//Note that the Optimized Logical Plan receives no special optimization
//relative to the Analyzed Logical Plan; this is highlighted for comparison
//with the Analyzed-vs-Optimized difference in the subquery example below
== Optimized Logical Plan ==
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder ...

//View its physical plan
scala> innerQuery.queryExecution.sparkPlan
res14: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#6,age#7]
 SortMergeJoin [age#1], [age#7]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#6,age#7]
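
The same plan can also be produced through the DataFrame API instead of SQL; a minimal sketch (reusing people and the implicits import from the setup code above):

scala> val a = people.as("a")
scala> val b = people.as("b")
scala> val joined = a.join(b, $"a.age" === $"b.age")  // inner join by default
scala> joined.queryExecution.sparkPlan                // should show a SortMergeJoin as above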
           

5. Subquery Execution Flow

Statement to execute:

scala> val subQuery=sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19")
subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
           

View the overall execution flow:

scala> subQuery.queryExecution
res4: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

//Note the difference between Analyzed and Optimized here:
//the two Filters have been combined into a single Filter and pushed down
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
           

6. Aggregate SQL Execution Flow

Statement to execute:

scala> val aggregateQuery=sqlContext.sql("SELECT a.name,sum(a.age) FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19 group by a.name")
aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]
           

View the execution flow:

scala> aggregateQuery.queryExecution
res6: org.apache.spark.sql.SQLContext#QueryExecution =
//Note 'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]:
//group by a.name is parsed as unresolvedalias('a.name)
== Parsed Logical Plan ==
'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, _c1: bigint
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitions...

//View its physical plan: note the two TungstenAggregate operators below,
//mode=Partial before the shuffle and mode=Final after it, matching the
//hash-aggregation-around-Exchange pattern described earlier
scala> aggregateQuery.queryExecution.sparkPlan
res10: org.apache.spark.sql.execution.SparkPlan =
TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L])
 TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])
  Filter ((age#1 >= 13) && (age#1 <= 19))
   Scan PhysicalRDD[name#0,age#1]
           

You can use the same approach to inspect the execution flow of other SQL statements, and thereby grasp the basic ideas behind the Spark SQL implementation.

Source: Spark修煉之道(進階篇)——Spark入門到精通:第九節 Spark SQL運作流程解析 (developer.aliyun.com)
