#頭條創作挑戰賽#
業務實作之編寫寫入DWD層業務代碼
DWD層資料主要存儲幹淨的明細資料,這裡針對ODS層“KAFKA-ODS-TOPIC”資料編寫代碼進行清洗寫入對應的Kafka topic和Iceberg-DWD層中。代碼功能中有以下幾點重要方面:
- 針對Kafka ODS層中的資料進行清洗,寫入Iceberg-DWD層中。
- 将資料除了寫入Iceberg-DWD層中之外,還要寫入Kafka中友善後續處理得到DWS層資料。
一、代碼編寫
編寫處理Kafka ODS層資料寫入Iceberg-DWD層資料時,由于在Kafka “KAFKA-ODS-TOPIC”topic中每條資料都已經有對應寫入kafka的topic資訊,是以這裡我們隻需要讀取“KAFKA-ODS-TOPIC”topic中的資料寫入到Iceberg-DWD層中,另外動态擷取每條資料寫入Kafka topic資訊将每條資料寫入到對應的topic即可。
具體代碼參照“ProduceODSDataToDWD.scala”,大體代碼邏輯如下:
case class DwdInfo (iceberg_ods_tbl_name:String,kafka_dwd_topic:String,browse_product_code:String,browse_product_tpcode:String,user_ip:String,obtain_points:String,user_id1:String,user_id2:String, front_product_url:String, log_time:String, browse_product_url:String ,id:String,ip:String, login_tm:String,logout_tm:String)
object ProduceODSDataToDWD {
private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
def main(args: Array[String]): Unit = {
//1.準備環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
env.enableCheckpointing(5000)
import org.apache.flink.streaming.api.scala._
/**
* 2.需要預先建立 Catalog
* 建立Catalog,建立表需要在Hive中提前建立好,不在代碼中建立,因為在Flink中建立iceberg表不支援create table if not exists ...文法
*/
tblEnv.executeSql(
"""
|create catalog hadoop_iceberg with (
| 'type'='iceberg',
| 'catalog-type'='hadoop',
| 'warehouse'='hdfs://mycluster/lakehousedata'
|)
""".stripMargin)
/**
* 2.建立 Kafka Connector,連接配接消費Kafka ods中資料
*/
tblEnv.executeSql(
"""
|create table kafka_ods_tbl(
| iceberg_ods_tbl_name string,
| kafka_dwd_topic string,
| data string
|) with (
| 'connector' = 'kafka',
| 'topic' = 'KAFKA-ODS-TOPIC',
| 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
| 'scan.startup.mode'='latest-offset', --也可以指定 earliest-offset 、latest-offset
| 'properties.group.id' = 'my-group-id',
| 'format' = 'json'
|)
""".stripMargin)
val odsTbl :Table = tblEnv.sqlQuery(
"""
| select iceberg_ods_tbl_name,data,kafka_dwd_topic from kafka_ods_tbl
""".stripMargin)
val odsDS: DataStream[Row] = tblEnv.toAppendStream[Row](odsTbl)
//3.設定Sink 到Kafka 資料輸出到側輸出流标記
val kafkaDataTag = new OutputTag[JSONObject]("kafka_data")
/**
* 4.表準換成對應的DataStream資料處理,清洗ODS 中的資料,存入Iceberg
* {
* "iceberg_ods_tbl_name": "ODS_BROWSELOG",
* "data": "{\"browseProductCode\":\"yyRAteviDb\",\"browseProductTpCode\":\"120\",\"userIp\":\"117.233.5.190\",\"obtainPoints\":\"24\",
* \"userId\":\"uid464936\",\"frontProductUrl\":\"https://1P//2RQbHFS2\",\"logTime\":\"1647065858856\",\"browseProductUrl\":\"https://RXm/iOUxR/Tliu9TE0\"}",
* "kafka_dwd_topic": "KAFKA-DWD-BROWSE-LOG-TOPIC"
* }
*
* {
* "iceberg_ods_tbl_name": "ODS_USER_LOGIN",
* "data": "{\"database\":\"lakehousedb\",\"xid\":\"14942\",\"user_id\":\"uid283876\",\"ip\":\"215.148.233.254\",\"commit\":\"true\",
* \"id\":\"10052\",\"type\":\"insert\",\"logout_tm\":\"1647066506140\",\"table\":\"mc_user_login\",\"ts\":\"1647066504\",\"login_tm\":\"1647051931534\"}",
* "kafka_dwd_topic": "KAFKA-DWD-USER-LOGIN-TOPIC"
* }
*
* 這裡将資料轉換成DataStream後再轉換成表寫入Iceberg
*
*/
//對資料隻是時間進行清洗,轉換成DwdInfo 類型DataStream 傳回,先過濾一些資料為null的
val dwdDS: DataStream[DwdInfo] = odsDS.filter(row=>{row.getField(0)!=null && row.getField(1)!=null &&row.getField(2)!=null })
.process(new ProcessFunction[Row,DwdInfo]() {
override def processElement(row: Row, context: ProcessFunction[Row, DwdInfo]#Context, collector: Collector[DwdInfo]): Unit = {
val iceberg_ods_tbl_name: String = row.getField(0).toString
val data: String = row.getField(1).toString
val kafka_dwd_topic: String = row.getField(2).toString
val jsonObj: JSONObject = JSON.parseObject(data)
//清洗日期資料
jsonObj.put("logTime",DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logTime")))
jsonObj.put("login_tm",DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("login_tm")))
jsonObj.put("logout_tm",DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logout_tm")))
//解析json 嵌套資料
val browse_product_code: String = jsonObj.getString("browseProductCode")
val browse_product_tpcode: String = jsonObj.getString("browseProductTpCode")
val user_ip: String = jsonObj.getString("userIp")
val obtain_points: String = jsonObj.getString("obtainPoints")
val user_id1: String = jsonObj.getString("user_id")
val user_id2: String = jsonObj.getString("userId")
val front_product_url: String = jsonObj.getString("frontProductUrl")
val log_time: String = jsonObj.getString("logTime")
val browse_product_url: String = jsonObj.getString("browseProductUrl")
val id: String = jsonObj.getString("id")
val ip: String = jsonObj.getString("ip")
val login_tm: String = jsonObj.getString("login_tm")
val logout_tm: String = jsonObj.getString("logout_tm")
//往各類資料 data json 對象中加入sink dwd topic 的資訊
jsonObj.put("kafka_dwd_topic",kafka_dwd_topic)
context.output(kafkaDataTag,jsonObj)
collector.collect(DwdInfo(iceberg_ods_tbl_name, kafka_dwd_topic, browse_product_code, browse_product_tpcode, user_ip, obtain_points,
user_id1,user_id2, front_product_url, log_time, browse_product_url, id, ip, login_tm, logout_tm))
}
})
val props = new Properties()
props.setProperty("bootstrap.servers",kafkaBrokers)
/**
* 6.将以上資料寫入到Kafka 各自DWD 層topic中,這裡不再使用SQL方式,而是直接使用DataStream代碼方式 Sink 到各自的DWD層代碼中
*/
dwdDS.getSideOutput(kafkaDataTag).addSink(new FlinkKafkaProducer[JSONObject]("KAFKA-DWD-DEFAULT-TOPIC",new KafkaSerializationSchema[JSONObject] {
override def serialize(jsonObj: JSONObject, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
val sinkDwdTopic: String = jsonObj.getString("kafka_dwd_topic")
new ProducerRecord[Array[Byte], Array[Byte]](sinkDwdTopic,null,jsonObj.toString.getBytes())
}
},props,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
env.execute()
}
}
二、建立Iceberg-DWD層表
代碼在執行之前需要在Hive中預先建立對應的Iceberg表,建立Icebreg表方式如下:
1、在Hive中添加Iceberg表格式需要的包
啟動HDFS叢集,node1啟動Hive metastore服務,在Hive用戶端啟動Hive添加Iceberg依賴包:
#node1節點啟動Hive metastore服務
[root@node1 ~]# hive --service metastore &
#在hive用戶端node3節點加載兩個jar包
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
2、建立Iceberg表
這裡建立Iceberg-DWD表有“DWD_USER_LOGIN”,建立語句如下:
CREATE TABLE DWD_USER_LOGIN (
id string,
user_id string,
ip string,
login_tm string,
logout_tm string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWD_USER_LOGIN/'
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);
三、代碼測試
以上代碼編寫完成後,代碼執行測試步驟如下:
1、在Kafka中建立對應的topic
#在Kafka 中建立 KAFKA-DWD-USER-LOGIN-TOPIC topic
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWD-USER-LOGIN-TOPIC --partitions 3 --replication-factor 3
#監控以上topic資料
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DWD-USER-LOGIN-TOPIC
2、将代碼中消費Kafka資料改成從頭開始消費
代碼中Kafka Connector中屬性“scan.startup.mode”設定為“earliest-offset”,從頭開始消費資料。
這裡也可以不設定從頭開始消費Kafka資料,而是直接啟動實時向MySQL表中寫入資料代碼“RTMockDBData.java”代碼,實時向MySQL對應的表中寫入資料,這裡需要啟動maxwell監控資料,代碼才能實時監控到寫入MySQL的業務資料。
3、執行代碼,檢視對應結果
以上代碼執行後在,在對應的Kafka “KAFKA-DWD-USER-LOGIN-TOPIC” topic中都有對應的資料。在Iceberg-DWD層中對應的表中也有資料。
Kafka中結果如下:
Iceberg-DWD層表”DWD_USER_LOGIN”中的資料如下: