天天看點

Flink SQL 實戰:HBase 的結合應用

本文主要介紹 HBase 和 Flink SQL 的結合使用。HBase 作為 Google 發表 Big Table 論文的開源實作版本,是一種分布式列式存儲的資料庫,建構在 HDFS 之上的 NoSQL 資料庫,非常适合大規模實時查詢,是以 HBase 在實時計算領域使用非常廣泛。可以實時寫 HBase,也可以利用 buckload 一把把離線 Job 生成 HFile Load 到HBase 表中。而當下 Flink SQL 的火熱程度不用多說,Flink SQL 也為 HBase 提供了 connector,是以 HBase 與 Flink SQL 的結合非常有必要實踐實踐。

當然,本文假設使用者有一定的 HBase 知識基礎,不會詳細去介紹 HBase 的架構和原理,本文着重介紹 HBase 和 Flink 在實際場景中的結合使用。主要分為兩種場景,第一種場景:HBase 作為維表與 Flink Kafka table 做 temporal table join 的場景;第二種場景:Flink SQL 做計算之後的結果寫到 HBase 表,供其他使用者查詢的場景。是以,本文介紹的内容如下所示:

· HBase 環境準備

· 資料準備

· HBase 作為次元表進行 temporal table join的場景

· Flink SQL 做計算寫 HBase 的場景

· 總結

一、HBase 環境準備

由于沒有測試的 HBase 環境以及為了避免污染線上 Hbase 環境。是以,自己 build一個 Hbase docker image(大家可以 docker pull guxinglei/myhbase 拉到本地),是基于官方幹淨的 ubuntu imgae 之上安裝了 Hbase 2.2.0 版本以及 JDK1.8 版本。

啟動容器,暴露 Hbase web UI 端口以及内置 zk 端口,友善我們從 web 頁面看資訊以及建立 Flink Hbase table 需要 zk 的連結資訊。

docker run -it --network=host -p 2181:2181 -p 60011:60011 docker.io/guxinglei/myhbase:latest bash複制代碼      
Flink SQL 實戰:HBase 的結合應用

· 進入容器,啟動 HBase 叢集,以及啟動 rest server,後續友善我們用 REST API 來讀取 Flink SQL 寫進 HBase 的資料。

# 啟動hbase 叢集bin/start-hbase.sh# 背景啟動restServerbin/hbase-daemon.sh start rest -p 8000複制代碼      
Flink SQL 實戰:HBase 的結合應用

二、資料準備

由于 HBase 環境是自己臨時搞的單機服務,裡面沒有資料,需要往裡面寫點資料供後續示例用。在 Flink SQL 實戰系列第二篇中介紹了如何注冊 Flink Mysql table,我們可以将廣告位表抽取到 HBase 表中,用來做次元表,進行 temporal table join。是以,我們需要在 HBase 中建立一張表,同時還需要建立 Flink HBase table, 這兩張表通過 Flink SQL 的 HBase connector 關聯起來。

· 在容器中啟動 HBase shell,建立一張名為 dim_hbase 的 HBase 表,建表語句如下所示:

# 在hbase shell建立 hbase表
hbase(main):002:0> create 'dim_hbase','cf'
Created table dim_hbase
Took 1.3120 seconds
=> Hbase::Table - dim_hbase複制代碼      
Flink SQL 實戰:HBase 的結合應用

· 在 Flink 中建立 Flink HBase table,建表語句如下所示:

# 注冊 Flink Hbase table
DROP TABLE IF EXISTS flink_rtdw.demo.hbase_dim_table;
CREATE TABLE flink_rtdw.demo.hbase_dim_table (
  rowkey STRING,
  cf ROW < adspace_name STRING >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dim_hbase',
'sink.buffer-flush.max-rows' = '1000',
'zookeeper.quorum' = 'localhost:2181'
);複制代碼      

· Flink MySQL table 和 Flink HBase table 已經建立好了,就可以寫抽取資料到HBase 的 SQL job 了,SQL 語句以及 job 狀态如下所示:

# 抽取Mysql資料到Hbase表中

insert into
  hbase_dim_table
select
CAST (ID as VARCHAR),
ROW(name)
from
  mysql_dim_table;複制代碼      
Flink SQL 實戰:HBase 的結合應用
Flink SQL 實戰:HBase 的結合應用
Flink SQL 實戰:HBase 的結合應用
Flink SQL 實戰:HBase 的結合應用

03 HBase 作為維表與 Kafka 做 temporal join 的場景

在 Flink SQL join 中,次元表的 join 一定繞不開的,比如訂單金額 join 匯率表,點選流 join 廣告位的明細表等等,使用場景非常廣泛。那麼作為分布式資料庫的 HBase 比 MySQL 作為次元表用作次元表 join 更有優勢。在 Flink SQL 實戰系列第二篇中,我們注冊了廣告的點選流,将 Kafka topic 注冊 Flink Kafka Table,同時也介紹了 temporal table join 在 Flink SQL 中的使用;那麼本節中将會介紹 HBase 作為次元表來使用,上面小節中已經将資料抽取到 Hbase 中了,我們直接寫 temporal table join 計算邏輯即可。

· 作為廣告點選流的 Flink Kafa table 與 作為廣告位的 Flink HBase table 通過廣告位 Id 進行 temporal table join,輸出廣告位 ID 和廣告位中文名字,SQL join 邏輯如下所示:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       hbase_dim_table.cf.adspace_name as publisher_adspace_name
from adsdw_dwd_max_click_mobileapp
left join hbase_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTime
on cast(adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as string) = hbase_dim_table.rowkey;複制代碼      

· temporal table join job 送出 Flink 叢集上的狀态以及 join 結果如下所示:

Flink SQL 實戰:HBase 的結合應用
Flink SQL 實戰:HBase 的結合應用

四、計算結果 sink 到 HBase 作為結果的場景

上面小節中,HBase 作為次元表用作 temporal table join 是非常常見的場景,實際上 HBase 作為存儲計算結果也是非常常見的場景,畢竟 Hbase 作為分布式資料庫,底層存儲是擁有多副本機制的 HDFS,維護簡單,擴容友善, 實時查詢快,而且提供各種用戶端友善下遊使用存儲在 HBase 中的資料。那麼本小節就介紹 Flink SQL 将計算結果寫到 HBase,并且通過 REST API 查詢計算結果的場景。

· 進入容器中,在 HBase 中建立一張 HBase 表,一個 column family 就滿足需求,建表語句如下所示:

# 注冊hbase sink table
create 'dwa_hbase_click_report','cf'複制代碼      
Flink SQL 實戰:HBase 的結合應用

· 建立好 HBase 表之後,我們需要在 Flink SQL 建立一張 Flink HBase table,這個時候我們需要明确 cf 這個 column famaly 下面 column 字段,在 Flink SQL實戰第二篇中,已經注冊好了作為點選流的 Flink Kafka table,是以本節中,将會計算點選流的 uv 和點選數,是以兩個 column 分别為 uv 和 click_count,建表語句如下所示:

# 注冊 Flink Hbase table
DROP TABLE IF EXISTS flink_rtdw.demo.dwa_hbase_click_report;
CREATE TABLE flink_rtdw.demo.dwa_hbase_click_report (
  rowkey STRING,
  cf ROW < uv BIGINT, click_count BIGINT >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dwa_hbase_click_report',
'sink.buffer-flush.max-rows' = '1000',
'zookeeper.quorum' = 'hostname:2181'
);複制代碼      
Flink SQL 實戰:HBase 的結合應用

· 前面點選流的 Flink Kafka table 和存儲計算結果的 HBase table 和 Flink HBase table 已經準備了,我們将做一個1分鐘的翻轉視窗計算 uv 和點選數,并且将計算結果寫到 HBase 中。對 HBase 了解的人應該知道,rowkey 的設計對 hbase regoin 的分布有着非常重要的影響,基于此我們的 rowkey 是使用 Flink SQL 内置的 reverse 函數進行廣告位 Id 進行反轉和視窗啟始時間做 concat,是以,SQL 邏輯語句如下所示:

INSERT INTO dwa_hbase_click_report
SELECT
CONCAT(REVERSE(CAST(publisher_adspace_adspaceId AS STRING)) ,
'_',
CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING)
  ) as rowkey, 
ROW(COUNT(DISTINCT audience_mvid) , COUNT(audience_behavior_click_creative_impressionId)) as cf
FROM
  adsdw_dwd_max_click_mobileapp
WHERE publisher_adspace_adspaceId IS NOT NULL AND audience_mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULL
GROUP BY
  TUMBLE(ets, INTERVAL '1' MINUTE),
  publisher_adspace_adspaceId;複制代碼      
Flink SQL 實戰:HBase 的結合應用

· SQL job 送出之後的狀态以及結果 check 如下所示:

Flink SQL 實戰:HBase 的結合應用
Flink SQL 實戰:HBase 的結合應用

上述 SQL job 已經成功的将結算結果寫到 HBase 中了。對于線上的 HBase 服務來講,很多同僚不一定有 HBase 用戶端的權限,進而也不能通過 HBase shell 讀取資料;另外作為線上報表服務顯然不可能通過 HBase shell 來通過查詢資料。是以,在實時報表場景中,資料開發工程師将資料寫入 HBase, 前端工程師通過 REST API 來讀取資料。前面我們已經啟動了 HBase rest server 程序,我們可以通 rest 服務提供讀取 HBase 裡面的資料。

· 我們先 get 一條剛剛寫到 HBase 中的資料看看,如下所示:

Flink SQL 實戰:HBase 的結合應用

· 下面我們開始通過 REST API 來查詢 HBase 中的資料,第一步,執行如下語句拿到 scannerId;首先需要将要查詢的 rowkey 進行 base64 編碼才能使用,後面需要将結果進行 base64 解碼

rowkey base64 編碼前:0122612_1606295280000

base64 編碼之後:MDEyMjYxMl8xNjA2Mjk1MjgwMDAw

curl -vi -X PUT \
         -H "Accept: text/xml" \
         -H "Content-Type: text/xml" \
         -d '<Scanner startRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw" endRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw"></Scanner>' \
"http://hostname:8000/dwa_hbase_click_report/scanner"複制代碼      
Flink SQL 實戰:HBase 的結合應用

· 第二步,執行如下語句根據上條語句傳回的 scannerID 查詢資料,可以看到傳回的結果:

curl -vi -X GET \
         -H "Accept: application/json" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"複制代碼      
Flink SQL 實戰:HBase 的結合應用

· 第三步,查詢完畢之後,執行如下語句删除該scannerId:

curl -vi -X DELETE \
         -H "Accept: text/xml" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"複制代碼      
Flink SQL 實戰:HBase 的結合應用

五、總結

繼續閱讀