天天看點

湖倉一體電商項目(十八):業務實作之編寫寫入DWD層業務代碼

湖倉一體電商項目(十八):業務實作之編寫寫入DWD層業務代碼

文章目錄

​​業務實作之編寫寫入DWD層業務代碼​​

​​一、代碼編寫​​

​​二、建立Iceberg-DWD層表​​

​​1、在Hive中添加Iceberg表格式需要的包​​

​​2、建立Iceberg表​​

​​三、代碼測試​​

​​1、在Kafka中建立對應的topic​​

​​2、将代碼中消費Kafka資料改成從頭開始消費​​

​​3、執行代碼,檢視對應結果​​

業務實作之編寫寫入DWD層業務代碼

一、代碼編寫

Flink讀取Kafka topic “KAFKA-ODS-TOPIC” 資料寫入Iceberg-DWD層也是複用第一個業務代碼,這裡隻需要在代碼中加入寫入Iceberg-DWD層代碼即可,代碼如下:

//插入 iceberg - dwd 層 會員浏覽商品日志資訊 :DWD_BROWSELOG
tblEnv.executeSql(
  s"""
    |insert into hadoop_iceberg.icebergdb.DWD_BROWSELOG
    |select
    | log_time,
    | user_id2,
    | user_ip,
    | front_product_url,
    | browse_product_url,
    | browse_product_tpcode,
    | browse_product_code,
    | obtain_points
    | from ${table} where iceberg_ods_tbl_name = 'ODS_BROWSELOG'
  """.stripMargin)      

另外,在Flink處理此topic中每條資料時都有擷取對應寫入後續Kafka topic資訊,本業務對應的每條使用者日志資料寫入的kafka topic為“KAFKA-DWD-BROWSE-LOG-TOPIC”,是以代碼可以複用。

二、建立Iceberg-DWD層表

代碼在執行之前需要在Hive中預先建立對應的Iceberg表,建立Icebreg表方式如下:

1、在Hive中添加Iceberg表格式需要的包

啟動HDFS叢集,node1啟動Hive metastore服務,在Hive用戶端啟動Hive添加Iceberg依賴包:

#node1節點啟動Hive metastore服務
[root@node1 ~]# hive --service metastore &

#在hive用戶端node3節點加載兩個jar包
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;      

2、建立Iceberg表

這裡建立Iceberg-DWD表有“DWD_BROWSELOG”,建立語句如下:

CREATE TABLE DWD_BROWSELOG  (
 log_time string,
 user_id string,
 user_ip string,
 front_product_url string,
 browse_product_url string,
 browse_product_tpcode string,
 browse_product_code string,
 obtain_points string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWD_BROWSELOG/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);      

三、代碼測試

以上代碼編寫完成後,代碼執行測試步驟如下:

1、在Kafka中建立對應的topic

#在Kafka 中建立 KAFKA-DWD-BROWSE-LOG-TOPIC topic
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWD-BROWSE-LOG-TOPIC --partitions 3 --replication-factor 3

#監控以上topic資料
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DWD-BROWSE-LOG-TOPIC      

2、将代碼中消費Kafka資料改成從頭開始消費

代碼中Kafka Connector中屬性“scan.startup.mode”設定為“earliest-offset”,從頭開始消費資料。

這裡也可以不設定從頭開始消費Kafka資料,而是直接啟動向日志采集接口模拟生産日志代碼“RTMockUserLogData.java”,需要啟動日志采集接口及Flume。

3、執行代碼,檢視對應結果

以上代碼執行後在,在對應的Kafka “KAFKA-DWD-BROWSE-LOG-TOPIC” topic中都有對應的資料。在Iceberg-DWD層中對應的表中也有資料。

Kafka中結果如下:

  • 📢歡迎點贊 👍 收藏 ⭐留言 📝 如有錯誤敬請指正!
  • 📢本文由 Lansonli 原創
  • 📢停下休息的時候不要忘了别人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活✨