Apache Spark是大資料處理領域最常用的計算引擎之一,被應用在各種各樣的場景中,除了易用的API,穩定高效的處理引擎,可擴充性也是Spark能夠得到廣泛應用的一個重要原因。Spark中最常見的擴充就是基于DataSource API添加對新資料源的支援,除了Spark内置的HDFS,S3,Kafka等資料源,Parquet,Orc,Avro等資料類型,還有很多第三方的DataSource Plugin使得Spark成為大資料領域可以處理資料源類型最豐富的計算引擎。當然,除了DataSource,Spark還有支援很多其他的擴充點,今天我們要介紹的是Spark SQL Catalyst的擴充點,以及如何通過這些擴充點實作一些有意思的功能,打造自定義的Spark SQL引擎。
在Spark2.2版本中,引入了新的擴充點,使得使用者可以在Spark session中自定義自己的parser,analyzer,optimizer以及physical planning stragegy rule。
【注意】本篇文章需要讀者對Scala語言,SQL引擎基本概念以及Spark Catalyst引擎有基本的了解。
Spark Catalyst擴充點
Spark catalyst的擴充點在SPARK-18127中被引入,Spark使用者可以在SQL處理的各個階段擴充自定義實作,非常強大高效,下面我們具體看看其提供的接口和在Spark中的實作。
SparkSessionExtensions
SparkSessionExtensions儲存了所有使用者自定義的擴充規則,自定義規則儲存在成員變量中,對于不同階段的自定義規則,SparkSessionExtensions提供了不同的接口。
新增自定義規則
使用者可以通過SparkSessionExtensions提供的inject開頭的方法添加新的自定義規則,具體的inject接口如下:
- injectOptimizerRule – 添加optimizer自定義規則,optimizer負責邏輯執行計劃的優化。
- injectParser – 添加parser自定義規則,parser負責SQL解析。
- injectPlannerStrategy – 添加planner strategy自定義規則,planner負責實體執行計劃的生成。
- injectResolutionRule – 添加Analyzer自定義規則到Resolution階段,analyzer負責邏輯執行計劃生成。
- injectPostHocResolutionRule – 添加Analyzer自定義規則到Post Resolution階段。
- injectCheckRule – 添加Analyzer自定義Check規則。
【注】Spark Catalyst的SQL處理分成parser,analyzer,optimizer以及planner等多個步驟,其中analyzer,optimizer等步驟内部也分為多個階段,以Analyzer為例,analyse規則切分到不同的batch中,每個batch的執行政策可能不盡相同,有的隻會執行一遍,有的會疊代執行直到滿足一定條件。具體每個步驟的每個階段的具體實作請參考Spark源碼,本文篇幅有限,不再詳述。
擷取自定義規則
SparkSessionExtensions對應每一種自定義規則也都有一個build開頭的方法用于擷取對應類型的自定義規則,Spark session在初始化的時候,通過這些方法擷取自定義規則并傳遞給parser,analyzer,optimizer以及planner等對象。
-
buildOptimizerRules
-
buildParser
-
buildPlannerStrategies
-
buildResolutionRules
-
buildPostHocResolutionRules
-
buildCheckRules
配置自定義規則
在Spark中,使用者自定義的規則可以通過兩種方式配置生效:
- 使用SparkSession.Builder中的withExtenstion方法,withExtension方法是一個高階函數,接收一個自定義函數作為參數,這個自定義函數以SparkSessionExtensions作為參數,使用者可以實作這個函數,通過SparkSessionExtensions的inject開頭的方法添加使用者自定義規則。
- 通過Spark配置參數,具體參數名為spark.sql.extensions。使用者可以将1中的自定義函數實作定義為一個類,将完整類名作為參數值。
具體的用法使用者可以參考org.apache.spark.sql.SparkSessionExtensionSuite測試用例中的Spark代碼。
擴充Spark Catalyst實作SQL檢查
有許多使用者使用Spark SQL建構資料分析查詢平台,會有許多的業務使用者使用這個平台進行資料分析處理,由于業務使用者SQL開發能力參差不齊,作為平台方很難限制使用者,一個不合理的SQL查詢不僅可能導緻容易出錯,很難維護,可能還會直接搞垮整個平台。通過Spark Catalyst擴充點,平台方可以解析所有使用者的SQL,制定具體的SQL使用規範,對于不合理的SQL請求直接拒絕。本文我們以一個非常簡單的規範為例,展示如何通過自定義規則檢查SQL。SELECT 是一個非常常見的SQL查詢方式,用于擷取表的所有列資料,但是這種SQL的可維護性相對來說會比較差,表可能增加新列或者删除已有列,甚至列的展示順序也可能發生變化,這些都會影響SQL執行的結果以及依賴此查詢的後續查詢,一個規範嚴格的資料平台SQL規範可能不允許這種情況發生。作為本文的示例,我們可以添加一個簡單的檢查,發現SELECT 請求就報錯,不允許執行。
建立一個自定義Parser
通過內建ParserInterface,實作自定義Parser。
class StrictParser(parser: ParserInterface) extends ParserInterface {
/**
* Parse a string to a [[LogicalPlan]].
*/
override def parsePlan(sqlText: String): LogicalPlan = {
val logicalPlan = parser.parsePlan(sqlText)
logicalPlan transform {
case project @ Project(projectList, _) =>
projectList.foreach {
name =>
if (name.isInstanceOf[UnresolvedStar]) {
throw new RuntimeException("You must specify your project column set," +
" * is not allowed.")
}
}
project
}
logicalPlan
}
/**
* Parse a string to an [[Expression]].
*/
override def parseExpression(sqlText: String): Expression = parser.parseExpression(sqlText)
/**
* Parse a string to a [[TableIdentifier]].
*/
override def parseTableIdentifier(sqlText: String): TableIdentifier =
parser.parseTableIdentifier(sqlText)
/**
* Parse a string to a [[FunctionIdentifier]].
*/
override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
parser.parseFunctionIdentifier(sqlText)
/**
* Parse a string to a [[StructType]]. The passed SQL string should be a comma separated
* list of field definitions which will preserve the correct Hive metadata.
*/
override def parseTableSchema(sqlText: String): StructType =
parser.parseTableSchema(sqlText)
/**
* Parse a string to a [[DataType]].
*/
override def parseDataType(sqlText: String): DataType = parser.parseDataType(sqlText)
}
建立擴充點函數
type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
type ExtensionsBuilder = SparkSessionExtensions => Unit
val parserBuilder: ParserBuilder = (_, parser) => new StrictParser(parser)
val extBuilder: ExtensionsBuilder = { e => e.injectParser(parserBuilder)}
這裡面有兩個函數,extBuilder函數用于SparkSession建構,SparkSessionExtensions.injectParser函數本身也是一個高階函數,接收parserBuilder作為參數,将原生parser作為參數傳遞給自定義的StrictParser,并将StrictParser作為自定義parser插入SparkSessionExtensions中。
在SparkSession中啟用自定義Parser
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local[2]")
.withExtensions(extBuilder)
.getOrCreate()
測試代碼
測試代碼很簡單,我們首先建構一個簡單的測試集,驗證包含SELECT *的SQL語句是否按照預想抛出異常即可,測試代碼如下:
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local[2]")
.withExtensions(extBuilder)
.getOrCreate()
val df = spark.read.json("examples/src/main/resources/people.json")
df.toDF().write.saveAsTable("person")
spark.sql("select * from person limit 3").show
spark.stop()
執行的結果如下:

擴充Spark Catalyst優化SQL執行
本次測試用例優化規則參考
http://blog.madhukaraphatak.com/introduction-to-spark-two-part-6/,由于Spark版本依賴不同,具體實作代碼略有不同,具體的測試代碼如下:
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local[2]")
.getOrCreate()
val df = spark.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", ";")
.load("examples/src/main/resources/people.csv")
df.toDF.write.saveAsTable("person")
// scalastyle:off
println(spark.sql("select age * 1 from person").queryExecution.optimizedPlan
.numberedTreeString)
// scalastyle:on
spark.stop
在示例代碼中,我們加載了一個csv檔案,然後用age字段乘1,我們可以通過queryExecution.optimizedPlan擷取優化後的執行計劃。LogicalPlan以樹狀結構組織,通過numberedTreeString我們可以得到優化後執行計劃的字元串資訊。
00 Project [(cast(age#26 as double) * 1.0) AS (CAST(age AS DOUBLE) * CAST(1 AS DOUBLE))#28]
01 +- Relation[name#25,age#26,job#27] parquet
執行計劃包括兩個部分:
01 Relation - 标示我們通過csv檔案建立的表。
00 Project - 标示Project投影操作。
由于age字段預設是string類型,可以看到spark預設會cast成double類型,然後乘以1.0.
建立自定義Optimizer規則
從之前的執行計劃可以看到,每個age字段都會乘以1.0,但是實際上我們知道隻并不必要,任何值乘以1都等于本身,我們可以據此添加自定義Optimizer規則省略掉乘以1.0的計算。
case class MultiplyOptimizationRule(spark: SparkSession) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case mul @ Multiply(left, right) if right.isInstanceOf[Literal] &&
right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>
left
}
}
在SparkSession中啟用自定義Optimizer規則
type ExtensionsBuilder = SparkSessionExtensions => Unit
val extBuilder: ExtensionsBuilder = { e => e.injectOptimizerRule(MultiplyOptimizationRule)}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local[2]")
.withExtensions(extBuilder)
.getOrCreate()
測試用例
使用同樣的測試用例,啟用自定義Optimizer規則,具體代碼如下:
type ExtensionsBuilder = SparkSessionExtensions => Unit
val extBuilder: ExtensionsBuilder = { e => e.injectOptimizerRule(MultiplyOptimizationRule)}
// $example on:init_session$
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local[2]")
.withExtensions(extBuilder)
.getOrCreate()
val df = spark.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", ";")
.load("examples/src/main/resources/people.csv")
df.toDF.write.saveAsTable("person")
// scalastyle:off
println(spark.sql("select age * 1 from person").queryExecution.optimizedPlan
.numberedTreeString)
// scalastyle:on
spark.stop
添加自定義優化規則後,新的執行計劃如下:
00 Project [cast(age#26 as double) AS (CAST(age AS DOUBLE) * CAST(1 AS DOUBLE))#28]
01 +- Relation[name#25,age#26,job#27] parquet
通過對比之前的執行計劃,我們可以看到新的執行計劃沒有了乘以1.0的步驟。
總結
在Spark2.2版本中,引入了新的擴充點,使得使用者可以在Spark session中自定義自己的parser,analyzer,optimizer以及physical planning stragegy rule。通過兩個簡單的示例,我們展示了如何通過Spark提供的擴充點實作SQL檢查以及定制化的執行計劃優化。Spark Catalyst高度的可擴充性使得我們可以非常友善的定制适合自己實際使用場景的SQL引擎,拓展了更多的可能性。我們可以實作特定的SQL方言,針對特殊的資料源做更深入的優化,進行SQL規範檢查,針對特定執行環境制定特定的優化政策等等,本文抛磚引玉,希望能對讀者有所裨益。
歡迎加群指正交流