Lakehouse E-commerce Project (8): Implementing the Business Code That Writes to the ODS Layer

Author: Lansonli

I. Code Logic and Architecture Diagram

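In the lakehouse architecture, the ODS layer mainly stores raw data. The job described here reads data from the Kafka topic "KAFKA-DB-BUSSINESS-DATA" and does two things:

  • Stores the MySQL business data unchanged in the Iceberg ODS layer, so that it is available for ad hoc business requirements.
  • Separates fact data from dimension data and writes each to its own Kafka topic.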

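For the first task, the corresponding Iceberg tables must be created in Hive in advance before data can be written. For the second, the binlog records in the topic "KAFKA-DB-BUSSINESS-DATA" do not themselves say whether they are fact data or dimension data, so the dimension tables are registered in the MySQL configuration table "lakehousedb.dim_tbl_config_info". Flink reads the dimension-table information from this table, broadcasts it, and connects it with the real-time Kafka stream to tell fact data and dimension data apart.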
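The routing rule described above can be sketched in plain Scala, without Flink, to make the decision explicit. This is only an illustration under assumptions: `DimConfig`, `Routed`, `Dim`, and `Fact` are hypothetical names, the immutable `Map` stands in for Flink's broadcast state, and treating `mc_member_info` as a dimension table in the usage note is an example choice, not something the configuration table is known to contain.

```scala
// Minimal sketch: a plain Scala stand-in for the broadcast-state lookup.
// DimConfig, Routed, Dim and Fact are hypothetical names used only for illustration.
object DimFactRouting {
  // One row of lakehousedb.dim_tbl_config_info (only two of its columns are modeled).
  final case class DimConfig(tblDb: String, tblName: String)

  sealed trait Routed
  final case class Dim(key: String) extends Routed   // would go to the dimension topic
  final case class Fact(key: String) extends Routed  // would go to the ODS (fact) topic

  // Key the lookup as "db:table", the same key format the Flink job uses.
  def buildState(rows: Seq[DimConfig]): Map[String, DimConfig] =
    rows.map(r => s"${r.tblDb}:${r.tblName}" -> r).toMap

  // A binlog record is dimension data exactly when its "db:table" key is configured.
  def route(state: Map[String, DimConfig], db: String, table: String): Routed = {
    val key = s"$db:$table"
    if (state.contains(key)) Dim(key) else Fact(key)
  }
}
```

With a state built from a single row `DimConfig("lakehousedb", "mc_member_info")`, `route` returns `Dim` for that table and `Fact` for any other table such as `mc_user_login`.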


II. Writing the Code

The code that writes data to the ODS layer is "ProduceKafkaDBDataToODS.scala". The main logic is implemented as follows:

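// Imports below assume Flink 1.11+ (Scala API), the universal Kafka connector and fastjson.
// ConfigUtil, MySQLUtil and CommonUtil are project-local helpers; import them from your own package.
import java.util.Properties

import com.alibaba.fastjson.JSONObject
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.kafka.clients.producer.ProducerRecord

object ProduceKafkaDBDataToODS {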
  private val mysqlUrl: String = ConfigUtil.MYSQL_URL
  private val mysqlUser: String = ConfigUtil.MYSQL_USER
  private val mysqlPassWord: String = ConfigUtil.MYSQL_PASSWORD
  private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
  private val kafkaDimTopic: String = ConfigUtil.KAFKA_DIM_TOPIC
  private val kafkaOdsTopic: String = ConfigUtil.KAFKA_ODS_TOPIC
  private val kafkaDwdUserLogTopic: String = ConfigUtil.KAFKA_DWD_USERLOG_TOPIC

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    import org.apache.flink.streaming.api.scala._

    env.enableCheckpointing(5000)

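    /**
      * 1. A catalog must be created first.
      * The catalog is created here, but the tables must be created in Hive beforehand rather than in this code,
      * because creating Iceberg tables from Flink does not support the "create table if not exists ..." syntax.
      */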
    tblEnv.executeSql(
      """
        |create catalog hadoop_iceberg with (
        | 'type'='iceberg',
        | 'catalog-type'='hadoop',
        | 'warehouse'='hdfs://mycluster/lakehousedata'
        |)
      """.stripMargin)

    /**
      * 2. Create the Kafka connector table to consume the data in Kafka.
      * Notes: 1) reserved keywords must be quoted with backticks (`); 2) JSON objects are received as map<string,string>.
      */
    tblEnv.executeSql(
      """
        |create table kafka_db_bussiness_tbl(
        |   database string,
        |   `table` string,
        |   type string,
        |   ts string,
        |   xid string,
        |   `commit` string,
        |   data map<string,string>
        |) with (
        | 'connector' = 'kafka',
        | 'topic' = 'KAFKA-DB-BUSSINESS-DATA',
        | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
        | 'scan.startup.mode'='latest-offset', -- either earliest-offset or latest-offset can be specified
        | 'properties.group.id' = 'my-group-id',
        | 'format' = 'json'
        |)
      """.stripMargin)

    /**
      * 3. Write the data of each business table into its own Iceberg table
      */
    tblEnv.executeSql(
      """
        |insert into hadoop_iceberg.icebergdb.ODS_MEMBER_INFO
        |select
        |   data['id'] as id ,
        |   data['user_id'] as user_id,
        |   data['member_growth_score'] as member_growth_score,
        |   data['member_level'] as member_level,
        |   data['balance'] as balance,
        |   data['gmt_create'] as gmt_create,
        |   data['gmt_modified'] as  gmt_modified
        | from kafka_db_bussiness_tbl where `table` = 'mc_member_info'
      """.stripMargin)


    tblEnv.executeSql(
      """
        |insert into hadoop_iceberg.icebergdb.ODS_MEMBER_ADDRESS
        |select
        |   data['id'] as id ,
        |   data['user_id'] as user_id,
        |   data['province'] as province,
        |   data['city'] as city,
        |   data['area'] as area,
        |   data['address'] as address,
        |   data['log'] as log,
        |   data['lat'] as lat,
        |   data['phone_number'] as phone_number,
        |   data['consignee_name'] as consignee_name,
        |   data['gmt_create'] as gmt_create,
        |   data['gmt_modified'] as  gmt_modified
        | from kafka_db_bussiness_tbl where `table` = 'mc_member_address'
      """.stripMargin)

    tblEnv.executeSql(
      """
        |insert into hadoop_iceberg.icebergdb.ODS_USER_LOGIN
        |select
        |   data['id'] as id ,
        |   data['user_id'] as user_id,
        |   data['ip'] as ip,
        |   data['login_tm'] as login_tm,
        |   data['logout_tm'] as logout_tm
        | from kafka_db_bussiness_tbl where `table` = 'mc_user_login'
      """.stripMargin)

    //4. Read the data from Kafka so that dimension data can be stored separately in Kafka
    val kafkaTbl: Table = tblEnv.sqlQuery("select database,`table`,type,ts,xid,`commit`,data from kafka_db_bussiness_tbl")

    //5. Convert the kafkaTbl Table into a DataStream so it can be connected with the configuration data from MySQL
    val kafkaDS: DataStream[Row] = tblEnv.toAppendStream[Row](kafkaTbl)

    //6. Define the MapStateDescriptor used for the broadcast stream
    val mapStateDescriptor = new MapStateDescriptor[String,JSONObject]("mapStateDescriptor",classOf[String],classOf[JSONObject])

    //7. Read the configuration from MySQL and broadcast it
    val bcConfigDs: BroadcastStream[JSONObject] = env.addSource(MySQLUtil.getMySQLData(mysqlUrl,mysqlUser,mysqlPassWord)).broadcast(mapStateDescriptor)

    //8. Define the side-output tag for dimension data
    val dimDataTag = new OutputTag[String]("dim_data")

    //9. Only process data from the MySQL database lakehousedb (binlog from other databases is ignored), then connect the two streams
    val factMainDs: DataStream[String] = kafkaDS.filter(row=>{"lakehousedb".equals(row.getField(0).toString)}).connect(bcConfigDs).process(new BroadcastProcessFunction[Row, JSONObject, String] {
      override def processElement(row: Row, ctx: BroadcastProcessFunction[Row, JSONObject, String]#ReadOnlyContext, out: Collector[String]): Unit = {
        // JSON object for the fact data that is finally emitted to Kafka
        val returnJsonObj = new JSONObject()
        // Get the broadcast state
        val robcs: ReadOnlyBroadcastState[String, JSONObject] = ctx.getBroadcastState(mapStateDescriptor)
        // Parse the event-stream record
        val nObject: JSONObject = CommonUtil.rowToJsonObj(row)
        // Get the database and table the current event comes from; a sample record looks like this:
        // lakehousedb,pc_product,insert,1646659263,21603,null,{gmt_create=1645493074001, category_id=220, product_name=黃金, product_id=npfSpLHB8U}
        val dbName: String = nObject.getString("database")
        val tableName: String = nObject.getString("table")
        val key = dbName + ":" + tableName
        if (robcs.contains(key)) {
          // Dimension data
          val jsonValue: JSONObject = robcs.get(key)
          // Copy the configuration held in jsonValue onto the event
          nObject.put("tbl_name", jsonValue.getString("tbl_name"))
          nObject.put("tbl_db", jsonValue.getString("tbl_db"))
          nObject.put("pk_col", jsonValue.getString("pk_col"))
          nObject.put("cols", jsonValue.getString("cols"))
          nObject.put("phoenix_tbl_name", jsonValue.getString("phoenix_tbl_name"))
          ctx.output(dimDataTag, nObject.toString)
        }else{
          // Fact data: add the Iceberg table name and write the record to the Kafka ODS topic (kafkaOdsTopic)
          if("mc_user_login".equals(tableName)){
            returnJsonObj.put("iceberg_ods_tbl_name","ODS_USER_LOGIN")
            returnJsonObj.put("kafka_dwd_topic",kafkaDwdUserLogTopic)
            returnJsonObj.put("data",nObject.toString)
            // Emit only populated records; other fact tables are skipped instead of producing an empty "{}"
            out.collect(returnJsonObj.toJSONString)
          }
        }
      }

      override def processBroadcastElement(jsonObject: JSONObject, ctx: BroadcastProcessFunction[Row, JSONObject, String]#Context, out: Collector[String]): Unit = {
        val tblDB: String = jsonObject.getString("tbl_db")
        val tblName: String = jsonObject.getString("tbl_name")
        // Update the broadcast state with the new configuration row
        val bcs: BroadcastState[String, JSONObject] = ctx.getBroadcastState(mapStateDescriptor)
        bcs.put(tblDB + ":" + tblName, jsonObject)
        println("Broadcast state updated...")
      }
    })


    //10. Write the results to Kafka: fact data to the ODS topic, dimension data (side output) to the DIM topic
    val props = new Properties()
    props.setProperty("bootstrap.servers",kafkaBrokers)
    factMainDs.addSink(new FlinkKafkaProducer[String](kafkaOdsTopic,new KafkaSerializationSchema[String] {
      override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        new ProducerRecord[Array[Byte],Array[Byte]](kafkaOdsTopic,null,element.getBytes())
      }
    },props,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))// at_least_once is used for now; exactly_once has some issues

    factMainDs.getSideOutput(dimDataTag).addSink(new FlinkKafkaProducer[String](kafkaDimTopic,new KafkaSerializationSchema[String] {
      override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        new ProducerRecord[Array[Byte],Array[Byte]](kafkaDimTopic,null,element.getBytes())
      }
    },props,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))// at_least_once is used for now; exactly_once has some issues

    env.execute()

  }

}           
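Step 3 of the listing above reads each column out of the `data` map with `data['col']`. The sketch below mirrors that projection for `ODS_USER_LOGIN` in plain Scala so the mapping can be tried without a cluster. `BinlogRecord`, `UserLoginRow`, and `toUserLogin` are hypothetical names, and the sketch assumes that a key missing from the map simply yields a null column, which is how I understand map access to behave in Flink SQL.

```scala
// Minimal sketch of the step-3 projection; all type and method names are hypothetical.
object OdsProjection {
  // Subset of the fields declared in kafka_db_bussiness_tbl.
  final case class BinlogRecord(database: String, table: String, opType: String, data: Map[String, String])

  // Target row for ODS_USER_LOGIN; every column is a string, as in the Hive DDL.
  final case class UserLoginRow(id: String, userId: String, ip: String, loginTm: String, logoutTm: String)

  // Mirrors: select data['id'], ... from kafka_db_bussiness_tbl where `table` = 'mc_user_login'
  def toUserLogin(r: BinlogRecord): Option[UserLoginRow] =
    if (r.table != "mc_user_login") None // filtered out by the where clause
    else {
      // Assumption: a missing key becomes null, like a NULL column in SQL.
      def col(name: String): String = r.data.get(name).orNull
      Some(UserLoginRow(col("id"), col("user_id"), col("ip"), col("login_tm"), col("logout_tm")))
    }
}
```

A login event that has no `logout_tm` yet therefore produces a row whose `logoutTm` is null, and records from any other table produce `None`.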

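III. Creating the Iceberg ODS Layer Tables

Before the code is run, the corresponding Iceberg tables must be created in Hive. They are created as follows:

1. Add the JARs required by the Iceberg table format to Hive

Start the HDFS cluster, start the Hive metastore service on node1, then start Hive on the Hive client and add the Iceberg dependency JARs: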

# Start the Hive metastore service on node1
[root@node1 ~]# hive --service metastore &

# Load the two JARs in the Hive client on node3
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

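2. Create the Iceberg tables

The Iceberg tables created here are "ODS_MEMBER_INFO", "ODS_MEMBER_ADDRESS", and "ODS_USER_LOGIN". The statements are as follows:

# Run the following CREATE TABLE statements in the Hive client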
CREATE TABLE ODS_MEMBER_INFO  (
id string,
user_id string,
member_growth_score string,
member_level string,
balance string,
gmt_create string,
gmt_modified string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_INFO/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);


CREATE TABLE ODS_MEMBER_ADDRESS  (
id string,
user_id string,
province string,
city string,
area string,
address string,
log string,
lat string,
phone_number string,
consignee_name string,
gmt_create string,
gmt_modified string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_ADDRESS/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE ODS_USER_LOGIN (
id string,
user_id string,
ip string,
login_tm string,
logout_tm string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_USER_LOGIN/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);           

After these statements have been executed in the Hive client, the corresponding Iceberg data directories are visible in HDFS.
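The LOCATION of each Hive table has to line up with the path that the Flink catalog `hadoop_iceberg` resolves for `icebergdb.<table>`, otherwise the job and Hive would look at different directories. A Hadoop-type catalog lays tables out as warehouse/database/table, which the following sketch reproduces. `IcebergLocations` and `tableLocation` are hypothetical helper names, not part of Iceberg.

```scala
// Minimal sketch: derive the directory a Hadoop-type Iceberg catalog uses for a table.
// IcebergLocations and tableLocation are hypothetical names for illustration only.
object IcebergLocations {
  // Warehouse root taken from the 'warehouse' property of the hadoop_iceberg catalog.
  val Warehouse = "hdfs://mycluster/lakehousedata"

  // Layout: <warehouse>/<database>/<table>; a trailing slash on the warehouse is tolerated.
  def tableLocation(database: String, table: String, warehouse: String = Warehouse): String =
    s"${warehouse.stripSuffix("/")}/$database/$table"
}
```

For example, `IcebergLocations.tableLocation("icebergdb", "ODS_MEMBER_INFO")` gives `hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_INFO`, which matches the LOCATION in the Hive DDL apart from the trailing slash.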


IV. Testing the Code

Once the code above has been written, test it with the following steps:

1. Create the corresponding topics in Kafka

# Create the KAFKA-ODS-TOPIC topic in Kafka
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-ODS-TOPIC --partitions 3 --replication-factor 3

# Create the KAFKA-DIM-TOPIC topic in Kafka
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DIM-TOPIC --partitions 3 --replication-factor 3

# Monitor the data in the two topics above
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-ODS-TOPIC

[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DIM-TOPIC           

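2. Change the code to consume Kafka data from the beginning

In the Kafka connector definition in the code, set the property "scan.startup.mode" to "earliest-offset" so that data is consumed from the beginning.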
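One way to avoid editing the DDL string by hand between test runs is to generate it from a parameter. The sketch below is a possible refactoring, not part of the original job: `KafkaSourceDdl` and `build` are hypothetical names, and the set of accepted modes is deliberately limited to the two values mentioned in this article.

```scala
// Minimal sketch: build the Kafka source DDL with a configurable startup mode.
// KafkaSourceDdl and build are hypothetical names; the DDL text is copied from the job above.
object KafkaSourceDdl {
  // Only the two modes discussed in this article are accepted here.
  private val AllowedModes = Set("earliest-offset", "latest-offset")

  def build(startupMode: String): String = {
    require(AllowedModes.contains(startupMode), s"unsupported scan.startup.mode: $startupMode")
    s"""
       |create table kafka_db_bussiness_tbl(
       |   database string,
       |   `table` string,
       |   type string,
       |   ts string,
       |   xid string,
       |   `commit` string,
       |   data map<string,string>
       |) with (
       | 'connector' = 'kafka',
       | 'topic' = 'KAFKA-DB-BUSSINESS-DATA',
       | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
       | 'scan.startup.mode'='$startupMode',
       | 'properties.group.id' = 'my-group-id',
       | 'format' = 'json'
       |)
     """.stripMargin
  }
}
```

The job could then call `tblEnv.executeSql(KafkaSourceDdl.build("earliest-offset"))` for a replay test and switch back to "latest-offset" for normal runs.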

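Alternatively, instead of consuming Kafka data from the beginning, you can start "RTMockDBData.java", which writes data to the corresponding MySQL tables in real time. In that case Maxwell must be running, so that the business data written to MySQL is captured as it arrives.

3. Run the code and check the results in the corresponding topics

After the code has been started, the Kafka topics "KAFKA-DIM-TOPIC" and "KAFKA-ODS-TOPIC" both contain the corresponding data, and the corresponding tables in the Iceberg ODS layer contain data as well.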
