天天看點

定時跑視圖往另外一張表添加資料_實時計算實踐:實時資料快速分析的解決方案...

在分布式系統中,根據應用的場景選擇對應的資料存儲方式是非常重要的一件事。這篇文章讨論的是在實時資料不斷進入的情況下,如何結合曆史資料進行快速分析。

背景

對于實時資料的存儲,為了避免在

HDFS

(最經典的分布式存儲系統)下生成大量的小檔案(存儲和計算都會産生大量的開銷),通常的選擇是

Hbase

。但是

Hbase

本身的設計更适合寫入資料,對于讀的優勢僅僅限于對Rowkey的查詢,不适合類似于

Group

、聚合等分析型查詢。為了避免這種情況,整體架構變成了實時資料通過流計算架構(

Storm

或者是

Spark Streaming

)導入

Hbase

,T+1天/小時後将資料導入到

HDFS

供業務或者營運人員使用。顯然這不是我們想要的實時資料快速分析,業務和營運人員需要等待一段時間才能看到資料。

随着技術的發展,

TiDB

CockroachDB

等分布式資料庫逐漸成熟,高可用、無限擴充且同時支援分析型和事務型操作場景。使用分布式資料庫應該是最理想的存儲實時資料,并提供快速分析的方式,但是

TiDB

CockroachDB

等分布式資料庫對伺服器和開發人員都有不低的要求(例如

TiDB

就需要安裝機器使用SSD盤),是以更适合非常重要的業務資料,而不是類似于埋點日志等實時資料,。

是以我們需要一個存儲方案可以在

相對廉價的

商用機器上支援實時資料的快速分析。

Cloudera

公司開發的

Kudu

就是一個很好的選擇。

Kudu

支援單行資料的插入、更新和删除操作,但是使用

Kudu

用來存儲不再發生改變的曆史資料,也顯得有些昂貴了。對于曆史資料的存儲,廉價而穩定的

HDFS

是一個更好的選擇。受實時計算中滑動視窗的啟發,基于

Impala

計算引擎相容

Kudu

HDFS

兩個存儲引擎的特性,參考

Kudu

最新的Blog設計了後文的方案,揚

HDFS

Kudu

之長,避

HDFS

Kudu

之短,進而取得一個相對平衡的效果,實作實時資料的快速分析。

滑動視窗模式

滑動視窗類似于下圖,随着時間的前進,

Kudu

會不斷建立新的時間分區,不再變化的曆史資料導入

HDFS

存儲,再删除

Kudu

的曆史分區。

定時跑視圖往另外一張表添加資料_實時計算實踐:實時資料快速分析的解決方案...

我們在

Impala

中,分别建立一張

Kudu

表和

Paruqet

格式的

HDFS

表,兩張表使用同樣的時間分區格式,時間分區可以選擇天、月、年。對于資料的查詢,可以選擇建立

View

視圖友善兩張表的查詢,也可以使用

Where+Union All

語句對兩張表合并查詢。

Impala

可以根據表不同的資料存儲方式選擇最适合的優化方式。

Kudu

表和

HDFS

表的時間分隔界限可以根據實時資料的遲到情況決定。

此時的滑動視窗模式實作了:

  • 業務和營運人員可以立即查詢到實時資料的變化。
  • 在一定時間段内可以對錯誤資料進行修正和補充。
  • 避免了

    HDFS

    小檔案産生。
  • 減少了

    Kudu

    的系統開銷。

流程的具體分析

  1. Kudu

    表、

    Parquet

    格式的

    HDFS

    表,可以選擇建立相應的

    View

    視圖或者不建立。
  2. Kudu

    預添加時間分區,防止新來的資料會因為找不到新的分區而報

    Non-Range

    的錯,丢失資料。
  3. Kudu

    複制曆史的不可變資料到

    HDFS

    層。此時的資料有可能會在

    HDFS

    Kudu

    層出現備援,是以需要使用

    where

    語句保證資料不會出現備援。如果有使用

    View

    視圖的話,需要執行

    Alter

    語句更新

    View

  4. 運作

    COMPUTE STATS

    更新

    Impala

    的中繼資料資訊,并删除

    Kudu

    舊的分區。

執行個體

  1. 建表
CREATE TABLE db.kudu_table 
(  
    kafka_offset string,
    data         string,
    event_time   timestamp,
   PRIMARY KEY (kafka_offset, event_time)
) 
PARTITION BY 
   HASH (kafka_offset) PARTITIONS 4,
   RANGE (event_time)
     PARTITION '2019-03-28' <= VALUES < '2019-03-29',
     PARTITION '2019-03-30' <= VALUES < '2019-03-31'
   )
COMMENT 'xxx' 
STORED AS KUDU;

CREATE TABLE db.hdfs_table 
(  
    kafka_offset string,
    data         string,
    event_time   timestamp
) 
PARTITION BY (insert_time string COMMENT '曆史時間' ) 
COMMENT 'xxx' 
STORED AS PARQUET;
           

假設我們的資料是從

Kafka

中實時讀取的,為了保證消息的精确一次,可以選擇

Kafka

Offset+Partition

進行組合作為

Kudu

的主鍵保證消息在資料庫存儲時有且僅有一次記錄。主鍵

Hash

Bucket

的個數取決于你的資料更新頻率。

注意:下面的任務均由定時排程工具執行,定時排程工具可以選擇Airflow、Crontab等工具。
  1. 提前添加

    Kudu

    分區。
ALTER TABLE db.kudu_table 
ADD IF NOT EXISTS 
RANGE PARTITION "2019-04-01" <= VALUES < "2019-04-02";
           
  1. 将兩天前的資料更新進

    HDFS

    表。這裡使用的是

    Airflow

    的文法。
INSERT OVERWRITE TABLE db.hdfs_table partition(history_time='{{ macros.ds_add(next_ds, -2}}')
SELECT kafka_offset, data, event_time 
FROM db.kudu_table
where "{{ macros.ds_add(next_ds, -2) }}" <= event_time
and event_time < "{{ macros.ds_add(next_ds, -1) }}";
           
  1. 确定資料是否更新進去

    HDFS

    表。
select count(*) 
from db.hdfs_table 
where history_time='{{ macros.ds_add(next_ds, -2}}'
           
  1. 為了防止意外會選擇删除三天前的分區,而不是當天更新進

    HDFS

    層的資料。
ALTER TABLE db.kudu_table 
DROP IF EXISTS 
RANGE PARTITION "{{ macros.ds_add(next_ds, -3}}" <= VALUES < "{{ macros.ds_add(next_ds, -2}}";
           
  1. 為了

    Impala

    獲得最好的運作性能,使用

    COMPUTE STATS

    文法,預計算和更新

    Impala

    的中繼資料。
COMPUTE STATS db.kudu_table;
COMPUTE INCREMENTAL STATS db.hdfs_table;
           
  1. 可以建立

    View

    視圖對資料進行曆史資料和實時資料的聯合查詢,也可隻針對

    kudu_table

    查詢實時資料的更新情況。
CREATE VIEW unified_view AS
SELECT kafka_offset, data, event_time
FROM db.kudu_table
WHERE event_time >= date_add(now(), -2)
UNION ALL
SELECT kafka_offset, data, event_time
FROM db.hdfs_table
WHERE history_time < date_add(now(), -2)
           

總結

其實我們換個視角,使用

The Beam Model

去看待整個流程,我們會發現:

  • 計算的結果是什麼?資料從

    Kafka

    流入

    Kudu

    ,再轉換成

    Paruqet

    格式檔案。transformations
  • 在事件時間中的哪個位置計算結果?選擇滑動視窗,在兩天的時間間隔處計算資料,将

    Kudu

    資料轉換成

    Parquet

    格式。windowing
  • 在處理時間中的哪個時刻觸發計算結果?使用固定的處理時間(一天間隔)觸發資料的轉換,遲到兩天的資料便不再處理(水印)。triggers + watermarks
  • 如何修正結果?資料隻會不斷增加,不會修改。accumulation

對The Beam Model感興趣的可以參考下面的文章:

《Streaming Systems》第一章

《Streaming Systems》第二章

《Streaming Systems》第三章

《Streaming Systems》第四章

Kudu的blog:https://kudu.apache.org/2019/03/05/transparent-hierarchical-storage-management-with-apache-kudu-and-impala.html