天天看點

湖倉一體電商項目(九):業務實作之編寫寫入DIM層業務代碼

湖倉一體電商項目(九):業務實作之編寫寫入DIM層業務代碼

文章目錄

​​業務實作之編寫寫入DIM層業務代碼​​

​​一、代碼邏輯和架構圖​​

​​二、代碼編寫​​

​​三、代碼測試​​

業務實作之編寫寫入DIM層業務代碼

一、代碼邏輯和架構圖

編寫代碼讀取Kafka “KAFKA-DIM-TOPIC” topic次元資料通過Phoenix寫入到HBase中,我們可以通過topic中每條資料擷取該條資料對應的phoenix表名及字段名動态建立phoenix表以及插入資料,這裡所有在mysql“lakehousedb.dim_tbl_config_info”中配置的次元表都會動态的寫入到HBase中。這裡使用Flink處理對應topic資料時如果次元資料需要清洗還可以進行清洗

湖倉一體電商項目(九):業務實作之編寫寫入DIM層業務代碼
湖倉一體電商項目(九):業務實作之編寫寫入DIM層業務代碼

二、代碼編寫

讀取Kafka 次元資料寫入HBase代碼為“DimDataToHBase.scala”,主要代碼邏輯如下:

object DimDataToHBase {
  private val consumeKafkaFromEarliest: Boolean = ConfigUtil.CONSUME_KAFKA_FORMEARLIEST
  private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
  private val kafakDimTopic: String = ConfigUtil.KAFKA_DIM_TOPIC
  private val phoenixURL: String = ConfigUtil.PHOENIX_URL
  var ds: DataStream[String] = _

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //1.導入隐式轉換
    import org.apache.flink.streaming.api.scala._

    //2.設定Kafka配置
    val props = new Properties()
    props.setProperty("bootstrap.servers",kafkaBrokers)
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","mygroup.id")

    //3.從資料中擷取Kafka DIM層  KAFKA-DIM-TOPIC 資料
    /**
      * 資料樣例:
      *   {
      *     "gmt_create": "1646037374201",
      *     "commit": "true",
      *     "tbl_name": "mc_member_info",
      *     "type": "insert",
      *     "gmt_modified": "1646037374201",
      *     "member_level": "3",
      *     "database": "lakehousedb",
      *     "xid": "38450",
      *     "pk_col": "id",
      *     "balance": "10482",
      *     "user_id": "0uid9060",
      *     "phoenix_tbl_name": "DIM_MEMBER_INFO",
      *     "tbl_db": "lakehousedb",
      *     "member_points": "7568",
      *     "id": "10014",
      *     "cols": "user_id,member_growth_score,member_level,member_points,balance,gmt_create,gmt_modified",
      *     "table": "mc_member_info",
      *     "member_growth_score": "3028",
      *     "ts": "1646901373"
      *   }
      *
      */
      if(consumeKafkaFromEarliest){
        ds = env.addSource(MyKafkaUtil.GetDataFromKafka(kafakDimTopic,props).setStartFromEarliest())
      }else{
        ds = env.addSource(MyKafkaUtil.GetDataFromKafka(kafakDimTopic,props))
      }

    ds.keyBy(line=>{
      JSON.parseObject(line).getString("phoenix_tbl_name")
    }).process(new KeyedProcessFunction[String,String,String] {

      //設定狀态,存儲每個Phoenix表是否被建立
      lazy private val valueState: ValueState[String] = getRuntimeContext.getState(new ValueStateDescriptor[String]("valueState",classOf[String]))

      var conn: Connection = _
      var pst: PreparedStatement = _

      //在open方法中,設定連接配接Phoenix ,友善後期建立對應的phoenix表
      override def open(parameters: Configuration): Unit = {
        println("建立Phoenix 連接配接... ...")
        conn = DriverManager.getConnection(phoenixURL)
      }

      override def processElement(jsonStr: String, ctx: KeyedProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = {

        val nObject: JSONObject = JSON.parseObject(jsonStr)
        //從json 對象中擷取對應 hbase 表名、主鍵、列資訊
        val operateType: String = nObject.getString("type")
        val phoenixTblName: String = nObject.getString("phoenix_tbl_name")
        val pkCol: String = nObject.getString("pk_col")
        val cols: String = nObject.getString("cols")

        //判斷操作類型,這裡隻會向HBase中存入增加、修改的資料,删除等其他操作不考慮
        //operateType.equals("bootstrap-insert") 這種情況主要是使用maxwell 直接批量同步次元資料時,操作類型為bootstrap-insert
        if(operateType.equals("insert")||operateType.equals("update")||operateType.equals("bootstrap-insert")){
          //判斷狀态中是否有目前表狀态,如果有說明已經被建立,沒有就組織建表語句,通過phoenix建立次元表
          if(valueState.value() ==null){
            createPhoenixTable(phoenixTblName, pkCol, cols)
            //更新狀态
            valueState.update(phoenixTblName)
          }
          //向phoenix表中插入資料,同時方法中涉及資料清洗
          upsertIntoPhoenixTable(nObject, phoenixTblName, pkCol, cols)

          /**
            *  當有次元資料更新時,那麼将Redis中次元表緩存删除
            *  Redis中 key 為:次元表-主鍵值
            */
          if(operateType.equals("update")){
            //擷取目前更新資料中主鍵對應的值
            val pkValue: String = nObject.getJSONObject("data").getString(pkCol)
            //組織Redis中的key
            val key = phoenixTblName+"-"+pkValue
            //删除Redis中緩存的此key對應資料,沒有此key也不會報錯
            MyRedisUtil.deleteKey(key)
          }
          out.collect("執行成功")
        }
      }

      private def upsertIntoPhoenixTable(nObject: JSONObject, phoenixTblName: String, pkCol: String, cols: String): Unit = {
        //擷取向phoenix中插入資料所有列
        val colsList: ListBuffer[String] = MyStringUtil.getAllCols(cols)

        //擷取主鍵對應的值
        val pkValue: String = nObject.getString(pkCol)

        //組織向表中插入資料的語句
        //upsert into test values ('1','zs',18);
        val upsertSQL = new StringBuffer(s"upsert into  ${phoenixTblName} values ('${pkValue}'")

        for (col <- colsList) {
          val currentColValue: String = nObject.getString(col)
          println("colsList = "+colsList.toString+" - current col = "+currentColValue)
          //将列資料中的 “'”符号進行轉義
          upsertSQL.append(s",'${currentColValue.replace("'","\\'")}'")
        }
        upsertSQL.append(s")")

        //向表中Phoenix中插入資料
        println("phoenix 插入Sql = "+upsertSQL.toString)
        pst = conn.prepareStatement(upsertSQL.toString)

        pst.execute()

        //這裡如果想要批量送出,可以設定狀态,當每個key 滿足1000條時,commit一次,
        // 另外定義定時器,每隔2分鐘自動送出一次,防止有些資料沒有達到2000條時沒有存入Phoenix
        conn.commit()
      }

      private def createPhoenixTable(phoenixTblName: String, pkCol: String, cols: String): Boolean = {
        //擷取所有列
        val colsList: ListBuffer[String] = MyStringUtil.getAllCols(cols)

        //組織phoenix建表語句,為了後期操作友善,這裡建表語句所有列族指定為“cf",所有字段都為varchar
        //create table xxx (id varchar primary key ,cf.name varchar,cf.age varchar);
        val createSql = new StringBuffer(s"create table if not exists ${phoenixTblName} (${pkCol} varchar primary key,")
        for (col <- colsList) {
          createSql.append(s"cf.${col.replace("'","\\'")} varchar,")//處理資料中帶 ' 的資料
        }
        //将最後一個逗号替換成“) column_encoded_bytes=0” ,最後這個參數是不讓phoenix對資料進行16進制編碼
        createSql.replace(createSql.length() - 1, createSql.length(), ") column_encoded_bytes=0")

        println(s"拼接Phoenix SQL 為 = ${createSql}")

        //執行sql
        pst = conn.prepareStatement(createSql.toString)
        pst.execute()
      }

      //關閉連接配接
      override def close(): Unit = {
        pst.close()
        conn.close()
      }
    }).print()

    env.execute()

  }
}      

三、代碼測試

執行代碼之前首先需要啟動HDFS、HBase,代碼中設定讀取Kafka資料從頭開始讀取,然後執行代碼,代碼執行完成後可以進入phoenix中檢視對應的結果

# 在node4節點上啟動phoenix
[root@node4 ~]# cd /software/apache-phoenix-5.0.0-HBase-2.0-bin/bin
[root@node4 bin]# ./sqlline.py      
  • 📢部落格首頁
  • 📢歡迎點贊 👍 收藏 ⭐留言 📝 如有錯誤敬請指正!
  • 📢本文由 Lansonli 原創
  • 📢停下休息的時候不要忘了别人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活✨