天天看點

SparkSQL與Hive metastore Parquet轉換

本文轉載自公衆号:大資料學習與分享

Spark SQL為了更好的性能,在讀寫Hive metastore parquet格式的表時,會預設使用自己的Parquet SerDe,而不是采用Hive的SerDe進行序列化和反序列化。該行為可以通過配置參數spark.sql.hive.convertMetastoreParquet進行控制,預設true。

這裡從表schema的處理角度而言,就必須注意Hive和Parquet相容性,主要有兩個差別:

1.Hive是大小寫敏感的,但Parquet相反

2.Hive會将所有列視為nullable,但是nullability在parquet裡有獨特的意義

由于上面的原因,在将Hive metastore parquet轉化為Spark SQL parquet時,需要相容處理一下Hive和Parquet的schema,即需要對二者的結構進行一緻化。主要處理規則是:

1.有相同名字的字段必須要有相同的資料類型,忽略nullability。相容處理的字段應該保持Parquet側的資料類型,這樣就可以處理到nullability類型了(空值問題)

2.相容處理的schema應隻包含在Hive中繼資料裡的schema資訊,主要展現在以下兩個方面:

(1)隻出現在Parquet schema的字段會被忽略

(2)隻出現在Hive中繼資料裡的字段将會被視為nullable,并處理到相容後的schema中

關于schema(或者說中繼資料metastore),Spark SQL在處理Parquet表時,同樣為了更好的性能,會緩存Parquet的中繼資料資訊。此時,如果我們直接通過Hive或者其他工具對該Parquet表進行修改導緻了中繼資料的變化,那麼Spark SQL緩存的中繼資料并不能同步更新,此時需要手動重新整理Spark SQL緩存的中繼資料,來確定中繼資料的一緻性,方式如下:

// 第一種方式應用的比較多
1. sparkSession.catalog.refreshTable(s"${dbName.tableName}")
2. sparkSession.catalog.refreshByPath(s"${path}")           

最後說一下最近背景小夥伴在生産中遇到的一個問題,大家如果在業務進行中遇到類似的問題,提供一個思路。

在說問題之前首先了解一個參數spark.sql.parquet.writeLegacyFormat(預設false)的作用:

設定為true時,資料會以Spark1.4和更早的版本的格式寫入。比如decimal類型的值會被以Apache Parquet的fixed-length byte array格式寫出,該格式是其他系統例如Hive、Impala等使用的。

設定為false時,會使用parquet的新版格式。例如,decimals會以int-based格式寫出。如果Spark SQL要以Parquet輸出并且結果會被不支援新格式的其他系統使用的話,需要設定為true。

比如,對于decimal資料類型的相容處理,不設定true時,經常會報類似如下的錯誤:

Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://hadoop/data/test_decimal/dt=20200515000000/part-00000-9820eba2-8a40-446d-8c28-37027a1b1f2d-c000.snappy.parquet
  at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
  at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
  at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122)
  at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)
  at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)
Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
  at parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)
           

此時我們需要将spark.sql.parquet.writeLegacyFormat設定為true來解決上述的異常問題。

但如果同時設定spark.sql.hive.convertMetastoreParquet為false時,要注意一些資料類型以及精度的處理,比如對于decimal類型的處理。通過一個例子複原一下當時的場景:

1.建立Hive外部表testdb.test_decimal,其中字段fee_rate為decimal(10,10)

CREATE EXTERNAL TABLE `testdb`.`test_decimal`(`no` STRING ,
            `fee_rate` DECIMAL(10,10)) 
PARTITIONED BY (`dt` STRING ) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION 'hdfs://hadoop/data/test_decimal' 
TBLPROPERTIES ( 'transient_lastDdlTime' = '1589160440' ) ;           

2.将testdb.item中的資料處理後儲存到testdb.test_decimal中

// 這裡為了展示友善,直接查詢testdb.item中的資料
// 注意: 字段fee_rate的類型為decimal(10,6)
select no, fee_rate from testdb.item  where dt=20190528;
​
// testdb.item中資料示例如下
+-------------------+----------------+
|                 no|       fee_rate|
+-------------------+----------------+
|                  1|        0.000000|
|                  2|        0.000000|
|                  3|        0.000000|
+-------------------+----------------+           

3.将testdb.item中的資料儲存到testdb.test_decimal中

// tmp是上述查詢testdb.item獲得的臨時表
// 以parquet格式儲存到test_decimal的20200529分區中
save overwrite tmp as parquet.`/data/test_decimal/dt=20200529`; 
msck repair TABLE testdb.item;           

上述1-3都能成功執行,資料也能儲存到testdb.test_decimal中,但是當查詢testdb.test_decimal中的資料時,比如執行sql:

select * from testdb.test_decimal where dt = 20200529;
           

會報如下空指針的異常:

Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException
  at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107)
  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:415)
  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)
  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:443)
  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:434)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)           

究其原因是因為按照上述兩個參數的配置,testdb.item中fee_rate字段類型為decimal(10,6),資料為0.000000,經過一系列處理0.000000最終會被處理為0,看下邊最終導緻空指針異常的部分,就會一目了然。

public static BigDecimal enforcePrecisionScale(BigDecimal bd, int maxPrecision, int maxScale) {
        if (bd == null) {
            return null;
        } else {
            bd = trim(bd);
            if (bd.scale() > maxScale) {
                bd = bd.setScale(maxScale, RoundingMode.HALF_UP);
            }
            // testdb.test_decimal中fee_rate的類型decimal(10,10),即precision為10,scale也為10
            // 對應這裡即maxPrecision和maxScale分别為10,則maxIntDigits為0
            int maxIntDigits = maxPrecision - maxScale;
            
            // bd對應0。對于0而言,precision為1,scale為0
            // 處理之後 intDigits為1
            int intDigits = bd.precision() - bd.scale();
            return intDigits > maxIntDigits ? null : bd;
        }
}           

解決辦法也很簡單,就是将testdb.test_decimal中的fee_rate資料類型和依賴的表testdb.item中的fee_rate保持完全一緻,即也為decimal(10,6)。

這個現象在實際應用環境中經常遇到,通用的解決辦法就是将要儲存的表中的資料類型與依賴的表(實體表或者臨時表)的字段類型保持完全一緻。

​----

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

SparkSQL與Hive metastore Parquet轉換

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。

SparkSQL與Hive metastore Parquet轉換

Apache Spark技術交流社群公衆号,微信掃一掃關注

SparkSQL與Hive metastore Parquet轉換

繼續閱讀