天天看點

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

摘要:使用Spark SQL進行ETL任務,在讀取某張表的時候報錯:“IOException: totalValueCount == 0”,但該表在寫入時,并沒有什麼異常。

本文分享自華為雲社群《你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅》,原文作者:wzhfy 。

1. 問題描述

使用Spark SQL進行ETL任務,在讀取某張表的時候報錯:“IOException: totalValueCount == 0”,但該表在寫入時,并沒有什麼異常。

2. 初步分析

該表的結果是由兩表join後生成。經分析,join的結果産生了資料傾斜,且傾斜key為null。Join後每個task寫一個檔案,是以partition key為null的那個task将大量的null值寫入了一個檔案,null值個數達到22億。

22億這個數字比較敏感,正好超過int最大值2147483647(21億多)。是以,初步懷疑parquet在寫入超過int.max個value時有問題。

【注】本文隻關注大量null值寫入同一個檔案導緻讀取時報錯的問題。至于該列資料産生如此大量的null是否合理,不在本文讨論範圍之内。

3. Deep dive into Parquet (version 1.8.3,部分内容可能需要結合Parquet源碼進行了解)

入口:Spark(Spark 2.3版本) -> Parquet

Parquet調用入口在Spark,是以從Spark開始挖掘調用棧。

InsertIntoHadoopFsRelationCommand.run()/SaveAsHiveFile.saveAsHiveFile() -> FileFormatWriter.write()

這裡分幾個步驟:

  1. 啟動作業前,建立outputWriterFactory: ParquetFileFormat.prepareWrite()。這裡會設定一系列與parquet寫檔案有關的配置資訊。其中主要的一個,是設定WriteSupport類:ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]),ParquetWriteSupport是Spark自己定義的類。
  2. 在executeTask() -> writeTask.execute()中,先通過outputWriterFactory建立OutputWriter (ParquetOutputWriter):outputWriterFactory.newInstance()。
  3. 對于每行記錄,使用ParquetOutputWriter.write(InternalRow)方法依次寫入parquet檔案。
  4. Task結束前,調用ParquetOutputWriter.close()關閉資源。

3.1 Write過程

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

在ParquetOutputWriter中,通過ParquetOutputFormat.getRecordWriter構造一個RecordWriter(ParquetRecordWriter),其中包含了:

  1. prepareWrite()時設定的WriteSupport:負責轉換Spark record并寫入parquet結構
  2. ParquetFileWriter:負責寫入檔案

ParquetRecordWriter中,其實是把write操作委托給了一個internalWriter(InternalParquetRecordWriter,用WriteSupport和ParquetFileWriter構造)。

現在讓我們梳理一下,目前為止的大緻流程為:

SingleDirectoryWriteTask/DynamicPartitionWriteTask.execute

-> ParquetOutputWriter.write -> ParquetRecordWriter.write -> InternalParquetRecordWriter.write

接下來,InternalParquetRecordWriter.write裡面,就是三件事:

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

(1)writeSupport.write,即ParquetWriteSupport.write,裡面分三個步驟:

    1. MessageColumnIO.MessageColumnIORecordConsumer.startMessage;
    2. ParquetWriteSupport.writeFields:寫入一行中各個列的值,null值除外;
    3. MessageColumnIO.MessageColumnIORecordConsumer.endMessage:針對第二步中的missing fields寫入null值。

      ColumnWriterV1.writeNull -> accountForValueWritten:

      1) 增加計數器valueCount (int類型)

      2) 檢查空間是否已滿,需要writePage - 檢查點1

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

(2)增加計數器recordCount(long類型)

(3)檢查block size,是否需要flushRowGroupToStore - 檢查點2

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

由于寫入的值全是null,在1、2兩個檢查點的memSize都為0,是以不會重新整理page和row group。導緻的結果就是,一直在往同一個page裡增加null值。而ColumnWriterV1的計數器valueCount是int類型,當超過int.max時,溢出,變為了一個負數。

是以,隻有當調用close()方法時(task結束時),才會執行flushRowGroupToStore:

ParquetOutputWriter.close -> ParquetRecordWriter.close

-> InternalParquetRecordWriter.close -> flushRowGroupToStore

-> ColumnWriteStoreV1.flush -> for each column ColumnWriterV1.flush

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

由于valueCount溢出為負,此處也不會寫page。

因為未調用過writePage,是以此處的totalValueCount一直為0。

ColumnWriterV1.writePage -> ColumnChunkPageWriter.writePage -> 累計totalValueCount

在write結束時,InternalParquetRecordWriter.close -> flushRowGroupToStore -> ColumnChunkPageWriteStore.flushToFileWriter -> for each column ColumnChunkPageWriter.writeToFileWriter:

  1. ParquetFileWriter.startColumn:totalValueCount指派給currentChunkValueCount
  2. ParquetFileWriter.writeDataPages
  3. ParquetFileWriter.endColumn:currentChunkValueCount(為0)和其他中繼資料資訊構造出一個ColumnChunkMetaData,相關資訊最終會被寫入檔案。

3.2 Read過程

同樣以Spark為入口,進行檢視。

初始化階段:ParquetFileFormat.BuildReaderWithPartitionValues -> VectorizedParquetRecordReader.initialize -> ParquetFileReader.readFooter -> ParquetMetadataConverter.readParquetMetadata -> fromParquetMetadata -> ColumnChunkMetaData.get,其中包含valueCount(為0)。

讀取時:VectorizedParquetRecordReader.nextBatch -> checkEndOfRowGroup:

1) ParquetFileReader.readNextRowGroup -> for each chunk, currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages())

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

由于getValueCount為0,是以pagesInChunk為空。

2)構造ColumnChunkPageReader:

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

由于page清單為空,是以totalValueCount為0,導緻在構造VectorizedColumnReader時報了問題中的錯誤。

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

4. 解決方法:Parquet更新(version 1.11.1)

在新版本中,ParquetWriteSupport.write ->

MessageColumnIO.MessageColumnIORecordConsumer.endMessage ->

ColumnWriteStoreV1(ColumnWriteStoreBase).endRecord:

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

在endRecord中增加了每個page最大記錄條數(預設2w條)的屬性和檢查邏輯,超出限制時會writePage,使得ColumnWriterV1的valueCount不會溢出(每次writePage後會清零)。

而對比老版本1.8.3中,ColumnWriteStoreV1.endRecord為空函數。

你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

附:Parquet中的一個小trick

Parquet中為了節約空間,當一個long類型的值,在一定範圍内時,會使用int來存儲,其方法如下:

  • 判斷是否可以用int存儲:
你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅
  • 如果可以,用IntColumnChunkMetaData代替LongColumnChunkMetaData,構造時轉換:
你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅
  • 使用時,再轉回來,IntColumnChunkMetaData.getValueCount -> intToPositiveLong():
你的Parquet該更新了:IOException: totalValueCount == 0問題定位之旅

普通的int範圍是 -2^31 ~ (2^31 - 1),由于中繼資料資訊(如valueCount等)都是非負整數,那麼實際隻能存儲0 ~ (2^31 - 1) 範圍的數。而用這種方法,可以表示0 ~ (2^32 - 1) 範圍的數,表達範圍也大了一倍。

附件:可用于複現的測試用例代碼(依賴Spark部分類,可置于Spark工程中運作)

繼續閱讀