在分布式系統中,根據應用的場景選擇對應的資料存儲方式是非常重要的一件事。這篇文章讨論的是在實時資料不斷進入的情況下,如何結合曆史資料進行快速分析。
背景
對于實時資料的存儲,為了避免在
HDFS
(最經典的分布式存儲系統)下生成大量的小檔案(存儲和計算都會産生大量的開銷),通常的選擇是
Hbase
。但是
Hbase
本身的設計更适合寫入資料,對于讀的優勢僅僅限于對Rowkey的查詢,不适合類似于
Group
、聚合等分析型查詢。為了避免這種情況,整體架構變成了實時資料通過流計算架構(
Storm
或者是
Spark Streaming
)導入
Hbase
,T+1天/小時後将資料導入到
HDFS
供業務或者營運人員使用。顯然這不是我們想要的實時資料快速分析,業務和營運人員需要等待一段時間才能看到資料。
随着技術的發展,
TiDB
、
CockroachDB
等分布式資料庫逐漸成熟,高可用、無限擴充且同時支援分析型和事務型操作場景。使用分布式資料庫應該是最理想的存儲實時資料,并提供快速分析的方式,但是
TiDB
、
CockroachDB
等分布式資料庫對伺服器和開發人員都有不低的要求(例如
TiDB
就需要安裝機器使用SSD盤),是以更适合非常重要的業務資料,而不是類似于埋點日志等實時資料,。
是以我們需要一個存儲方案可以在
相對廉價的商用機器上支援實時資料的快速分析。
Cloudera
公司開發的
Kudu
就是一個很好的選擇。
Kudu
支援單行資料的插入、更新和删除操作,但是使用
Kudu
用來存儲不再發生改變的曆史資料,也顯得有些昂貴了。對于曆史資料的存儲,廉價而穩定的
HDFS
是一個更好的選擇。受實時計算中滑動視窗的啟發,基于
Impala
計算引擎相容
Kudu
和
HDFS
兩個存儲引擎的特性,參考
Kudu
最新的Blog設計了後文的方案,揚
HDFS
和
Kudu
之長,避
HDFS
和
Kudu
之短,進而取得一個相對平衡的效果,實作實時資料的快速分析。
滑動視窗模式
滑動視窗類似于下圖,随着時間的前進,
Kudu
會不斷建立新的時間分區,不再變化的曆史資料導入
HDFS
存儲,再删除
Kudu
的曆史分區。

我們在
Impala
中,分别建立一張
Kudu
表和
Paruqet
格式的
HDFS
表,兩張表使用同樣的時間分區格式,時間分區可以選擇天、月、年。對于資料的查詢,可以選擇建立
View
視圖友善兩張表的查詢,也可以使用
Where+Union All
語句對兩張表合并查詢。
Impala
可以根據表不同的資料存儲方式選擇最适合的優化方式。
Kudu
表和
HDFS
表的時間分隔界限可以根據實時資料的遲到情況決定。
此時的滑動視窗模式實作了:
- 業務和營運人員可以立即查詢到實時資料的變化。
- 在一定時間段内可以對錯誤資料進行修正和補充。
- 避免了
小檔案産生。HDFS
- 減少了
的系統開銷。Kudu
流程的具體分析
- 建
表、Kudu
格式的Parquet
表,可以選擇建立相應的HDFS
視圖或者不建立。View
-
預添加時間分區,防止新來的資料會因為找不到新的分區而報Kudu
的錯,丢失資料。Non-Range
- 從
複制曆史的不可變資料到Kudu
層。此時的資料有可能會在HDFS
和HDFS
層出現備援,是以需要使用Kudu
語句保證資料不會出現備援。如果有使用where
視圖的話,需要執行View
語句更新Alter
。View
- 運作
更新COMPUTE STATS
的中繼資料資訊,并删除Impala
舊的分區。Kudu
執行個體
- 建表
CREATE TABLE db.kudu_table
(
kafka_offset string,
data string,
event_time timestamp,
PRIMARY KEY (kafka_offset, event_time)
)
PARTITION BY
HASH (kafka_offset) PARTITIONS 4,
RANGE (event_time)
PARTITION '2019-03-28' <= VALUES < '2019-03-29',
PARTITION '2019-03-30' <= VALUES < '2019-03-31'
)
COMMENT 'xxx'
STORED AS KUDU;
CREATE TABLE db.hdfs_table
(
kafka_offset string,
data string,
event_time timestamp
)
PARTITION BY (insert_time string COMMENT '曆史時間' )
COMMENT 'xxx'
STORED AS PARQUET;
假設我們的資料是從
Kafka
中實時讀取的,為了保證消息的精确一次,可以選擇
Kafka
的
Offset+Partition
進行組合作為
Kudu
的主鍵保證消息在資料庫存儲時有且僅有一次記錄。主鍵
Hash
的
Bucket
的個數取決于你的資料更新頻率。
注意:下面的任務均由定時排程工具執行,定時排程工具可以選擇Airflow、Crontab等工具。
- 提前添加
分區。Kudu
ALTER TABLE db.kudu_table
ADD IF NOT EXISTS
RANGE PARTITION "2019-04-01" <= VALUES < "2019-04-02";
- 将兩天前的資料更新進
表。這裡使用的是HDFS
的文法。Airflow
INSERT OVERWRITE TABLE db.hdfs_table partition(history_time='{{ macros.ds_add(next_ds, -2}}')
SELECT kafka_offset, data, event_time
FROM db.kudu_table
where "{{ macros.ds_add(next_ds, -2) }}" <= event_time
and event_time < "{{ macros.ds_add(next_ds, -1) }}";
- 确定資料是否更新進去
表。HDFS
select count(*)
from db.hdfs_table
where history_time='{{ macros.ds_add(next_ds, -2}}'
- 為了防止意外會選擇删除三天前的分區,而不是當天更新進
層的資料。HDFS
ALTER TABLE db.kudu_table
DROP IF EXISTS
RANGE PARTITION "{{ macros.ds_add(next_ds, -3}}" <= VALUES < "{{ macros.ds_add(next_ds, -2}}";
- 為了
獲得最好的運作性能,使用Impala
文法,預計算和更新COMPUTE STATS
的中繼資料。Impala
COMPUTE STATS db.kudu_table;
COMPUTE INCREMENTAL STATS db.hdfs_table;
- 可以建立
視圖對資料進行曆史資料和實時資料的聯合查詢,也可隻針對View
查詢實時資料的更新情況。kudu_table
CREATE VIEW unified_view AS
SELECT kafka_offset, data, event_time
FROM db.kudu_table
WHERE event_time >= date_add(now(), -2)
UNION ALL
SELECT kafka_offset, data, event_time
FROM db.hdfs_table
WHERE history_time < date_add(now(), -2)
總結
其實我們換個視角,使用
The Beam Model
去看待整個流程,我們會發現:
- 計算的結果是什麼?資料從
流入Kafka
,再轉換成Kudu
格式檔案。transformationsParuqet
- 在事件時間中的哪個位置計算結果?選擇滑動視窗,在兩天的時間間隔處計算資料,将
資料轉換成Kudu
格式。windowingParquet
- 在處理時間中的哪個時刻觸發計算結果?使用固定的處理時間(一天間隔)觸發資料的轉換,遲到兩天的資料便不再處理(水印)。triggers + watermarks
- 如何修正結果?資料隻會不斷增加,不會修改。accumulation
對The Beam Model感興趣的可以參考下面的文章:
《Streaming Systems》第一章
《Streaming Systems》第二章
《Streaming Systems》第三章
《Streaming Systems》第四章
Kudu的blog:https://kudu.apache.org/2019/03/05/transparent-hierarchical-storage-management-with-apache-kudu-and-impala.html