天天看點

湖倉一體電商項目(二十三):離線業務 統計每天使用者商品浏覽所獲積分

湖倉一體電商項目(二十三):離線業務 統計每天使用者商品浏覽所獲積分

文章目錄

​​統計每天使用者商品浏覽所獲積分​​

​​一、業務需求​​

​​二、業務流程圖​​

​​三、業務實作​​

​​1、代碼編寫​​

​​2、代碼執行​​

​​四、資料釋出接口​​

​​五、資料可視化​​

統計每天使用者商品浏覽所獲積分

一、業務需求

使用Iceberg建構湖倉一體架構進行資料倉庫分層,通過Flink操作各層資料同步到Iceberg中做到的離線與實時資料一緻,當項目中有一些離線臨時性的需求時,我們可以基于Iceberg各層編寫SQL進行資料查詢,針對Iceberg DWS層中的資料我們可以編寫SQL進行離線資料名額分析。

目前離線業務根據Iceberg-DWS層中商品浏覽寬表資料“DWS_BROWSE_INFO”進行查詢每天每個使用者商品浏覽所獲積分資訊。

二、業務流程圖

這裡通過Flink代碼讀取Iceberg-DWS層寬表資料,編寫SQL進行名額分析,将分析結果存儲在MySQL中,此業務流程圖如下所示:

湖倉一體電商項目(二十三):離線業務 統計每天使用者商品浏覽所獲積分

三、業務實作

1、代碼編寫

此業務代碼詳細如下:

object UserPointsAnls {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    import org.apache.flink.api.scala._

    //1.建立Catalog
    tblEnv.executeSql(
      """
        |create catalog hadoop_iceberg with (
        | 'type'='iceberg',
        | 'catalog-type'='hadoop',
        | 'warehouse'='hdfs://mycluster/lakehousedata'
        |)
      """.stripMargin)

    //2.使用目前Catalog
    tblEnv.useCatalog("hadoop_iceberg")
    //3.使用資料庫
    tblEnv.useDatabase("icebergdb")

    val userPointTbl :Table = tblEnv.sqlQuery(
      """
        | select log_time as dt,user_id,product_name,sum(cast(obtain_points as int)) as total_points from DWS_BROWSE_INFO
        | group by log_time,user_id,product_name
      """.stripMargin)


    val userPointDS: DataStream[(Boolean, Row)] = tblEnv.toRetractStream[Row](userPointTbl)

    /**
      * 4.需要在MySQL resultdb 中建立表 user_points
      * create database resultdb;
      * create table user_points (log_time varchar(255),user_id varchar(255),product_name varchar(255),total_points bigint);
      */
    val jdbcOutput: JdbcOutputFormat = JdbcOutputFormat.buildJdbcOutputFormat().setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://node2:3306/resultdb?user=root&password=123456")
      .setQuery("insert into user_points values (?,?,?,?)")
      .finish()


    userPointDS.map(_._2).writeUsingOutputFormat(jdbcOutput)

    env.execute()
  }
}      

2、代碼執行

代碼執行之前,我們需要登入MySQL建立庫“resultdb”以及表user_points:

#在node2節點上執行如下指令
[root@node2 ~]# mysql -u root -p123456
mysql> create database resultdb;
mysql> use resultdb;
mysql> create table user_points(log_time varchar(255),user_id varchar(255),product_name varchar(255),total_points bigint);      

建立完成之後,可以直接執行以上代碼,代碼執行完成之後,在mysql表“resultdb.user_points”中可以檢視對應的結果:

湖倉一體電商項目(二十三):離線業務 統計每天使用者商品浏覽所獲積分

四、資料釋出接口

此離線業務對應的接口在資料釋出接口項目“LakeHouseDataPublish”,對應的資料釋出接口為:”localhost:8989/lakehouse/dataapi/getUserPoints”,啟動資料釋出接口,查詢結果如下:

湖倉一體電商項目(二十三):離線業務 統計每天使用者商品浏覽所獲積分

五、資料可視化

  • 📢本文由 Lansonli 原創
  • 📢停下休息的時候不要忘了别人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活✨