
Lakehouse E-commerce Project (12): Writing the Business Code for the DM Layer

Author: Lansonli



The DM layer mainly holds report data. For this real-time pipeline the DM layer is placed in ClickHouse. Here it stores the results of windowed analysis over the data that Flink reads from the Kafka topic "KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC": a 10-second tumbling window counts, within each window, the visits to each product and to its first- and second-level categories, and the results are written to ClickHouse in real time.
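The core of this counting logic can be illustrated without Flink. The sketch below (plain Scala, with made-up sample data and field names) buckets browse events into 10-second tumbling windows by rounding the event time down to the window size, then counts visits per (window, category, product) key, which mirrors what the Flink job does:

```scala
object TumblingWindowSketch {
  // Hypothetical browse event: epoch-millis timestamp plus the three
  // names the real job keys on (field names are illustrative).
  case class Browse(ts: Long, firstCat: String, secondCat: String, product: String)

  val windowSizeMs = 10000L

  // Tumbling-window start: round the event time down to the window size.
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // Count visits per (windowStart, firstCat, secondCat, product) key.
  def countPerWindow(events: Seq[Browse]): Map[(Long, String, String, String), Int] =
    events
      .groupBy(e => (windowStart(e.ts), e.firstCat, e.secondCat, e.product))
      .map { case (k, v) => k -> v.size }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Browse(1000L, "Electronics", "Phones", "PhoneA"),
      Browse(4000L, "Electronics", "Phones", "PhoneA"),
      Browse(12000L, "Electronics", "Phones", "PhoneA"))
    // The first two events fall in window [0, 10000), the third in [10000, 20000)
    println(countPerWindow(events))
  }
}
```

In the real job, Flink manages the buckets and fires each window when it closes; this sketch only shows the grouping arithmetic.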

1. Writing the Code

The full code is in "ProcessBrowseLogInfoToDM.scala"; the overall logic is as follows:

object ProcessBrowseLogInfoToDM {
  def main(args: Array[String]): Unit = {
    //1. Set up the environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    env.enableCheckpointing(5000)

    import org.apache.flink.streaming.api.scala._
    /**
      * 2. Create the Kafka connector and consume the DWS wide data from Kafka
      */
    tblEnv.executeSql(
      """
        |create table kafka_dws_user_login_wide_tbl (
        |   user_id string,
        |   product_name string,
        |   first_category_name string,
        |   second_category_name string,
        |   obtain_points string
        |) with (
        | 'connector' = 'kafka',
        | 'topic' = 'KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC',
        | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
        | 'scan.startup.mode'='earliest-offset', -- earliest-offset or latest-offset can be specified here
        | 'properties.group.id' = 'my-group-id',
        | 'format' = 'json'
        |)
      """.stripMargin)

    /**
      * 3. Every 10 seconds, count the browse visits per product and per first-/second-level category, and write the results to ClickHouse
      */

    val dwsTbl:Table = tblEnv.sqlQuery(
      """
        | select user_id,product_name,first_category_name,second_category_name from kafka_dws_user_login_wide_tbl
      """.stripMargin)

    //4.将Row 類型資料轉換成對象類型操作
    val browseDS: DataStream[BrowseLogWideInfo] = tblEnv.toAppendStream[Row](dwsTbl)
      .map(row => {
        val user_id: String = row.getField(0).toString
        val product_name: String = row.getField(1).toString
        val first_category_name: String = row.getField(2).toString
        val second_category_name: String = row.getField(3).toString
        BrowseLogWideInfo(null, user_id, null, product_name, null, null, first_category_name, second_category_name, null)
      })


    val dwsDS: DataStream[ProductVisitInfo] = browseDS.keyBy(info => {
      info.first_category_name + "-" + info.second_category_name + "-" + info.product_name
    })
      .timeWindow(Time.seconds(10))
      .process(new ProcessWindowFunction[BrowseLogWideInfo, ProductVisitInfo, String, TimeWindow] {

        override def process(key: String, context: Context, elements: Iterable[BrowseLogWideInfo], out: Collector[ProductVisitInfo]): Unit = {
          val currentDt: String = DateUtil.getDateYYYYMMDD(context.window.getStart.toString)
          val startTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getStart.toString)
          val endTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getEnd.toString)
          val arr: Array[String] = key.split("-")

          val firstCatName: String = arr(0)
          val secondCatName: String = arr(1)
          val productName: String = arr(2)
          val cnt: Int = elements.toList.size
          out.collect(ProductVisitInfo(currentDt, startTime, endTime, firstCatName, secondCatName, productName, cnt))
        }

      })

    /**
      * 5.将以上結果寫入到Clickhouse表 dm_product_visit_info 表中
      *  create table dm_product_visit_info(
      *    current_dt String,
      *    window_start String,
      *    window_end String,
      *    first_cat String,
      *    second_cat String,
      *    product String,
      *    product_cnt UInt32
      *  ) engine = MergeTree() order by current_dt
      *
      */

    //Prepare the SQL statement for inserting data into ClickHouse
    val insertIntoCkSql = "insert into dm_product_visit_info (current_dt,window_start,window_end,first_cat,second_cat,product,product_cnt) values (?,?,?,?,?,?,?)"


    val ckSink: SinkFunction[ProductVisitInfo] = MyClickHouseUtil.clickhouseSink[ProductVisitInfo](insertIntoCkSql,new JdbcStatementBuilder[ProductVisitInfo] {
      override def accept(pst: PreparedStatement, productVisitInfo: ProductVisitInfo): Unit = {
        pst.setString(1,productVisitInfo.currentDt)
        pst.setString(2,productVisitInfo.windowStart)
        pst.setString(3,productVisitInfo.windowEnd)
        pst.setString(4,productVisitInfo.firstCat)
        pst.setString(5,productVisitInfo.secondCat)
        pst.setString(6,productVisitInfo.product)
        pst.setLong(7,productVisitInfo.productCnt)

      }
    })

    //Attach the ClickHouse sink to the result stream
    dwsDS.addSink(ckSink)

    env.execute()

  }
}
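One caveat in the window function above: the keyBy key joins the three names with "-" and process() splits them back with key.split("-"), which only round-trips correctly when none of the names itself contains a hyphen. A minimal sketch of that round trip (names are made up):

```scala
object KeyRoundTrip {
  // Mirrors the keyBy concatenation in the job.
  def buildKey(first: String, second: String, product: String): String =
    first + "-" + second + "-" + product

  // Mirrors the key.split("-") in the window function; arr would gain
  // extra elements if any name contained a hyphen.
  def splitKey(key: String): (String, String, String) = {
    val arr = key.split("-")
    (arr(0), arr(1), arr(2))
  }

  def main(args: Array[String]): Unit = {
    println(splitKey(buildKey("Electronics", "Phones", "PhoneA")))
  }
}
```

If product or category names may contain hyphens, a safer choice is a separator that cannot appear in the data, or keying by a tuple instead of a concatenated string.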

2. Creating the ClickHouse DM-Layer Table

Before the code can run, the corresponding DM-layer product-browse table dm_product_visit_info must be created in ClickHouse. The DDL is as follows:

#Start the clickhouse server on node1
[root@node1 bin]# service clickhouse-server start

#Open the clickhouse client on node1
[root@node1 bin]# clickhouse-client -m

#Create the ClickHouse DM-layer table on node1
create table dm_product_visit_info(
 current_dt String,
 window_start String,
 window_end String,
 first_cat String,
 second_cat String,
 product String,
 product_cnt UInt32
) engine = MergeTree() order by current_dt;

3. Testing the Code

After the code above is written, test it as follows:

1、将代碼中消費Kafka資料改成從頭開始消費

Set the "scan.startup.mode" property of the Kafka connector in the code to "earliest-offset" so that data is consumed from the earliest offset.

Alternatively, instead of consuming Kafka from the beginning, you can start the mock-log generator "RTMockUserLogData.java" against the log-collection interface; this requires the log-collection interface and Flume to be running.

2) Run the code and check the results

After the code runs, query the ClickHouse DM-layer table "dm_product_visit_info" to check the result data:
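A query along these lines (column order taken from the DDL above) can be run in clickhouse-client to inspect the most recent window results; it is an example, not part of the project code:

```sql
-- Inspect the latest window results
select current_dt, window_start, window_end,
       first_cat, second_cat, product, product_cnt
from dm_product_visit_info
order by window_start desc
limit 10;
```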


4. Architecture Diagram

[Figure: project architecture diagram]
