天天看點

資料湖(四):Hudi與Spark整合

資料湖(四):Hudi與Spark整合

文章目錄

​​Hudi與Spark整合​​

​​一、向Hudi插入資料​​

​​1、建立項目,修改pom.xml為如下内容​​

​​2、編寫向Hudi插入資料代碼​​

​​二、指定分區向hudi中插入資料​​

​​1、指定一個分區列​​

​​2、指定分區為多個列時,可以先拼接,後指定拼接字段當做分區列: ​​

​​三、 讀取Hudi資料​​

​​四、更新Hudi資料​​

​​五、 增量查詢Hudi資料​​

​​六、指定時間範圍查詢Hudi資料​​

​​1、向原有Hudi表“person_infos”中插入兩次資料​​

​​2、指定時間段查詢Hudi中的資料​​

​​七、删除Hudi資料​​

​​八、更新Hudi某個分區資料​​

​​1、删除person_infos對應的目錄,重新插入資料,代碼如下​​

​​2、讀取更新分區資料,插入到Hudi preson_infos表中​​

​​九、覆寫Hudi整個表資料​​

​​1、删除Hudi表person_infos對應的HDFS路徑,重新插入資料​​

​​2、讀取新資料,覆寫原有Hudi表資料​​

​​十、Spark操作Hudi Merge On Read 模式​​

​​十一、測試COW模式parquet檔案删除與MOR模式Parquet檔案與log檔案Compact​​

Hudi與Spark整合

一、向Hudi插入資料

預設Spark操作Hudi使用表類型為Copy On Write模式。Hudi與Spark整合時有很多參數配置,可以參照https://hudi.apache.org/docs/configurations.html配置項來查詢,此外,整合時有幾個需要注意的點,如下:

  • Hudi這裡使用的是0.8.0版本,其對應使用的Spark版本是2.4.3+版本
  • Spark2.4.8使用的Scala版本是2.12版本,雖然2.11也是支援的,建議使用2.12。
  • maven導入包中需要保證httpclient、httpcore版本與叢集中的Hadoop使用的版本一緻,不然會導緻通信有問題。檢查Hadoop使用以上兩個包的版本路徑為:$HADOOP_HOME/share/hadoop/common/lib。
  • 在編寫代碼過程中,指定資料寫入到HDFS路徑時直接寫“/xxdir”不要寫“hdfs://mycluster/xxdir”,後期會報錯“java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie\.temp/2022xxxxxxxxxx/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/2022xxxxxxxxxx”,可以将對應的hdfs-site.xml、core-site.xml放在resources目錄下,直接會找HDFS路徑。

1、建立項目,修改pom.xml為如下内容

UTF-8
  2.12.14
  2.4.8



  
  
    org.scala-lang
    scala-library
    ${scala.version}
  
  
  
      org.apache.httpcomponents
      httpclient
      4.5.2
  
  
  
      org.apache.httpcomponents
      httpcore
      4.4.4
  
  
  
    org.apache.spark
    spark-core_2.12
    ${spark.version}
    
      
        org.apache.httpcomponents
        httpclient
      
      
          org.apache.httpcomponents
          httpcore
      
    
  

  
    org.apache.spark
    spark-sql_2.12
    ${spark.version}
  

  
    org.apache.spark
    spark-avro_2.12
    ${spark.version}
  

  
  
    org.apache.spark
    spark-hive_2.12
    ${spark.version}
  

  
  
    org.apache.hive
    hive-jdbc
    1.2.1
  

  
    org.apache.hudi
    hudi-spark-bundle_2.12
    0.8.0
  




  
    
    
      org.scala-tools
      maven-scala-plugin
      2.15.2
      
        
          
            compile
            testCompile
          
        
      
    

    
    
      maven-assembly-plugin
      2.4
      
        
        
        
          jar-with-dependencies
        
        
          
            com.xxx
          
        
      
      
        
          make-assembly
          package
          
            assembly      

2、編寫向Hudi插入資料代碼

val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    //關閉日志
//    session.sparkContext.setLogLevel("Error")

    //建立DataFrame
    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")


    //将結果儲存到hudi中
    insertDF.write.format("org.apache.hudi")//或者直接寫hudi
      //設定主鍵列名稱
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
      //當資料主鍵相同時,對比的字段,儲存該字段大的資料
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt")S
      //并行度設定,預設1500
      .option("hoodie.insert.shuffle.parallelism","2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      //表名設定
      .option(HoodieWriteConfig.TABLE_NAME,"person_infos")
      .mode(SaveMode.Overwrite)
      //注意:這裡要選擇hdfs路徑存儲,不要加上hdfs://mycluster//dir
      //将hdfs 中core-site.xml 、hdfs-site.xml放在resource目錄下,直接寫/dir路徑即可,否則會報錯:java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie\.temp/20220509164730/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/20220509164730
      .save("/hudi_data/person_infos")      
資料湖(四):Hudi與Spark整合

二、指定分區向hudi中插入資料

向Hudi中存儲資料時,如果沒有指定分區列,那麼預設隻有一個default分區,我們可以儲存資料時指定分區列,可以在寫出時指定“DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY”選項來指定分區列,如果涉及到多個分區列,那麼需要将多個分區列進行拼接生成新的字段,使用以上參數指定新的字段即可。

1、指定一個分區列

insertDF.write.format("org.apache.hudi")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  //指定分區列
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  .mode(SaveMode.Overwrite)
  .save("/hudi_data/person_infos")      
資料湖(四):Hudi與Spark整合

2、指定分區為多個列時,可以先拼接,後指定拼接字段當做分區列: 

指定兩個分區,需要拼接

//導入函數,拼接列
import org.apache.spark.sql.functions._
val endDF: DataFrame = insertDF.withColumn("partition_key", concat_ws("-", col("data_dt"), col("loc")))
endDF.write.format("org.apache.hudi")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  //指定分區列,這裡是拼接的列
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition_key")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  .mode(SaveMode. )
   .save("/hudi_data/person_infos")      
資料湖(四):Hudi與Spark整合

三、 讀取Hudi資料

使用SparkSQL讀取Hudi中的資料,無法使用讀取表方式來讀取,需要指定HDFS對應的路徑來加載,指定的路徑隻需要指定到*.parquet目前路徑或者上一層路徑即可,路徑中可以使用“*”來替代任意目錄和資料。

讀取資料傳回的結果中除了原有的資料之外,還會攜帶Hudi對應的列資料,例如:hudi的主鍵、分區、送出時間、對應的parquet名稱。

Spark讀取Hudi表資料代碼如下:

val session: SparkSession = SparkSession.builder().master("local").appName("queryDataFromHudi")
  .getOrCreate()
//讀取的資料路徑下如果有分區,會自動發現分區資料,需要使用 * 代替,指定到parquet格式資料上層目錄即可。
val frame: DataFrame = session.read.format("org.apache.hudi").load("/hudi_data/person_infos/*/*")
frame.createTempView("personInfos")

//查詢結果
val result = session.sql(
  """
    | select * from personInfos
  """.stripMargin)

result.show(false)      
資料湖(四):Hudi與Spark整合

四、更新Hudi資料

向Hudi中更新資料有如下幾個特點

  • 同一個分區内,向Hudi中更新資料是用主鍵來判斷資料是否需要更新的,這裡判斷的是相同分區内是否有相同主鍵,不同分區内允許有相同主鍵。
  • 更新資料時,如果原來資料有分區,一定要指定分區,不然就相當于是向相同表目錄下插入資料,會生成對應的“default”分區。
  • 向Hudi中更新資料時,與向Hudi中插入資料一樣,但是寫入的模式需要指定成“Append”,如果指定成“overwrite”,那麼就是全覆寫了。建議使用時一直使用“Append”模式即可。
  • 當更新完成之後,再一次從Hudi中查詢資料時,會看到Hudi送出的時間字段為最新的時間。

這裡将原有的三條資料改成如下三條資料:

#修改之前
{"id":1,"name":"zs1","age":18,"loc":"beijing","data_dt":"20210709"}
{"id":2,"name":"zs2","age":19,"loc":"shanghai","data_dt":"20210709"}
{"id":3,"name":"zs3","age":20,"loc":"beijing","data_dt":"20210709"}

#修改之後
{"id":1,"name":"ls1","age":40,"loc":"beijing","data_dt":"20210709"} --更新資料
{"id":2,"name":"ls2","age":50,"loc":"shanghai","data_dt":"20210710"} --更新資料
{"id":3,"name":"ls3","age":60,"loc":"ttt","data_dt":"20210711"}  --相當于是新增資料      

 更新Hudi資料代碼如下:

val session: SparkSession = SparkSession.builder().master("local").appName("updataDataToHudi")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

//讀取修改資料
val updateDataDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\updatedata.json")

//向Hudi 更新資料
updateDataDF.write.format("org.apache.hudi") //或者直接寫hudi
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  .mode(SaveMode.Append)
  .save("/hudi_data/person_infos")


//查詢資料
val frame: DataFrame = session.read.format("org.apache.hudi").load("/hudi_data/person_infos/*/*")
frame.createTempView("personInfos")
//查詢結果
val result = session.sql(
  """
    | select * from personInfos
  """.stripMargin)
result.show(false)      
資料湖(四):Hudi與Spark整合

五、 增量查詢Hudi資料

Hudi可以根據我們傳入的時間戳查詢此時間戳之後的資料,這就是增量查詢,需要注意的是增量查詢必須通過以下方式在Spark中指定一個時間戳才能正常查詢:

option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,timestamp)

例如:原始資料如下:

資料湖(四):Hudi與Spark整合

我們可以查詢 20210709220335之後的資料,查詢結果如下:

資料湖(四):Hudi與Spark整合

代碼如下:

val session: SparkSession = SparkSession.builder().master("local").appName("updataDataToHudi")
  .getOrCreate()

//關閉日志
session.sparkContext.setLogLevel("Error")

//導入隐式轉換
import session.implicits._

//查詢全量資料,查詢對應的送出時間,找出倒數第二個時間
val basePath = "/hudi_data/person_infos"
session.read.format("hudi").load(basePath+"/*/*").createTempView("personInfos")

val df: DataFrame = session.sql("select distinct(_hoodie_commit_time) as commit_time from personInfos order by commit_time desc")
//這裡擷取由大到小排序的第二個值
val dt: String = df.map(row=>{row.getString(0)}).collect()(1)

//增量查詢
val result:DataFrame = session.read.format("hudi")
/**
  * 指定資料查詢方式,有以下三種:
  * val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"    -- 擷取最新所有資料 , 預設
  * val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"  --擷取指定時間戳後的變化資料
  * val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"  -- 隻查詢Base檔案中的資料
  *
  * 1) Snapshot mode (obtain latest view, based on row & columnar data)
  * 2) incremental mode (new data since an instantTime)
  * 3) Read Optimized mode (obtain latest view, based on columnar data)
  *
  * Default: snapshot
  */
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  //必須指定一個開始查詢的時間,不指定報錯
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,dt)
  .load(basePath+"/*/*")

result.show(false)      

六、指定時間範圍查詢Hudi資料

Hudi還可以通過指定開始時間和結束時間來查詢時間範圍内的資料。如果想要查詢最早的時間點到某個結束時刻的資料,開始時間可以指定成“000”。

1、向原有Hudi表“person_infos”中插入兩次資料

目前hudi表中的資料如下:

資料湖(四):Hudi與Spark整合

先執行兩次新的資料插入,兩次插入資料之間的間隔時間至少為1分鐘,兩次插入資料代碼如下:

//以下代碼分兩次向 HDFS /hudi_data/person_infos 路徑中插入資料,兩次運作至少1分鐘以上
val session: SparkSession = SparkSession.builder().master("local").appName("PointTimeQueryHudi")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

//讀取第一個檔案,向Hudi中插入資料
val df1: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\appendData1.json")
val df2: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\appendData2.json")

//向Hudi中插入資料
df2.write.format("hudi")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  .mode(SaveMode.Append)
  .save("/hudi_data/person_infos")

import org.apache.spark.sql.functions._
//查詢資料
session.read.format("hudi").load("/hudi_data/person_infos/*/*")
  .orderBy(col("_hoodie_commit_time"))
  .show(100,false)      

此時,資料如下:

資料湖(四):Hudi與Spark整合

2、指定時間段查詢Hudi中的資料

代碼如下:

val session: SparkSession = SparkSession.builder().master("local").appName("PointTimeQueryHudi")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    //指定時間段,查詢hudi中的資料
//    val beginTime = "000"
    val beginTime = "20210710002148"
    val endTime = "20210710002533"

    val result: DataFrame = session.read.format("hudi")
      //指定增量查詢
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      //指定查詢開始時間(不包含),“000”指定為最早時間
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
      //指定查詢結束時間(包含)
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, endTime)
      .load("/hudi_data/person_infos/*/*")

    result.createTempView("temp")
    session.sql(
      """
        | select * from temp order by _hoodie_commit_time
      """.stripMargin).show(100,false)      

開始時間為“000”,相當于是從頭開始查詢到endTime的資料:

資料湖(四):Hudi與Spark整合

開始時間為“20210710002148”: 

資料湖(四):Hudi與Spark整合

七、删除Hudi資料

我們準備對應的主鍵及分區的資料,将Hudi中對應的主鍵及分區的資料進行删除,在删除Hudi中的資料時,需要指定option(OPERATION_OPT_KEY,"delete")配置項,并且寫入模式隻能是Append,不支援其他寫入模式,另外,設定下删除執行的并行度,預設為1500個,這裡可以設定成2個。

原始資料如下:

資料湖(四):Hudi與Spark整合

準備要删除的資料如下:

{"id":11,"loc":"beijing"}
{"id":12,"loc":"beijing"}
{"id":13,"loc":"beijing"}
{"id":14,"loc":"shenzhen"}
{"id":15,"loc":"tianjian"}  --此條資料對應的主鍵一緻,但是分區不一緻,不能在Hudi中删除      

 編寫代碼如下:

val session: SparkSession = SparkSession.builder().master("local").appName("DeleteHudiData")
  .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

//讀取需要删除的資料,隻需要準備對應的主鍵及分區即可,字段保持與Hudi中需要删除的字段名稱一緻即可
//讀取的檔案中準備了一個主鍵在Hudi中存在但是分區不再Hudi中存在的資料,此主鍵資料在Hudi中不能被删除,需要分區和主鍵字段都比對才能删除
val deleteData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\deleteData.json")

//将删除的資料插入到Hudi中
deleteData.write.format("hudi")
  //指定操作模式為delete
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"delete")
  //指定主鍵
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
  //指定分區字段
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
  //指定表名,這裡的表明需要與之前指定的表名保持一緻
  .option(HoodieWriteConfig.TABLE_NAME,"person_infos")
  //設定删除并行度設定,預設1500并行度
  .option("hoodie.delete.shuffle.parallelism", "2")
  .mode(SaveMode.Append)
  .save("/hudi_data/person_infos")

//執行完成之後,查詢結果
import org.apache.spark.sql.functions._
session.read.format("hudi").load("/hudi_data/person_infos/*/*")
  .orderBy(col("_hoodie_commit_time")).show(100,false)      

結果如下:

資料湖(四):Hudi與Spark整合

八、更新Hudi某個分區資料

如果我們想要更新Hudi某個分區的資料,其他分區資料正常使用,那麼可以通過配置option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite")選項,該選項“insert_overwrite”可以直接在中繼資料層面上操作,直接将寫入某分區的新資料替換到該分區内,原有資料會在一定時間内删除,相比upsert更新Hudi速度要快。

1、删除person_infos對應的目錄,重新插入資料,代碼如下

val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
  .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

//建立DataFrame
val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
insertDF.write.format("org.apache.hudi")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  .mode(SaveMode.Append)
  .save("/hudi_data/person_infos")

//寫入完成之後,查詢hudi 資料:
val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
person_infos.show(100,false)      
資料湖(四):Hudi與Spark整合

2、讀取更新分區資料,插入到Hudi preson_infos表中

讀取資料如下:

{"id":1,"name":"s1","age":1,"loc":"beijing","data_dt":"20210710"}
{"id":100,"name":"s2","age":2,"loc":"beijing","data_dt":"20210710"}
{"id":200,"name":"s3","age":3,"loc":"beijing","data_dt":"20210710"}
{"id":8,"name":"w1","age":4,"loc":"chongqing","data_dt":"20210710"}
{"id":300,"name":"w2","age":5,"loc":"chongqing","data_dt":"20210710"}      

代碼如下:

val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    //讀取需要替換的資料,将beijing分區資料替換成2條,将chognqing分區資料替換成1條
    val overWritePartitionData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\overWrite.json")

    //寫入hudi表person_infos,替換分區
    overWritePartitionData.write.format("hudi")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite")
      //設定主鍵列名稱
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
      //當資料主鍵相同時,對比的字段,儲存該字段大的資料
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
      //指定分區列
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
      //并行度設定
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      //表名設定
      .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
      .mode(SaveMode.Append)
      .save("/hudi_data/person_infos")

    //寫入完成之後,查詢hudi 資料:
    val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
    person_infos.show(100,false)      
資料湖(四):Hudi與Spark整合

九、覆寫Hudi整個表資料

如果我們想要替換Hudi整個表資料,可以在向Hudi表寫入資料時指定配置option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite_table")選項,該選項“insert_overwrite_table”可以直接在中繼資料層面上操作,直接将資料寫入表,原有資料會在一定時間内删除,相比删除原有資料再插入更友善。

1、删除Hudi表person_infos對應的HDFS路徑,重新插入資料

val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    //建立DataFrame
    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
    insertDF.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
      .mode(SaveMode.Append)
      .save("/hudi_data/person_infos")

    //寫入完成之後,查詢hudi 資料:
    val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
    person_infos.show(100,false)      
資料湖(四):Hudi與Spark整合

2、讀取新資料,覆寫原有Hudi表資料

覆寫更新的資料如下:

{"id":1,"name":"s1","age":1,"loc":"beijing","data_dt":"20210710"}
{"id":100,"name":"s2","age":2,"loc":"beijing","data_dt":"20210710"}
{"id":200,"name":"s3","age":3,"loc":"beijing","data_dt":"20210710"}
{"id":8,"name":"w1","age":4,"loc":"chongqing","data_dt":"20210710"}
{"id":300,"name":"w2","age":5,"loc":"chongqing","data_dt":"20210710"}      

代碼如下:

val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    //讀取需要替換的資料,覆寫原有表所有資料
    val overWritePartitionData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\overWrite.json")

    //寫入hudi表person_infos,替換分區
    overWritePartitionData.write.format("hudi")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite_table")
      //設定主鍵列名稱
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
      //當資料主鍵相同時,對比的字段,儲存該字段大的資料
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
      //指定分區列
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
      //并行度設定
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      //表名設定
      .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
      .mode(SaveMode.Append)
      .save("/hudi_data/person_infos")

    //寫入完成之後,查詢hudi 資料:
    val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
    person_infos.show(100,false)      
資料湖(四):Hudi與Spark整合

十、Spark操作Hudi Merge On Read 模式

預設Spark操作Hudi使用Copy On Write模式,也可以使用Merge On Read 模式,通過代碼中國配置如下配置來指定:

option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)

代碼操作如下:

  • 删除原有person_infos對應的HDFS路徑
  • 讀取資料向Hudi表person_info中插入資料

讀取的資料如下:

{"id":1,"name":"zs1","age":18,"loc":"beijing","data_dt":"20210709"}
{"id":2,"name":"zs2","age":19,"loc":"shanghai","data_dt":"20210709"}
{"id":3,"name":"zs3","age":20,"loc":"beijing","data_dt":"20210709"}
{"id":4,"name":"zs4","age":21,"loc":"tianjin","data_dt":"20210709"}
{"id":5,"name":"zs5","age":22,"loc":"shenzhen","data_dt":"20210709"}
{"id":6,"name":"zs6","age":23,"loc":"hainai","data_dt":"20210709"}
{"id":7,"name":"zs7","age":24,"loc":"beijing","data_dt":"20210709"}
{"id":8,"name":"zs8","age":25,"loc":"chongqing","data_dt":"20210709"}
{"id":9,"name":"zs9","age":26,"loc":"shandong","data_dt":"20210709"}
{"id":10,"name":"zs10","age":27,"loc":"hunan","data_dt":"20210709"}      

代碼如下:

//1.讀取json格式資料
val insertDf: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")

//2.将結果使用Merge on Read 模式寫入到Hudi中,并設定分區
insertDf.write.format("hudi")
  //設定表模式為 mor
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt")
  //并行度設定
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  //表名設定
  .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  .mode(SaveMode.Append)
  .save("/hudi_data/person_infos")      
資料湖(四):Hudi與Spark整合
  • 更新Hudi表person_info資料

這裡更新“beijing”、“shanghai”、“ttt”分區資料,更新資料如下:

{"id":1,"name":"ls1","age":40,"loc":"beijing","data_dt":"20210709"}
{"id":2,"name":"ls2","age":50,"loc":"shanghai","data_dt":"20210710"}
{"id":3,"name":"ls3","age":60,"loc":"ttt","data_dt":"20210711"}      

代碼如下:

//3.讀取更新資料,并執行插入更新
val updateDf: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\updatedata.json")

updateDf.write.format("hudi")
  //設定表模式為 mor
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt")
  //并行度設定
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  //表名設定
  .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  .mode(SaveMode.Append)
  .save("/hudi_data/person_infos")      
資料湖(四):Hudi與Spark整合
  • 增量查詢Hudi表中的資料

Snapshot 模式查詢,這種模式對于COW或者MOR模式都是查詢到目前時刻全量的資料,如果有更新,那麼就是更新之後全量的資料:

//4.使用不同模式查詢 MOR 表中的資料
/**
  * 指定資料查詢方式,有以下三種:
  * val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"    -- 擷取最新所有資料 , 預設
  * val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"  --擷取指定時間戳後的變化資料
  * val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"  -- 隻查詢Base檔案中的資料
  *
  * 1) Snapshot mode (obtain latest view, based on row & columnar data)
  * 2) incremental mode (new data since an instantTime)
  * 3) Read Optimized mode (obtain latest view, based on columnar data)
  *
  * Default: snapshot
  */
//4.1 Snapshot 模式查詢
session.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load("/hudi_data/person_infos/*/*")
  .show(100,false)      
資料湖(四):Hudi與Spark整合

incremental 模式查詢,這種模式需要指定一個時間戳,查詢指定時間戳之後的新增資料: 

//4.2 incremental 模式查詢,查詢指定時間戳後的資料
session.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  //必須指定一個開始查詢的時間,不指定報錯
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,"20210710171240")
  .load("/hudi_data/person_infos/*/*")
  .show(100,false)      
資料湖(四):Hudi與Spark整合

Read Optimized 模式查詢,這種模式隻查詢Base中的資料,不會查詢MOR中Log檔案中的資料,代碼如下:

//4.3 Read Optimized 模式查詢,查詢Base中的資料,不會查詢log中的資料
session.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load("/hudi_data/person_infos/*/*")
  .show(100,false)      
資料湖(四):Hudi與Spark整合

十一、測試COW模式parquet檔案删除與MOR模式Parquet檔案與log檔案Compact

 COW預設情況下,每次更新資料Commit都會基于之前parquet檔案生成一個新的Parquet Base檔案資料,預設曆史parquet檔案數為10,當超過10個後會自動删除舊的版本,可以通過參數“hoodie.cleaner.commits.retained”來控制保留的FileID版本檔案數,預設是10。測試代碼如下:

val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    //建立DataFrame
    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata1.json")
//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata2.json")
//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata3.json")
//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata4.json")
//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata5.json")
//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata6.json")
//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata7.json")
//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata8.json")
//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata9.json")
//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata10.json")

    insertDF.write.format("org.apache.hudi")
      //設定cow模式
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      //根據commit送出次數計算保留多少個fileID版本檔案,預設10。
      .option("hoodie.cleaner.commits.retained","3")
      //設定主鍵列名稱
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
      //當資料主鍵相同時,對比的字段,儲存該字段大的資料
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
      //并行度設定,預設1500并行度
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      //表名設定
      .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
      .mode(SaveMode.Append)
      .save("/hudi_data/test_person")

    //查詢結果資料
    session.read.format("hudi")
      //全量讀取
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .load("/hudi_data/test_person/*/*").show()      

 測試注意:每次運作代碼,讀取新的一個資料檔案,并檢視Hudi表對應的HDFS路徑,每次讀取都會生成一個新的Parquet檔案,當達到指定的3個曆史版本時(不包含最新Parquet檔案),再插入資料生成新的Parquet檔案時,一緻會将之前的舊版本删除,儲存4個檔案。

資料湖(四):Hudi與Spark整合

MOR模式下,如果有新增資料會直接寫入Base Parquet檔案,這個Parquet檔案個數的控制也是由“hoodie.cleaner.commits.retained”控制,預設為10。當對應的每個FlieSlice(Base Parquet檔案+log Avro檔案)中有資料更新時,會寫入對應的log Avro檔案,那麼這個檔案何時與Base Parquet檔案進行合并,這個是由參數“hoodie.compact.inline.max.delta.commits”決定的,這個參數意思是在送出多少次commit後觸發壓縮政策,預設是5,也就是目前FlieSlice中如果有5次資料更新就會兩者合并生成全量的資料,目前FlieSlice還是這個FileSlice名稱,隻不過對應的parquet檔案中是全量資料,再有更新資料還是會寫入目前FileSlice對應的log日志檔案中。使“hoodie.compact.inline.max.delta.commits”參數起作用,預設必須開啟“hoodie.compact.inline”,此值代表是否完成送出資料後進行壓縮,預設是false。

測試代碼如下:

#注意代碼中設定參數如下:
//根據commit送出次數計算保留多少個fileID版本檔案,預設10。
.option("hoodie.cleaner.commits.retained","3")
//預設false:是否在一個事務完成後内聯執行壓縮操作
.option("hoodie.compact.inline","true")
//設定送出多少次後觸發壓縮政策,預設5
.option("hoodie.compact.inline.max.delta.commits","2")

#完整代碼如下:
val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    //建立DataFrame ,新增
//    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata1.json")

    //建立DataFrame ,更新
    val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\update11.json")

    insertDF.write.format("org.apache.hudi") //或者直接寫hudi
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      //根據commit送出次數計算保留多少個fileID版本檔案,預設10。
      .option("hoodie.cleaner.commits.retained","3")
      //預設false:是否在一個事務完成後内聯執行壓縮操作
      .option("hoodie.compact.inline","true")
      //設定送出多少次後觸發壓縮政策,預設5
      .option("hoodie.compact.inline.max.delta.commits","2")
      //設定主鍵列名稱
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
      //當資料主鍵相同時,對比的字段,儲存該字段大的資料
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
      //并行度設定,預設1500并行度
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      //表名設定
      .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
      .mode(SaveMode.Append)
      .save("/hudi_data/test_person")

    //查詢結果資料
    session.read.format("hudi")
      //全量讀取
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .load("/hudi_data/test_person/*/*").show()      

 第一次運作插入資料,commit,路徑對應資料目錄如下:

資料湖(四):Hudi與Spark整合

第一次運作更新資料,commit,路徑對應資料目錄如下:

資料湖(四):Hudi與Spark整合

第二次運作更新資料,commit,路徑對應的資料目錄如下: 

資料湖(四):Hudi與Spark整合

第三次運作更新資料,commit,路徑對應的資料目錄如下: 

資料湖(四):Hudi與Spark整合

第四次運作更新資料,commit,路徑對應的資料目錄如下: 

資料湖(四):Hudi與Spark整合

繼續閱讀