天天看點

湖倉一體電商項目(十六):業務實作之編寫寫入ODS層業務代碼

湖倉一體電商項目(十六):業務實作之編寫寫入ODS層業務代碼

文章目錄

​​業務實作之編寫寫入ODS層業務代碼​​

​​一、代碼編寫​​

​​二、建立Iceberg-ODS層表​​

​​1、在Hive中添加Iceberg表格式需要的包​​

​​2、建立Iceberg表​​

​​三、代碼測試​​

​​1、在Kafka中建立對應的topic​​

​​2、将代碼中消費Kafka資料改成從頭開始消費​​

​​3、啟動日志采集接口,啟動Flume監控​​

​​4、執行代碼,檢視對應topic中的結果​​

​​5、執行模拟生産使用者日志代碼,檢視對應topic中的結果​​

業務實作之編寫寫入ODS層業務代碼

由于本業務涉及到MySQL業務資料和使用者日志資料,兩類資料是分别采集存儲在不同的Kafka Topic中的,是以這裡寫入ODS層代碼由兩個代碼組成。

一、代碼編寫

處理MySQL業務庫binlog資料的代碼複用第一個業務代碼隻需要在”ProduceKafkaDBDataToODS.scala” 代碼中寫入存入Icebeg-ODS層表的代碼即可,“ProduceKafkaDBDataToODS.scala”代碼檔案中加入代碼如下:

//向Iceberg ods 層 ODS_PRODUCT_CATEGORY 表插入資料
tblEnv.executeSql(
  """
    |insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_CATEGORY
    |select
    |   data['id'] as id ,
    |   data['p_id'] as p_id,
    |   data['name'] as name,
    |   data['pic_url'] as pic_url,
    |   data['gmt_create'] as gmt_create
    | from kafka_db_bussiness_tbl where `table` = 'pc_product_category'
  """.stripMargin)

//向Iceberg ods 層 ODS_PRODUCT_INFO 表插入資料
tblEnv.executeSql(
  """
    |insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_INFO
    |select
    |   data['product_id'] as product_id ,
    |   data['category_id'] as category_id,
    |   data['product_name'] as product_name,
    |   data['gmt_create'] as gmt_create
    | from kafka_db_bussiness_tbl where `table` = 'pc_product'
  """.stripMargin)
處理使用者日志的代碼需要自己編寫,代碼中的業務邏輯主要是讀取存儲使用者浏覽日志資料topic “KAFKA-USER-LOG-DATA”中的資料,通過Flink代碼處理将不同類型使用者日志處理成json類型資料,将該json結果後續除了存儲在Iceberg-ODS層對應的表之外還要将資料存儲在Kafka topic “KAFKA-ODS-TOPIC” 中友善後續的業務處理。具體代碼參照“ProduceKafkaLogDataToODS.scala”,主要代碼邏輯如下:
object ProduceKafkaLogDataToODS {
  private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
  private val kafkaOdsTopic: String = ConfigUtil.KAFKA_ODS_TOPIC
  private val kafkaDwdBrowseLogTopic: String = ConfigUtil.KAFKA_DWD_BROWSELOG_TOPIC

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    env.enableCheckpointing(5000)

    import org.apache.flink.streaming.api.scala._

    /**
      * 1.需要預先建立 Catalog
      * 建立Catalog,建立表需要在Hive中提前建立好,不在代碼中建立,因為在Flink中建立iceberg表不支援create table if not exists ...文法
      */
    tblEnv.executeSql(
      """
        |create catalog hadoop_iceberg with (
        | 'type'='iceberg',
        | 'catalog-type'='hadoop',
        | 'warehouse'='hdfs://mycluster/lakehousedata'
        |)
      """.stripMargin)

    /**
      *  {
      *    "logtype": "browselog",
      *    "data": {
      *    "browseProductCode": "eSHd1sFat9",
      *    "browseProductTpCode": "242",
      *    "userIp": "251.100.236.37",
      *    "obtainPoints": 32,
      *    "userId": "uid208600",
      *    "frontProductUrl": "https://f/dcjp/nVnE",
      *    "logTime": 1646980514321,
      *    "browseProductUrl": "https://kI/DXSNBeP/"
      *   }
      * }
      */

    /**
      * 2.建立 Kafka Connector,連接配接消費Kafka中資料
      * 注意:1).關鍵字要使用 " 飄"符号引起來 2).對于json對象使用 map < String,String>來接收
      */
    tblEnv.executeSql(
      """
        |create table kafka_log_data_tbl(
        |   logtype string,
        |   data map<string,string>
        |) with (
        | 'connector' = 'kafka',
        | 'topic' = 'KAFKA-USER-LOG-DATA',
        | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
        | 'scan.startup.mode'='earliest-offset', --也可以指定 earliest-offset 、latest-offset
        | 'properties.group.id' = 'my-group-id',
        | 'format' = 'json'
        |)
      """.stripMargin)

    /**
      * 3.将不同的業務庫資料存入各自的Iceberg表
      */
    tblEnv.executeSql(
      """
        |insert into hadoop_iceberg.icebergdb.ODS_BROWSELOG
        |select
        |   data['logTime'] as log_time ,
        |   data['userId'] as user_id,
        |   data['userIp'] as user_ip,
        |   data['frontProductUrl'] as front_product_url,
        |   data['browseProductUrl'] as browse_product_url,
        |   data['browseProductTpCode'] as browse_product_tpcode,
        |   data['browseProductCode'] as browse_product_code,
        |   data['obtainPoints'] as  obtain_points
        | from kafka_log_data_tbl where `logtype` = 'browselog'
      """.stripMargin)


    //4.将使用者所有日志資料組裝成Json資料存入 kafka topic ODS-TOPIC 中
    //讀取 Kafka 中的資料,将次元資料另外存儲到 Kafka 中
    val kafkaLogTbl: Table = tblEnv.sqlQuery("select logtype,data from kafka_log_data_tbl")

    //将 kafkaLogTbl Table 轉換成 DataStream 資料
    val userLogDS: DataStream[Row] = tblEnv.toAppendStream[Row](kafkaLogTbl)
    //将 userLogDS 資料轉換成JSON 資料寫出到 kafka topic ODS-TOPIC
    val odsSinkDS: DataStream[String] = userLogDS.map(row => {
      //最後傳回給Kafka 日志資料的json對象
      val returnJsonObj = new JSONObject()
      val logType: String = row.getField(0).toString

      val data: String = row.getField(1).toString
      val nObject = new JSONObject()
      val arr: Array[String] = data.stripPrefix("{").stripSuffix("}").split(",")
      for (elem <- arr) {
        //有些資料 “data”中屬性沒有值,就沒有“=”
        if (elem.contains("=") && elem.split("=").length == 2) {
          val split: Array[String] = elem.split("=")
          nObject.put(split(0).trim, split(1).trim)
        } else {
          nObject.put(elem.stripSuffix("=").trim, "")
        }
      }

      if ("browselog".equals(logType)) {
        returnJsonObj.put("iceberg_ods_tbl_name", "ODS_BROWSELOG")
        returnJsonObj.put("kafka_dwd_topic",kafkaDwdBrowseLogTopic)
        returnJsonObj.put("data",nObject.toString)
      } else {
        //其他日志,這裡目前沒有
      }

      returnJsonObj.toJSONString
    })

    val props = new Properties()
    props.setProperty("bootstrap.servers",kafkaBrokers)

    odsSinkDS.addSink(new FlinkKafkaProducer[String](kafkaOdsTopic,new KafkaSerializationSchema[String] {
      override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        new ProducerRecord[Array[Byte],Array[Byte]](kafkaOdsTopic,null,element.getBytes())
      }
    },props,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))

    env.execute()

  }
}      

二、建立Iceberg-ODS層表

代碼在執行之前需要在Hive中預先建立對應的Iceberg表,建立Icebreg表方式如下:

1、在Hive中添加Iceberg表格式需要的包

啟動HDFS叢集,node1啟動Hive metastore服務,在Hive用戶端啟動Hive添加Iceberg依賴包:

#node1節點啟動Hive metastore服務
[root@node1 ~]# hive --service metastore &

#在hive用戶端node3節點加載兩個jar包
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;      

2、建立Iceberg表

這裡建立Iceberg表有“ODS_PRODUCT_CATEGORY”、“ODS_PRODUCT_INFO”,建立語句如下:

CREATE TABLE ODS_PRODUCT_CATEGORY (
id string,
p_id string,
name string,
pic_url string,
gmt_create string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_PRODUCT_CATEGORY/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE ODS_PRODUCT_INFO (
product_id string,
category_id string,
product_name string,
gmt_create string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_PRODUCT_INFO/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE ODS_BROWSELOG  (
 log_time string,
 user_id string,
 user_ip string,
 front_product_url string,
 browse_product_url string,
 browse_product_tpcode string,
 browse_product_code string,
 obtain_points string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_BROWSELOG/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);      

以上語句在Hive用戶端執行完成之後,在HDFS中可以看到對應的Iceberg資料目錄:

湖倉一體電商項目(十六):業務實作之編寫寫入ODS層業務代碼

三、代碼測試

以上代碼編寫完成後,代碼執行測試步驟如下:

1、在Kafka中建立對應的topic

#在Kafka 中建立 KAFKA-USER-LOG-DATA topic
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-USER-LOG-DATA --partitions 3 --replication-factor 3

#在Kafka 中建立 KAFKA-ODS-TOPIC topic(第一個業務已建立可忽略)
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-ODS-TOPIC --partitions 3 --replication-factor 3

#在Kafka 中建立 KAFKA-DIM-TOPIC topic(第一個業務已建立可忽略)
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DIM-TOPIC --partitions 3 --replication-factor 3

#監控以上兩個topic資料
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-ODS-TOPIC

[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DIM-TOPIC      

2、将代碼中消費Kafka資料改成從頭開始消費

代碼中Kafka Connector中屬性“scan.startup.mode”設定為“earliest-offset”,從頭開始消費資料。

這裡也可以不設定從頭開始消費Kafka資料,而是直接啟動實時向MySQL表中寫入資料代碼“RTMockDBData.java”代碼,實時向MySQL對應的表中寫入資料,這裡需要啟動maxwell監控資料,代碼才能實時監控到寫入MySQL的業務資料。

針對使用者日志資料可以啟動代碼“RTMockUserLogData.java”,實時向日志采集接口寫入資料。

3、啟動日志采集接口,啟動Flume監控

#在node5節點上啟動日志采集接口
[root@node5 ~]# cd /software/
[root@node5 software]# java -jar logcollector-0.0.1-SNAPSHOT.jar


#在node5節點上啟動Flume
[root@node5 software]# flume-ng agent --name a -f /software/a.properties -Dflume.root.logger=INFO,console      

4、執行代碼,檢視對應topic中的結果

5、執行模拟生産使用者日志代碼,檢視對應topic中的結果

  • 📢歡迎點贊 👍 收藏 ⭐留言 📝 如有錯誤敬請指正!
  • 📢本文由 Lansonli 原創
  • 📢停下休息的時候不要忘了别人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活✨

繼續閱讀