文章目錄
業務實作之編寫寫入DIM層業務代碼
一、代碼邏輯和架構圖
二、代碼編寫
三、代碼測試
業務實作之編寫寫入DIM層業務代碼
一、代碼邏輯和架構圖
編寫代碼讀取Kafka “KAFKA-DIM-TOPIC” topic次元資料通過Phoenix寫入到HBase中,我們可以通過topic中每條資料擷取該條資料對應的phoenix表名及字段名動态建立phoenix表以及插入資料,這裡所有在mysql“lakehousedb.dim_tbl_config_info”中配置的次元表都會動态的寫入到HBase中。這裡使用Flink處理對應topic資料時如果次元資料需要清洗還可以進行清洗
二、代碼編寫
讀取Kafka 次元資料寫入HBase代碼為“DimDataToHBase.scala”,主要代碼邏輯如下:
object DimDataToHBase {
private val consumeKafkaFromEarliest: Boolean = ConfigUtil.CONSUME_KAFKA_FORMEARLIEST
private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
private val kafakDimTopic: String = ConfigUtil.KAFKA_DIM_TOPIC
private val phoenixURL: String = ConfigUtil.PHOENIX_URL
var ds: DataStream[String] = _
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//1.導入隐式轉換
import org.apache.flink.streaming.api.scala._
//2.設定Kafka配置
val props = new Properties()
props.setProperty("bootstrap.servers",kafkaBrokers)
props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
props.setProperty("group.id","mygroup.id")
//3.從資料中擷取Kafka DIM層 KAFKA-DIM-TOPIC 資料
/**
* 資料樣例:
* {
* "gmt_create": "1646037374201",
* "commit": "true",
* "tbl_name": "mc_member_info",
* "type": "insert",
* "gmt_modified": "1646037374201",
* "member_level": "3",
* "database": "lakehousedb",
* "xid": "38450",
* "pk_col": "id",
* "balance": "10482",
* "user_id": "0uid9060",
* "phoenix_tbl_name": "DIM_MEMBER_INFO",
* "tbl_db": "lakehousedb",
* "member_points": "7568",
* "id": "10014",
* "cols": "user_id,member_growth_score,member_level,member_points,balance,gmt_create,gmt_modified",
* "table": "mc_member_info",
* "member_growth_score": "3028",
* "ts": "1646901373"
* }
*
*/
if(consumeKafkaFromEarliest){
ds = env.addSource(MyKafkaUtil.GetDataFromKafka(kafakDimTopic,props).setStartFromEarliest())
}else{
ds = env.addSource(MyKafkaUtil.GetDataFromKafka(kafakDimTopic,props))
}
ds.keyBy(line=>{
JSON.parseObject(line).getString("phoenix_tbl_name")
}).process(new KeyedProcessFunction[String,String,String] {
//設定狀态,存儲每個Phoenix表是否被建立
lazy private val valueState: ValueState[String] = getRuntimeContext.getState(new ValueStateDescriptor[String]("valueState",classOf[String]))
var conn: Connection = _
var pst: PreparedStatement = _
//在open方法中,設定連接配接Phoenix ,友善後期建立對應的phoenix表
override def open(parameters: Configuration): Unit = {
println("建立Phoenix 連接配接... ...")
conn = DriverManager.getConnection(phoenixURL)
}
override def processElement(jsonStr: String, ctx: KeyedProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = {
val nObject: JSONObject = JSON.parseObject(jsonStr)
//從json 對象中擷取對應 hbase 表名、主鍵、列資訊
val operateType: String = nObject.getString("type")
val phoenixTblName: String = nObject.getString("phoenix_tbl_name")
val pkCol: String = nObject.getString("pk_col")
val cols: String = nObject.getString("cols")
//判斷操作類型,這裡隻會向HBase中存入增加、修改的資料,删除等其他操作不考慮
//operateType.equals("bootstrap-insert") 這種情況主要是使用maxwell 直接批量同步次元資料時,操作類型為bootstrap-insert
if(operateType.equals("insert")||operateType.equals("update")||operateType.equals("bootstrap-insert")){
//判斷狀态中是否有目前表狀态,如果有說明已經被建立,沒有就組織建表語句,通過phoenix建立次元表
if(valueState.value() ==null){
createPhoenixTable(phoenixTblName, pkCol, cols)
//更新狀态
valueState.update(phoenixTblName)
}
//向phoenix表中插入資料,同時方法中涉及資料清洗
upsertIntoPhoenixTable(nObject, phoenixTblName, pkCol, cols)
/**
* 當有次元資料更新時,那麼将Redis中次元表緩存删除
* Redis中 key 為:次元表-主鍵值
*/
if(operateType.equals("update")){
//擷取目前更新資料中主鍵對應的值
val pkValue: String = nObject.getJSONObject("data").getString(pkCol)
//組織Redis中的key
val key = phoenixTblName+"-"+pkValue
//删除Redis中緩存的此key對應資料,沒有此key也不會報錯
MyRedisUtil.deleteKey(key)
}
out.collect("執行成功")
}
}
private def upsertIntoPhoenixTable(nObject: JSONObject, phoenixTblName: String, pkCol: String, cols: String): Unit = {
//擷取向phoenix中插入資料所有列
val colsList: ListBuffer[String] = MyStringUtil.getAllCols(cols)
//擷取主鍵對應的值
val pkValue: String = nObject.getString(pkCol)
//組織向表中插入資料的語句
//upsert into test values ('1','zs',18);
val upsertSQL = new StringBuffer(s"upsert into ${phoenixTblName} values ('${pkValue}'")
for (col <- colsList) {
val currentColValue: String = nObject.getString(col)
println("colsList = "+colsList.toString+" - current col = "+currentColValue)
//将列資料中的 “'”符号進行轉義
upsertSQL.append(s",'${currentColValue.replace("'","\\'")}'")
}
upsertSQL.append(s")")
//向表中Phoenix中插入資料
println("phoenix 插入Sql = "+upsertSQL.toString)
pst = conn.prepareStatement(upsertSQL.toString)
pst.execute()
//這裡如果想要批量送出,可以設定狀态,當每個key 滿足1000條時,commit一次,
// 另外定義定時器,每隔2分鐘自動送出一次,防止有些資料沒有達到2000條時沒有存入Phoenix
conn.commit()
}
private def createPhoenixTable(phoenixTblName: String, pkCol: String, cols: String): Boolean = {
//擷取所有列
val colsList: ListBuffer[String] = MyStringUtil.getAllCols(cols)
//組織phoenix建表語句,為了後期操作友善,這裡建表語句所有列族指定為“cf",所有字段都為varchar
//create table xxx (id varchar primary key ,cf.name varchar,cf.age varchar);
val createSql = new StringBuffer(s"create table if not exists ${phoenixTblName} (${pkCol} varchar primary key,")
for (col <- colsList) {
createSql.append(s"cf.${col.replace("'","\\'")} varchar,")//處理資料中帶 ' 的資料
}
//将最後一個逗号替換成“) column_encoded_bytes=0” ,最後這個參數是不讓phoenix對資料進行16進制編碼
createSql.replace(createSql.length() - 1, createSql.length(), ") column_encoded_bytes=0")
println(s"拼接Phoenix SQL 為 = ${createSql}")
//執行sql
pst = conn.prepareStatement(createSql.toString)
pst.execute()
}
//關閉連接配接
override def close(): Unit = {
pst.close()
conn.close()
}
}).print()
env.execute()
}
}
三、代碼測試
執行代碼之前首先需要啟動HDFS、HBase,代碼中設定讀取Kafka資料從頭開始讀取,然後執行代碼,代碼執行完成後可以進入phoenix中檢視對應的結果
# 在node4節點上啟動phoenix
[root@node4 ~]# cd /software/apache-phoenix-5.0.0-HBase-2.0-bin/bin
[root@node4 bin]# ./sqlline.py
- 📢部落格首頁
- 📢歡迎點贊 👍 收藏 ⭐留言 📝 如有錯誤敬請指正!
- 📢本文由 Lansonli 原創
- 📢停下休息的時候不要忘了别人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活✨