文章目錄
Hudi與Spark整合
一、向Hudi插入資料
1、建立項目,修改pom.xml為如下内容
2、編寫向Hudi插入資料代碼
二、指定分區向hudi中插入資料
1、指定一個分區列
2、指定分區為多個列時,可以先拼接,後指定拼接字段當做分區列:
三、 讀取Hudi資料
四、更新Hudi資料
五、 增量查詢Hudi資料
六、指定時間範圍查詢Hudi資料
1、向原有Hudi表“person_infos”中插入兩次資料
2、指定時間段查詢Hudi中的資料
七、删除Hudi資料
八、更新Hudi某個分區資料
1、删除person_infos對應的目錄,重新插入資料,代碼如下
2、讀取更新分區資料,插入到Hudi preson_infos表中
九、覆寫Hudi整個表資料
1、删除Hudi表person_infos對應的HDFS路徑,重新插入資料
2、讀取新資料,覆寫原有Hudi表資料
十、Spark操作Hudi Merge On Read 模式
十一、測試COW模式parquet檔案删除與MOR模式Parquet檔案與log檔案Compact
Hudi與Spark整合
一、向Hudi插入資料
預設Spark操作Hudi使用表類型為Copy On Write模式。Hudi與Spark整合時有很多參數配置,可以參照https://hudi.apache.org/docs/configurations.html配置項來查詢,此外,整合時有幾個需要注意的點,如下:
- Hudi這裡使用的是0.8.0版本,其對應使用的Spark版本是2.4.3+版本
- Spark2.4.8使用的Scala版本是2.12版本,雖然2.11也是支援的,建議使用2.12。
- maven導入包中需要保證httpclient、httpcore版本與叢集中的Hadoop使用的版本一緻,不然會導緻通信有問題。檢查Hadoop使用以上兩個包的版本路徑為:$HADOOP_HOME/share/hadoop/common/lib。
- 在編寫代碼過程中,指定資料寫入到HDFS路徑時直接寫“/xxdir”不要寫“hdfs://mycluster/xxdir”,後期會報錯“java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie\.temp/2022xxxxxxxxxx/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/2022xxxxxxxxxx”,可以将對應的hdfs-site.xml、core-site.xml放在resources目錄下,直接會找HDFS路徑。
1、建立項目,修改pom.xml為如下内容
UTF-8
2.12.14
2.4.8
org.scala-lang
scala-library
${scala.version}
org.apache.httpcomponents
httpclient
4.5.2
org.apache.httpcomponents
httpcore
4.4.4
org.apache.spark
spark-core_2.12
${spark.version}
org.apache.httpcomponents
httpclient
org.apache.httpcomponents
httpcore
org.apache.spark
spark-sql_2.12
${spark.version}
org.apache.spark
spark-avro_2.12
${spark.version}
org.apache.spark
spark-hive_2.12
${spark.version}
org.apache.hive
hive-jdbc
1.2.1
org.apache.hudi
hudi-spark-bundle_2.12
0.8.0
org.scala-tools
maven-scala-plugin
2.15.2
compile
testCompile
maven-assembly-plugin
2.4
jar-with-dependencies
com.xxx
make-assembly
package
assembly
2、編寫向Hudi插入資料代碼
val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//關閉日志
// session.sparkContext.setLogLevel("Error")
//建立DataFrame
val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
//将結果儲存到hudi中
insertDF.write.format("org.apache.hudi")//或者直接寫hudi
//設定主鍵列名稱
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
//當資料主鍵相同時,對比的字段,儲存該字段大的資料
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt")S
//并行度設定,預設1500
.option("hoodie.insert.shuffle.parallelism","2")
.option("hoodie.upsert.shuffle.parallelism", "2")
//表名設定
.option(HoodieWriteConfig.TABLE_NAME,"person_infos")
.mode(SaveMode.Overwrite)
//注意:這裡要選擇hdfs路徑存儲,不要加上hdfs://mycluster//dir
//将hdfs 中core-site.xml 、hdfs-site.xml放在resource目錄下,直接寫/dir路徑即可,否則會報錯:java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie\.temp/20220509164730/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/20220509164730
.save("/hudi_data/person_infos")
二、指定分區向hudi中插入資料
向Hudi中存儲資料時,如果沒有指定分區列,那麼預設隻有一個default分區,我們可以儲存資料時指定分區列,可以在寫出時指定“DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY”選項來指定分區列,如果涉及到多個分區列,那麼需要将多個分區列進行拼接生成新的字段,使用以上參數指定新的字段即可。
1、指定一個分區列
insertDF.write.format("org.apache.hudi")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
//指定分區列
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Overwrite)
.save("/hudi_data/person_infos")
2、指定分區為多個列時,可以先拼接,後指定拼接字段當做分區列:
指定兩個分區,需要拼接
//導入函數,拼接列
import org.apache.spark.sql.functions._
val endDF: DataFrame = insertDF.withColumn("partition_key", concat_ws("-", col("data_dt"), col("loc")))
endDF.write.format("org.apache.hudi")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
//指定分區列,這裡是拼接的列
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition_key")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode. )
.save("/hudi_data/person_infos")
三、 讀取Hudi資料
使用SparkSQL讀取Hudi中的資料,無法使用讀取表方式來讀取,需要指定HDFS對應的路徑來加載,指定的路徑隻需要指定到*.parquet目前路徑或者上一層路徑即可,路徑中可以使用“*”來替代任意目錄和資料。
讀取資料傳回的結果中除了原有的資料之外,還會攜帶Hudi對應的列資料,例如:hudi的主鍵、分區、送出時間、對應的parquet名稱。
Spark讀取Hudi表資料代碼如下:
val session: SparkSession = SparkSession.builder().master("local").appName("queryDataFromHudi")
.getOrCreate()
//讀取的資料路徑下如果有分區,會自動發現分區資料,需要使用 * 代替,指定到parquet格式資料上層目錄即可。
val frame: DataFrame = session.read.format("org.apache.hudi").load("/hudi_data/person_infos/*/*")
frame.createTempView("personInfos")
//查詢結果
val result = session.sql(
"""
| select * from personInfos
""".stripMargin)
result.show(false)
四、更新Hudi資料
向Hudi中更新資料有如下幾個特點
- 同一個分區内,向Hudi中更新資料是用主鍵來判斷資料是否需要更新的,這裡判斷的是相同分區内是否有相同主鍵,不同分區内允許有相同主鍵。
- 更新資料時,如果原來資料有分區,一定要指定分區,不然就相當于是向相同表目錄下插入資料,會生成對應的“default”分區。
- 向Hudi中更新資料時,與向Hudi中插入資料一樣,但是寫入的模式需要指定成“Append”,如果指定成“overwrite”,那麼就是全覆寫了。建議使用時一直使用“Append”模式即可。
- 當更新完成之後,再一次從Hudi中查詢資料時,會看到Hudi送出的時間字段為最新的時間。
這裡将原有的三條資料改成如下三條資料:
#修改之前
{"id":1,"name":"zs1","age":18,"loc":"beijing","data_dt":"20210709"}
{"id":2,"name":"zs2","age":19,"loc":"shanghai","data_dt":"20210709"}
{"id":3,"name":"zs3","age":20,"loc":"beijing","data_dt":"20210709"}
#修改之後
{"id":1,"name":"ls1","age":40,"loc":"beijing","data_dt":"20210709"} --更新資料
{"id":2,"name":"ls2","age":50,"loc":"shanghai","data_dt":"20210710"} --更新資料
{"id":3,"name":"ls3","age":60,"loc":"ttt","data_dt":"20210711"} --相當于是新增資料
更新Hudi資料代碼如下:
val session: SparkSession = SparkSession.builder().master("local").appName("updataDataToHudi")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//讀取修改資料
val updateDataDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\updatedata.json")
//向Hudi 更新資料
updateDataDF.write.format("org.apache.hudi") //或者直接寫hudi
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Append)
.save("/hudi_data/person_infos")
//查詢資料
val frame: DataFrame = session.read.format("org.apache.hudi").load("/hudi_data/person_infos/*/*")
frame.createTempView("personInfos")
//查詢結果
val result = session.sql(
"""
| select * from personInfos
""".stripMargin)
result.show(false)
五、 增量查詢Hudi資料
Hudi可以根據我們傳入的時間戳查詢此時間戳之後的資料,這就是增量查詢,需要注意的是增量查詢必須通過以下方式在Spark中指定一個時間戳才能正常查詢:
option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,timestamp)
例如:原始資料如下:
我們可以查詢 20210709220335之後的資料,查詢結果如下:
代碼如下:
val session: SparkSession = SparkSession.builder().master("local").appName("updataDataToHudi")
.getOrCreate()
//關閉日志
session.sparkContext.setLogLevel("Error")
//導入隐式轉換
import session.implicits._
//查詢全量資料,查詢對應的送出時間,找出倒數第二個時間
val basePath = "/hudi_data/person_infos"
session.read.format("hudi").load(basePath+"/*/*").createTempView("personInfos")
val df: DataFrame = session.sql("select distinct(_hoodie_commit_time) as commit_time from personInfos order by commit_time desc")
//這裡擷取由大到小排序的第二個值
val dt: String = df.map(row=>{row.getString(0)}).collect()(1)
//增量查詢
val result:DataFrame = session.read.format("hudi")
/**
* 指定資料查詢方式,有以下三種:
* val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot" -- 擷取最新所有資料 , 預設
* val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental" --擷取指定時間戳後的變化資料
* val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized" -- 隻查詢Base檔案中的資料
*
* 1) Snapshot mode (obtain latest view, based on row & columnar data)
* 2) incremental mode (new data since an instantTime)
* 3) Read Optimized mode (obtain latest view, based on columnar data)
*
* Default: snapshot
*/
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
//必須指定一個開始查詢的時間,不指定報錯
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,dt)
.load(basePath+"/*/*")
result.show(false)
六、指定時間範圍查詢Hudi資料
Hudi還可以通過指定開始時間和結束時間來查詢時間範圍内的資料。如果想要查詢最早的時間點到某個結束時刻的資料,開始時間可以指定成“000”。
1、向原有Hudi表“person_infos”中插入兩次資料
目前hudi表中的資料如下:
先執行兩次新的資料插入,兩次插入資料之間的間隔時間至少為1分鐘,兩次插入資料代碼如下:
//以下代碼分兩次向 HDFS /hudi_data/person_infos 路徑中插入資料,兩次運作至少1分鐘以上
val session: SparkSession = SparkSession.builder().master("local").appName("PointTimeQueryHudi")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//讀取第一個檔案,向Hudi中插入資料
val df1: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\appendData1.json")
val df2: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\appendData2.json")
//向Hudi中插入資料
df2.write.format("hudi")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Append)
.save("/hudi_data/person_infos")
import org.apache.spark.sql.functions._
//查詢資料
session.read.format("hudi").load("/hudi_data/person_infos/*/*")
.orderBy(col("_hoodie_commit_time"))
.show(100,false)
此時,資料如下:
2、指定時間段查詢Hudi中的資料
代碼如下:
val session: SparkSession = SparkSession.builder().master("local").appName("PointTimeQueryHudi")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//指定時間段,查詢hudi中的資料
// val beginTime = "000"
val beginTime = "20210710002148"
val endTime = "20210710002533"
val result: DataFrame = session.read.format("hudi")
//指定增量查詢
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
//指定查詢開始時間(不包含),“000”指定為最早時間
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
//指定查詢結束時間(包含)
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, endTime)
.load("/hudi_data/person_infos/*/*")
result.createTempView("temp")
session.sql(
"""
| select * from temp order by _hoodie_commit_time
""".stripMargin).show(100,false)
開始時間為“000”,相當于是從頭開始查詢到endTime的資料:
開始時間為“20210710002148”:
七、删除Hudi資料
我們準備對應的主鍵及分區的資料,将Hudi中對應的主鍵及分區的資料進行删除,在删除Hudi中的資料時,需要指定option(OPERATION_OPT_KEY,"delete")配置項,并且寫入模式隻能是Append,不支援其他寫入模式,另外,設定下删除執行的并行度,預設為1500個,這裡可以設定成2個。
原始資料如下:
準備要删除的資料如下:
{"id":11,"loc":"beijing"}
{"id":12,"loc":"beijing"}
{"id":13,"loc":"beijing"}
{"id":14,"loc":"shenzhen"}
{"id":15,"loc":"tianjian"} --此條資料對應的主鍵一緻,但是分區不一緻,不能在Hudi中删除
編寫代碼如下:
val session: SparkSession = SparkSession.builder().master("local").appName("DeleteHudiData")
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//讀取需要删除的資料,隻需要準備對應的主鍵及分區即可,字段保持與Hudi中需要删除的字段名稱一緻即可
//讀取的檔案中準備了一個主鍵在Hudi中存在但是分區不再Hudi中存在的資料,此主鍵資料在Hudi中不能被删除,需要分區和主鍵字段都比對才能删除
val deleteData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\deleteData.json")
//将删除的資料插入到Hudi中
deleteData.write.format("hudi")
//指定操作模式為delete
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,"delete")
//指定主鍵
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
//指定分區字段
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
//指定表名,這裡的表明需要與之前指定的表名保持一緻
.option(HoodieWriteConfig.TABLE_NAME,"person_infos")
//設定删除并行度設定,預設1500并行度
.option("hoodie.delete.shuffle.parallelism", "2")
.mode(SaveMode.Append)
.save("/hudi_data/person_infos")
//執行完成之後,查詢結果
import org.apache.spark.sql.functions._
session.read.format("hudi").load("/hudi_data/person_infos/*/*")
.orderBy(col("_hoodie_commit_time")).show(100,false)
結果如下:
八、更新Hudi某個分區資料
如果我們想要更新Hudi某個分區的資料,其他分區資料正常使用,那麼可以通過配置option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite")選項,該選項“insert_overwrite”可以直接在中繼資料層面上操作,直接将寫入某分區的新資料替換到該分區内,原有資料會在一定時間内删除,相比upsert更新Hudi速度要快。
1、删除person_infos對應的目錄,重新插入資料,代碼如下
val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//建立DataFrame
val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
insertDF.write.format("org.apache.hudi")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Append)
.save("/hudi_data/person_infos")
//寫入完成之後,查詢hudi 資料:
val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
person_infos.show(100,false)
2、讀取更新分區資料,插入到Hudi preson_infos表中
讀取資料如下:
{"id":1,"name":"s1","age":1,"loc":"beijing","data_dt":"20210710"}
{"id":100,"name":"s2","age":2,"loc":"beijing","data_dt":"20210710"}
{"id":200,"name":"s3","age":3,"loc":"beijing","data_dt":"20210710"}
{"id":8,"name":"w1","age":4,"loc":"chongqing","data_dt":"20210710"}
{"id":300,"name":"w2","age":5,"loc":"chongqing","data_dt":"20210710"}
代碼如下:
val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//讀取需要替換的資料,将beijing分區資料替換成2條,将chognqing分區資料替換成1條
val overWritePartitionData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\overWrite.json")
//寫入hudi表person_infos,替換分區
overWritePartitionData.write.format("hudi")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite")
//設定主鍵列名稱
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
//當資料主鍵相同時,對比的字段,儲存該字段大的資料
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
//指定分區列
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
//并行度設定
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
//表名設定
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Append)
.save("/hudi_data/person_infos")
//寫入完成之後,查詢hudi 資料:
val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
person_infos.show(100,false)
九、覆寫Hudi整個表資料
如果我們想要替換Hudi整個表資料,可以在向Hudi表寫入資料時指定配置option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite_table")選項,該選項“insert_overwrite_table”可以直接在中繼資料層面上操作,直接将資料寫入表,原有資料會在一定時間内删除,相比删除原有資料再插入更友善。
1、删除Hudi表person_infos對應的HDFS路徑,重新插入資料
val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//建立DataFrame
val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
insertDF.write.format("org.apache.hudi")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Append)
.save("/hudi_data/person_infos")
//寫入完成之後,查詢hudi 資料:
val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
person_infos.show(100,false)
2、讀取新資料,覆寫原有Hudi表資料
覆寫更新的資料如下:
{"id":1,"name":"s1","age":1,"loc":"beijing","data_dt":"20210710"}
{"id":100,"name":"s2","age":2,"loc":"beijing","data_dt":"20210710"}
{"id":200,"name":"s3","age":3,"loc":"beijing","data_dt":"20210710"}
{"id":8,"name":"w1","age":4,"loc":"chongqing","data_dt":"20210710"}
{"id":300,"name":"w2","age":5,"loc":"chongqing","data_dt":"20210710"}
代碼如下:
val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//讀取需要替換的資料,覆寫原有表所有資料
val overWritePartitionData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\overWrite.json")
//寫入hudi表person_infos,替換分區
overWritePartitionData.write.format("hudi")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite_table")
//設定主鍵列名稱
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
//當資料主鍵相同時,對比的字段,儲存該字段大的資料
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
//指定分區列
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
//并行度設定
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
//表名設定
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Append)
.save("/hudi_data/person_infos")
//寫入完成之後,查詢hudi 資料:
val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
person_infos.show(100,false)
十、Spark操作Hudi Merge On Read 模式
預設Spark操作Hudi使用Copy On Write模式,也可以使用Merge On Read 模式,通過代碼中國配置如下配置來指定:
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
代碼操作如下:
- 删除原有person_infos對應的HDFS路徑
- 讀取資料向Hudi表person_info中插入資料
讀取的資料如下:
{"id":1,"name":"zs1","age":18,"loc":"beijing","data_dt":"20210709"}
{"id":2,"name":"zs2","age":19,"loc":"shanghai","data_dt":"20210709"}
{"id":3,"name":"zs3","age":20,"loc":"beijing","data_dt":"20210709"}
{"id":4,"name":"zs4","age":21,"loc":"tianjin","data_dt":"20210709"}
{"id":5,"name":"zs5","age":22,"loc":"shenzhen","data_dt":"20210709"}
{"id":6,"name":"zs6","age":23,"loc":"hainai","data_dt":"20210709"}
{"id":7,"name":"zs7","age":24,"loc":"beijing","data_dt":"20210709"}
{"id":8,"name":"zs8","age":25,"loc":"chongqing","data_dt":"20210709"}
{"id":9,"name":"zs9","age":26,"loc":"shandong","data_dt":"20210709"}
{"id":10,"name":"zs10","age":27,"loc":"hunan","data_dt":"20210709"}
代碼如下:
//1.讀取json格式資料
val insertDf: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
//2.将結果使用Merge on Read 模式寫入到Hudi中,并設定分區
insertDf.write.format("hudi")
//設定表模式為 mor
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt")
//并行度設定
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
//表名設定
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Append)
.save("/hudi_data/person_infos")
- 更新Hudi表person_info資料
這裡更新“beijing”、“shanghai”、“ttt”分區資料,更新資料如下:
{"id":1,"name":"ls1","age":40,"loc":"beijing","data_dt":"20210709"}
{"id":2,"name":"ls2","age":50,"loc":"shanghai","data_dt":"20210710"}
{"id":3,"name":"ls3","age":60,"loc":"ttt","data_dt":"20210711"}
代碼如下:
//3.讀取更新資料,并執行插入更新
val updateDf: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\updatedata.json")
updateDf.write.format("hudi")
//設定表模式為 mor
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt")
//并行度設定
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
//表名設定
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Append)
.save("/hudi_data/person_infos")
- 增量查詢Hudi表中的資料
Snapshot 模式查詢,這種模式對于COW或者MOR模式都是查詢到目前時刻全量的資料,如果有更新,那麼就是更新之後全量的資料:
//4.使用不同模式查詢 MOR 表中的資料
/**
* 指定資料查詢方式,有以下三種:
* val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot" -- 擷取最新所有資料 , 預設
* val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental" --擷取指定時間戳後的變化資料
* val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized" -- 隻查詢Base檔案中的資料
*
* 1) Snapshot mode (obtain latest view, based on row & columnar data)
* 2) incremental mode (new data since an instantTime)
* 3) Read Optimized mode (obtain latest view, based on columnar data)
*
* Default: snapshot
*/
//4.1 Snapshot 模式查詢
session.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load("/hudi_data/person_infos/*/*")
.show(100,false)
incremental 模式查詢,這種模式需要指定一個時間戳,查詢指定時間戳之後的新增資料:
//4.2 incremental 模式查詢,查詢指定時間戳後的資料
session.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
//必須指定一個開始查詢的時間,不指定報錯
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,"20210710171240")
.load("/hudi_data/person_infos/*/*")
.show(100,false)
Read Optimized 模式查詢,這種模式隻查詢Base中的資料,不會查詢MOR中Log檔案中的資料,代碼如下:
//4.3 Read Optimized 模式查詢,查詢Base中的資料,不會查詢log中的資料
session.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load("/hudi_data/person_infos/*/*")
.show(100,false)
十一、測試COW模式parquet檔案删除與MOR模式Parquet檔案與log檔案Compact
COW預設情況下,每次更新資料Commit都會基于之前parquet檔案生成一個新的Parquet Base檔案資料,預設曆史parquet檔案數為10,當超過10個後會自動删除舊的版本,可以通過參數“hoodie.cleaner.commits.retained”來控制保留的FileID版本檔案數,預設是10。測試代碼如下:
val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//建立DataFrame
val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata1.json")
// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata2.json")
// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata3.json")
// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata4.json")
// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata5.json")
// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata6.json")
// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata7.json")
// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata8.json")
// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata9.json")
// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata10.json")
insertDF.write.format("org.apache.hudi")
//設定cow模式
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
//根據commit送出次數計算保留多少個fileID版本檔案,預設10。
.option("hoodie.cleaner.commits.retained","3")
//設定主鍵列名稱
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
//當資料主鍵相同時,對比的字段,儲存該字段大的資料
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
//并行度設定,預設1500并行度
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
//表名設定
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Append)
.save("/hudi_data/test_person")
//查詢結果資料
session.read.format("hudi")
//全量讀取
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load("/hudi_data/test_person/*/*").show()
測試注意:每次運作代碼,讀取新的一個資料檔案,并檢視Hudi表對應的HDFS路徑,每次讀取都會生成一個新的Parquet檔案,當達到指定的3個曆史版本時(不包含最新Parquet檔案),再插入資料生成新的Parquet檔案時,一緻會将之前的舊版本删除,儲存4個檔案。
MOR模式下,如果有新增資料會直接寫入Base Parquet檔案,這個Parquet檔案個數的控制也是由“hoodie.cleaner.commits.retained”控制,預設為10。當對應的每個FlieSlice(Base Parquet檔案+log Avro檔案)中有資料更新時,會寫入對應的log Avro檔案,那麼這個檔案何時與Base Parquet檔案進行合并,這個是由參數“hoodie.compact.inline.max.delta.commits”決定的,這個參數意思是在送出多少次commit後觸發壓縮政策,預設是5,也就是目前FlieSlice中如果有5次資料更新就會兩者合并生成全量的資料,目前FlieSlice還是這個FileSlice名稱,隻不過對應的parquet檔案中是全量資料,再有更新資料還是會寫入目前FileSlice對應的log日志檔案中。使“hoodie.compact.inline.max.delta.commits”參數起作用,預設必須開啟“hoodie.compact.inline”,此值代表是否完成送出資料後進行壓縮,預設是false。
測試代碼如下:
#注意代碼中設定參數如下:
//根據commit送出次數計算保留多少個fileID版本檔案,預設10。
.option("hoodie.cleaner.commits.retained","3")
//預設false:是否在一個事務完成後内聯執行壓縮操作
.option("hoodie.compact.inline","true")
//設定送出多少次後觸發壓縮政策,預設5
.option("hoodie.compact.inline.max.delta.commits","2")
#完整代碼如下:
val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//建立DataFrame ,新增
// val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata1.json")
//建立DataFrame ,更新
val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\update11.json")
insertDF.write.format("org.apache.hudi") //或者直接寫hudi
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
//根據commit送出次數計算保留多少個fileID版本檔案,預設10。
.option("hoodie.cleaner.commits.retained","3")
//預設false:是否在一個事務完成後内聯執行壓縮操作
.option("hoodie.compact.inline","true")
//設定送出多少次後觸發壓縮政策,預設5
.option("hoodie.compact.inline.max.delta.commits","2")
//設定主鍵列名稱
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
//當資料主鍵相同時,對比的字段,儲存該字段大的資料
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
//并行度設定,預設1500并行度
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
//表名設定
.option(HoodieWriteConfig.TABLE_NAME, "person_infos")
.mode(SaveMode.Append)
.save("/hudi_data/test_person")
//查詢結果資料
session.read.format("hudi")
//全量讀取
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load("/hudi_data/test_person/*/*").show()
第一次運作插入資料,commit,路徑對應資料目錄如下:
第一次運作更新資料,commit,路徑對應資料目錄如下:
第二次運作更新資料,commit,路徑對應的資料目錄如下:
第三次運作更新資料,commit,路徑對應的資料目錄如下:
第四次運作更新資料,commit,路徑對應的資料目錄如下: