作者:
王道遠,花名健身,阿裡雲EMR技術專家,Apache Spark活躍貢獻者,主要關注大資料計算優化相關工作。
Hive以及Spark SQL等大資料計算引擎為我們操作存儲在HDFS上結構化資料提供了易于上手的SQL接口,大大降低了ETL等操作的門檻,也是以在實際生産中有着廣泛的應用。SQL是非過程化語言,我們寫SQL的時候并不能控制具體的執行過程,它們依賴執行引擎決定。而Hive和Spark SQL作為Map-Reduce模型的分布式執行引擎,其執行過程首先就涉及到如何将輸入資料切分成一個個任務,配置設定給不同的Map任務。在本文中,我們就來講解Hive和Spark SQL是如何切分輸入路徑的。
Hive
Hive是起步較早的SQL on Hadoop項目,最早也是誕生于Hadoop中,是以輸入劃分這部分的代碼與Hadoop相關度非常高。現在Hive普遍使用的輸入格式是
CombineHiveInputFormat
,它繼承于
HiveInputFormat
,而
HiveInputFormat
實作了Hadoop的
InputFormat
接口,其中的
getSplits
方法用來擷取具體的劃分結果,劃分出的一份輸入資料被稱為一個“Split”。在執行時,每個Split對應到一個map任務。在劃分Split時,首先挑出不能合并到一起的目錄——比如開啟了事務功能的路徑。這些不能合并的目錄必須單獨處理,剩下的路徑交給私有方法
getCombineSplits
,這樣Hive的一個map task最多可以處理多個目錄下的檔案。在實際操作中,我們一般隻要通過
set mapred.max.split.size=xx;
即可控制檔案合并的大小。當一個檔案過大時,父類的
getSplits
也會幫我們完成相應的切分工作。
Spark SQL
Spark的表有兩種:DataSource表和Hive表。另外Spark後續版本中DataSource V2也将逐漸流行,目前還在不斷發展中,暫時就不在這裡讨論。我們知道Spark SQL其實底層是Spark RDD,而RDD執行時,每個map task會處理RDD的一個Partition中的資料(注意這裡的Partition是RDD的概念,要和表的Partition進行區分)。是以,Spark SQL作業的任務切分關鍵在于底層RDD的partition如何切分。
Data Source表
Spark SQL的DataSource表在最終執行的RDD類為
FileScanRDD
,由
FileSourceScanExec
建立出來。在建立這種RDD的時候,具體的Partition直接作為參數傳給了構造函數,是以劃分輸入的方法也在
DataSourceScanExec.scala
檔案中。具體分兩步:首先把檔案劃分為
PartitionFile
,再将較小的
PartitionFile
進行合并。
第一步部分代碼如下:
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(blockLocations, offset, size)
PartitionedFile(
partition.values, file.getPath.toUri.toString,
offset, size, partitionDeleteDeltas, hosts)
}
} else {
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
Seq(PartitionedFile(partition.values, file.getPath.toUri.toString,
0, file.getLen, partitionDeleteDeltas, hosts))
}
我們可以看出,Spark SQL首先根據檔案類型判斷單個檔案是否能夠切割,如果可以則按
maxSplitBytes
進行切割。如果一個檔案剩餘部分無法填滿
maxSplitBytes
,也單獨作為一個Partition。
第二部分代碼如下所示:
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
}
這樣我們就可以依次周遊第一步切好的塊,再按照
maxSplitBytes
進行合并。注意合并檔案時還需加上打開檔案的預估代價
openCostInBytes
。那麼
maxSplitBytes
和
openCostInBytes
這兩個關鍵參數怎麼來的呢?
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
不難看出,主要是
spark.sql.files.maxPartitionBytes
、
spark.sql.files.openCostInBytes
、排程器預設并發度以及所有輸入檔案實際大小所控制。
Hive表
Spark SQL中的Hive表底層的RDD類為
HadoopRDD
HadoopTableReader
類實作。不過這次,具體的Partition劃分還是依賴
HadoopRDD
的
getPartitions
方法,具體實作如下:
override def getPartitions: Array[Partition] = {
...
try {
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
} catch {
...
}
}
不難看出,在處理Hive表的時候,Spark SQL把任務劃分又交給了Hadoop的InputFormat那一套。不過需要注意的是,并不是所有Hive表都歸為這一類,Spark SQL會預設對ORC和Parquet的表進行轉化,用自己的Data Source實作
OrcFileFormat
ParquetFileFormat
來把這兩種表作為Data Source表來處理。
總結
切分輸入路徑隻是大資料處理的第一步,雖然不起眼,但是也絕對不可或缺。低效的檔案劃分可能會給端到端的執行速度帶來巨大的負面影響,更有可能影響到輸出作業的檔案布局,進而影響到整個資料流水線上所有作業的執行效率。萬事開頭難,為程式輸入選擇合适的配置參數,可以有效改善程式執行效率。
留個思考題給讀者們:如何設定參數徹底關閉Spark SQL data source表的檔案合并?
加入釘群或微信群,微信聯系小編回複你的方案,即可收到社群禮物。
阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。