天天看點

淺析Hive/Spark SQL讀檔案時的輸入任務劃分

作者:

王道遠,花名健身,阿裡雲EMR技術專家,Apache Spark活躍貢獻者,主要關注大資料計算優化相關工作。

Hive以及Spark SQL等大資料計算引擎為我們操作存儲在HDFS上結構化資料提供了易于上手的SQL接口,大大降低了ETL等操作的門檻,也是以在實際生産中有着廣泛的應用。SQL是非過程化語言,我們寫SQL的時候并不能控制具體的執行過程,它們依賴執行引擎決定。而Hive和Spark SQL作為Map-Reduce模型的分布式執行引擎,其執行過程首先就涉及到如何将輸入資料切分成一個個任務,配置設定給不同的Map任務。在本文中,我們就來講解Hive和Spark SQL是如何切分輸入路徑的。

Hive

Hive是起步較早的SQL on Hadoop項目,最早也是誕生于Hadoop中,是以輸入劃分這部分的代碼與Hadoop相關度非常高。現在Hive普遍使用的輸入格式是

CombineHiveInputFormat

,它繼承于

HiveInputFormat

,而

HiveInputFormat

實作了Hadoop的

InputFormat

接口,其中的

getSplits

方法用來擷取具體的劃分結果,劃分出的一份輸入資料被稱為一個“Split”。在執行時,每個Split對應到一個map任務。在劃分Split時,首先挑出不能合并到一起的目錄——比如開啟了事務功能的路徑。這些不能合并的目錄必須單獨處理,剩下的路徑交給私有方法

getCombineSplits

,這樣Hive的一個map task最多可以處理多個目錄下的檔案。在實際操作中,我們一般隻要通過

set mapred.max.split.size=xx;

即可控制檔案合并的大小。當一個檔案過大時,父類的

getSplits

也會幫我們完成相應的切分工作。

Spark SQL

Spark的表有兩種:DataSource表和Hive表。另外Spark後續版本中DataSource V2也将逐漸流行,目前還在不斷發展中,暫時就不在這裡讨論。我們知道Spark SQL其實底層是Spark RDD,而RDD執行時,每個map task會處理RDD的一個Partition中的資料(注意這裡的Partition是RDD的概念,要和表的Partition進行區分)。是以,Spark SQL作業的任務切分關鍵在于底層RDD的partition如何切分。

Data Source表

Spark SQL的DataSource表在最終執行的RDD類為

FileScanRDD

,由

FileSourceScanExec

建立出來。在建立這種RDD的時候,具體的Partition直接作為參數傳給了構造函數,是以劃分輸入的方法也在

DataSourceScanExec.scala

檔案中。具體分兩步:首先把檔案劃分為

PartitionFile

,再将較小的

PartitionFile

進行合并。

第一步部分代碼如下:

if (fsRelation.fileFormat.isSplitable(
    fsRelation.sparkSession, fsRelation.options, file.getPath)) {
    (0L until file.getLen by maxSplitBytes).map { offset =>
    val remaining = file.getLen - offset
    val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
    val hosts = getBlockHosts(blockLocations, offset, size)
    PartitionedFile(
      partition.values, file.getPath.toUri.toString,
      offset, size, partitionDeleteDeltas, hosts)
    }
  } else {
    val hosts = getBlockHosts(blockLocations, 0, file.getLen)
    Seq(PartitionedFile(partition.values, file.getPath.toUri.toString,
    0, file.getLen, partitionDeleteDeltas, hosts))
  }           

我們可以看出,Spark SQL首先根據檔案類型判斷單個檔案是否能夠切割,如果可以則按

maxSplitBytes

進行切割。如果一個檔案剩餘部分無法填滿

maxSplitBytes

,也單獨作為一個Partition。

第二部分代碼如下所示:

splitFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }           

這樣我們就可以依次周遊第一步切好的塊,再按照

maxSplitBytes

進行合并。注意合并檔案時還需加上打開檔案的預估代價

openCostInBytes

。那麼

maxSplitBytes

openCostInBytes

這兩個關鍵參數怎麼來的呢?

val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / defaultParallelism

  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))           

不難看出,主要是

spark.sql.files.maxPartitionBytes

spark.sql.files.openCostInBytes

、排程器預設并發度以及所有輸入檔案實際大小所控制。

Hive表

Spark SQL中的Hive表底層的RDD類為

HadoopRDD

HadoopTableReader

類實作。不過這次,具體的Partition劃分還是依賴

HadoopRDD

getPartitions

方法,具體實作如下:

override def getPartitions: Array[Partition] = {
    ...
    try {
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      ...
    }
  }           

不難看出,在處理Hive表的時候,Spark SQL把任務劃分又交給了Hadoop的InputFormat那一套。不過需要注意的是,并不是所有Hive表都歸為這一類,Spark SQL會預設對ORC和Parquet的表進行轉化,用自己的Data Source實作

OrcFileFormat

ParquetFileFormat

來把這兩種表作為Data Source表來處理。

總結

切分輸入路徑隻是大資料處理的第一步,雖然不起眼,但是也絕對不可或缺。低效的檔案劃分可能會給端到端的執行速度帶來巨大的負面影響,更有可能影響到輸出作業的檔案布局,進而影響到整個資料流水線上所有作業的執行效率。萬事開頭難,為程式輸入選擇合适的配置參數,可以有效改善程式執行效率。

留個思考題給讀者們:如何設定參數徹底關閉Spark SQL data source表的檔案合并?

加入釘群或微信群,微信聯系小編回複你的方案,即可收到社群禮物。

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

淺析Hive/Spark SQL讀檔案時的輸入任務劃分

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。

淺析Hive/Spark SQL讀檔案時的輸入任務劃分