Streaming SQL在貝殼的演進之路
原文發自公司對外公衆号,作者筆名顧淵離,本人,轉載請注明出處
背景
實時計算平台是貝殼内部統一承接實時需求和管理實時任務的平台,支援了公司埋點、商機、交易、商業化等若幹部門的業務,目前總共運作了570多的實時計算任務,日志流量單日吞吐1041億。
随着實時數倉等業務的推進,越來越多的任務接入到我們平台,使我們開發和運維實時任務的成本急劇升高,于是我們迫切希望能有一種快速開發和易維護實時任務的方法,是以我們把希望的目光投向了SQL。我們都知道SQL作為一種通用的描述資料處理的語言,有着統一的标準和廣泛的使用。它的學習成本低,開發效率高,行業内有着完整的生态,成熟的優化規則。但是SQL其實更多的是線上上系統的面向關系資料庫的OLTP場景和離線數倉的OLAP場景中使用。那麼能否将SQL應用到實時計算的場景來提升實時任務的開發效率呢?
帶着這個問題,我們開始了SQL on Streaming的挑戰。
SQL on Streaming
問題
我們都知道,SQL是作用在有限資料集上的,查詢引擎讀取完整可用的資料集,生成固定大小的結果。相比之下,資料流持續提供新的記錄,資料随着時間到達,是以流查詢必須持續的處理到達的資料。是以問題變成了如何在流上定義一個能夠是SQL作用在其上的視圖。
動态表
動态表概念的提出,用來支援對結果更新的查詢,保留流和批輸入的統一語義。動态表是一個不斷更新的表,可以像正常的表一樣進行查詢,但是,對動态表的查詢是連續運作的,而且根據輸入表的不斷更新,查詢出來的結果表也是不斷更新的,是以結果表也是一個動态表。 到這,我們可以了解出其實流和動态表是同一種東西,隻是在不同角度看待問題而産生的不同概念罷了,是以二者其實是可以互相轉換的, 我們可以将流轉換為動态表,也可以将動态表轉換為流。下圖顯示了可以在流上處理關系查詢的概念模型。
那麼我們如何在流上定義一個動态表?有兩種模式:追加和更新。追加模式下,流中的每條新增記錄都會插入到動态表中,不會對動态表中的曆史資料做修改;
而更新模式下,流中的每條記錄都會根據其唯一的健值或新增,或對曆史的資料做更新操作。
持續查詢
由于動态表随時間而變,我們必須定義查詢動态表的意義。 假設我們有一個動态表A,那麼我們可以定義它在時間t時刻的快照為A[t],這時就可以使用任意的SQL來對其進行查詢,生成一個結果表。我們定義查詢的操作為q,将查詢作用到A[t]上,我們表示為q(A[t]),生成的結果表R[t],于是我們有:
R[t] = q(A[t])
在動态表上的持續查詢,表示為在t的每個時間點,都會執行一個t時刻在動态表A上的批量查詢,而所生成的結果表也同樣是一個持續更新的動态表,它可能會不斷更新先前的行,也可能添加新行。
好了,理論部分說完了,那麼我們如何将SQL on Streaming有效的落地呢。在當時平台的所有任務都還是在使用spark技術棧,而且當時Spark新推出了Structured Streaming子產品,是以我們自然而然的将該子產品作為我們将SQL on Streaming落地的第一階段的解決方案。
SQL 1.0
我們當時調研的版本是2.3.1,是Structured Streaming剛剛推出不久,其雖然支援一些比較複雜的視窗聚合操作,但是限制也比較多,而且其僅在api層面上提供較為完善的語義,而在SQL層面并沒有提供較好的支援,而且就平台上的任務來說,也沒有在流維關聯、視窗聚合這方面有比較強烈的需求,是以我們最初對于産品的定位也僅僅是做一些簡單的ETL工作。于是,最初的設計如下:
僅支援簡單的select * from xxx where conditions...和union語句
在一個作業裡可以以臨時視圖的形式定義多個邏輯節點,最終視圖之間的依賴關系構成了整個作業的DAG
針對上下遊資料提供formatter,對上下遊的資料提供格式化輸入輸出的能力。
對不同的外部存儲,提供多種适配的元件來支援多種第三方存儲的寫能力
産品展示
下圖是我們使用SQL 1.0配置出的一個對業務後端日志做簡單ETL的任務。可以看到這裡面有五個節點,上遊source_kafka_1是讀取外部kafka中的後端日志,中間有三個節點,是處理過程,最後綠色的sink_kafka_1是将處理後的資料輸出到下遊kafka中的topic。
下圖是對輸入端kafka的配置:
裡面包含了要連接配接的kafka叢集名稱,訂閱的topic,讀取資料的格式,字段和類型。對于kafka中的資料格式,我們提供了4種定義schema和解析的方式:
預設:所有資料按一個單獨的列提供,不做解析,列名可自定義
json:對json做深度解析,以jsonpath的方式定義各列的通路路徑。
正則:以正規表達式的形式提取各列資料,比對出正則中第一組括号的比對項,此種方式一般用作資料格式不規範的場景
分隔符:可指定自定義分隔符,以索引的方式定義各列資料的通路方式。
此外,在定義中間若幹處理節點時,除要配置SQL語句外,還要指定上遊依賴的節點:
最後,在配置sink元件時,我們要指定上遊的資料依賴,輸出到下遊的kafka叢集名稱,topic,和輸出資料格式,包括想要的字段名稱,字段格式,提供了将string轉換成json object/array的能力。
産品設計
下面是設計的元件類圖:
Plugin是所有Source,Operator,Sink的父類
UpStream是對所有上遊元件的封裝,封裝了本節點生成的視圖名稱,所有連接配接的下遊節點,和擷取本節點的DataFrame的方法,SqlSource和SqlOperator都可以作為上遊元件
DownStream是對所有下遊元件的封裝,維護了所有的上遊依賴,SqlOperator和SqlSink都可以作為下遊元件。
SqlSource是對輸入資料源頭的封裝,提供了建構Formatter的能力
SqlOperator是對中間處理節點的封裝,目前僅支援簡單Filter和Union
SqlSink封裝了将上遊資料輸出到外部的能力,目前支援的Sink類型包括kafka,console,druid
目前平台上運作着的SQL 1.0的實時任務有60個,而且數量還在增加。
checkpoint
我們知道在定義Structured Streaming任務的時候是必須指定checkpoint路徑的,而且Structured Streaming底層在做checkpoint的時候是同步的,也就是說會阻塞整體的處理進度。當淩晨有大量離線任務運作導緻hdfs繁忙的時候,會導緻checkpoint阻塞,進而導緻資料的積壓和延遲,這對下遊一些對實時性要求較高的系統來說是不可接受的。
offset
Structured Streaming的消費進度是儲存在checkpoint裡的,而且每次任務重新開機都會直接從checkpoint裡讀取offset,并且會在driver端屏蔽掉自動送出offset功能,如此會有以下幾個問題:
消費進度和消費組無關,無法從消費組上共享消費進度。
無法從外部監控到消費進度,積壓等情況,不利于任務的監控報警。
重置offset流程複雜。
由于以上原因,以及考慮到後續對業務的支援,建構公司級别的實時數倉,實時名額等能力,我們不得不對SQL on Streaming的能力做整體更新。是以,SQL 2.0應運而生。
SQL 2.0
首先,我們對SQL 2.0的産品定位為提供一種以純SQL的方式來定義流式作業。那麼圍繞這一宗旨,SQL 2.0應該具備哪些能力呢?
定義維表,且能實時維表關聯
支援定義邏輯視圖
支援自定義UDF
對時間視窗的支援,包括各種分組聚合
在調研了多款開源産品之後我們選擇了Flink這款目前最火熱的開源流處理架構,它對SQL的支援是目前所有實時處理架構中最好的。 是以我們最終決定在Flink-SQL上做擴充,來實作我們的功能。
DDL
考慮到在引擎應該隻提供能力,而和管理着中繼資料系統打通應該屬于應用層的能力,是以我們初步設計是希望能用一種DDL的方式來打通應用層和引擎層,同時用來定義外部資料源,而Flink SQL中暫還并為提供這種能力。是以,我們基于Antlr4工具,對标準SQL中的DDL文法做了一些擴充,使其不但能夠描述外部資料的格式,而且能夠提供通路外部資料的一些連接配接資訊,以及提取特定字段的能力。
示例建表語句如下:
--定義資料源表
create source table kafka_source_tb (
system_type string extraction '$.system_type',
client_os_type string extraction '$.client_os_type',
ucid string extraction '$.ucid',
ts bigint extraction '$.timestamp',
watermark key ts delay for 5s
) with (
type = 'kafka',
dataType = 'json',
brokers = 'off03-bigdata.mars.ljnode.com:9092',
topics = 'data-pipeline-common-dev',
groupId = 'test-group'
);
--定義維表
create dim table redis_dim_tb (
ucid string,
first_login_time string,
device_id string,
primary key ucid
type = 'redis',
server = '127.0.0.1',
port = '6379',
cache = 'all'
--定義輸出表
create sink table console_sink_tb (
client_os_type string,
system_type string,
ts bigint
type = 'console',
dataType = 'json'
with語句塊中是對外部資料源的連接配接描述資訊
資料源表中的watermark key $xxx delay for $period為定義水印的語句
維表中的primary key用來定義主鍵,主要是為了在維表關聯時查找外部資料
引擎層在執行時,會将source和sink表的定義翻譯成底層的flink api,将表資訊注冊到flink的TableEnvironment中。
維表關聯
到目前為止,我們已經在SQL 2.0中提供了1.0中的全部能力,理論上已經能夠提供給使用者做簡單的ETL使用了。但是作為一個有理想的青年,我們并不滿足于此。而且,為了後續提供建構實時數倉和實時名額的能力,簡單的ETL是不夠的,我們還要和很多的外部資料做關聯。 為了滿足這方面的需求,我們基于calcite的SQL解析功能和阿裡為社群貢獻的Async I/O功能實作了維表關聯的能力。下面對這部分功能的設計做詳細介紹。
設計
首先,思路就是先解析SQL,如果在join語句後面出現了使用者定義的維表,就觸發維表關聯的SQL改寫邏輯,改寫邏輯如下:
抽離嵌套子查詢,獨立注冊成臨時表
合并流表關聯維表節點,提取關聯條件
對流表的DataStream做轉換,使用Async I/O,通路外部存儲做資料關聯
替換原SQL語句中所有被流表和維表原表名限定的字段的所屬表名,生成新的有效的SQL
這樣了解起來可能有些困難,我們在上面三個DDL的基礎上舉個例子,使用如下DML來定義一個作業:
insert into console_sink_tb (
ucid, first_login_time, device_id,
client_os_type, system_type)
select
ucid, first_login_time, device_id,
client_os_type, system_type
from kafka_source_tb a
join redis_dim_tb b
on a.ucid = b.ucid
由于本例中不存在嵌套查詢,第一步可以跳過。
第二步:合并流維關聯節點,提取關聯條件。
上述SQL經過SQL解析後,會生成如下文法樹:
該文法樹經過轉換後,變為
翻譯成SQL後就是:
a_J_b.ucid,
a_J_b.first_login_time,
a_J_b.device_id,
a_J_b.client_os_type,
a_J_b.system_type
from a_J_b
第三步:對流表的DataStream做轉換,使用Async I/O,通路外部存儲做資料關聯。根據我們前面講的概念,流和動态表是可以互相轉換的,是以,我們可以先将流表轉換成DataStream,然後根據維表定義中的連接配接資訊通路外部資料源,再根據提取出的關聯條件,擷取和過濾出我們需要的資料,實際上是通過RichAsyncFunction提供的異步并行執行的能力,能夠同時請求多條資料,提高效率。将關聯後的資料流注冊成中間表,表名即流維表節點合并的名字(a_J_b),這樣就可以将轉換後的SQL語句作用到該表上了。
第四步需要對原SQL語句的where子句或group by子句以a.client_os_type的方式中引用到字段的所屬表名做替換,将a替換成a_J_b。因為當我們将流表關聯維表合并為一個節點後,原來的a已經變成了一個不可識别的辨別符了。
緩存優化
在生産環境中,絕大多數的流都是很快的,如果每條資料都要通路一次外部存儲,那麼,除了整體的性能會差以外,對外部存儲也會造成很大壓力,甚至會把外部系統壓垮。另一方面,考慮到其實很多元表變更并不頻繁,而且數量也不會很大,那麼,我們完全可以将維表的資料緩存在記憶體中,設定好過期政策做到同步更新。 對于開啟了緩存的維表,記憶體中的緩存在任務剛啟動時是空的,這會有一個預熱的階段。另外可對緩存設定兩種過期政策,一種是緩存大小,一種是過期時間。緩存大小政策是根據LRU算法進行資料淘汰,過期時間是根據最後讀取資料的時間,當資料被緩存淘汰,程式會重新查詢外存,并更新到緩存中,以此來實作緩存中資料的更新。
另一方面當資料量比較大時,單節點不足以将絕大多數次元資料緩存,可以預先根據與參與關聯的維表主鍵對應的流表字段做預分區,即根據某一個字段,保證該字段下同值的記錄總被配置設定到同一個下遊節點上,這樣每個下遊節點隻緩存本節點能用到的資料,且能保證該部分的值域僅占總量的很小一部分。
關聯條件支援表達式
可以看到上述方案雖然解決了流表和維表關聯的問題,但是是有很多限制的。比如說我們拿hbase作為維表來舉例,就要求關聯條件中必須包含hbase的rowkey,而且rowkey必須作為關聯條件的一部分,其值是必須能夠直接從流表中取到的,也就是要求關聯條件中隻能是EQUAL類型的表達式,而且等号兩邊必須隻能是對列的簡單引用。
但是很多時候,hbase的rowkey可能并不是一個單一含義的值,它也可能是一個業務邏輯上的聯合主鍵,需要将多列拼接起來才能構成一個完整的rowkey,這個時候關聯條件的限制就成為了一個使用上的痛點,使用者當然可以通過定義臨時視圖的形式繞過在這個限制,但是這樣又增加了使用者的使用成本,是以我們也集中精力解決了這個問題。
我們看下面這個例子:
select a.col1, a.col2, b.coll
from a
join b
on a.col1 + a.col3 = b.col
其實思路很簡單,我們在關聯維表前,需要将流表a轉換成DataStream,其實就是在轉換的過程中添加一個環節。a中僅包含col1,col2,col3字段,但是關聯條件中需要col1 + col3,是以我們将表a轉換成臨時視圖__function_tmp_view__0,它表達的邏輯如下SQL
a.col1,
a.col2,
a.col3,
a.col1 + a.col3 AS __function_tmp_field__0
這樣關聯條件就變成了__function_tmp_field__0 = b.col,相當于我們将表達式的值的計算過程移動到了我們自動添加的臨時視圖中,将計算過程提前了,這樣我們在關聯的時候就能夠直接擷取關聯條件中所需要的值了。
維表的接入
實際上業務方在有維表關聯的需求時很多都是希望能夠直接關聯業務庫,但是受限于叢集通路業務庫的安全和穩定性問題,我提供了一種額外的方式能夠将使用者的業務庫資料通過binlog的方式來實時同步到hbase和redis中,而SQL引擎隻需要去和HBase或Redis中的資料做關聯就可以了。
目前我們已經支援了HBase,Redis,Http,Phoenix等方式的維表關聯。
新的問題
以上就是我們在建構Streaming SQL 2.0的過程中遇到的一些問題和解決辦法,在SQL 1.0中存在的checkpoint和offset問題在2.0中被架構自身所消化,但也随之給我們帶來了新的挑戰。
Flink内部的狀态管理如何優雅的配置和管理起來
Flink的checkpoint/savepoint功能如何利用和管理
等等新問題都需要我們去解決。
平台化建設
SQL 2.0在19年8月份于我們平台上上線,目前已經有200多個任務正在運作,覆寫了實時數倉,實時交易,商業化,租賃等業務部門;涉及了ETL,維表關聯,資料落地clickhouse等場景,而且任務數量增長很快。
實時計算平台目前已經內建了資料源管理功能,用于管理實時領域的結構化資料元資訊,使用者可以預先配置和共享資料源,資料源可以自動拉取樣例資料,生成schema資訊。
在SQL 2.0任務配置過程中,使用者可以選擇已經存在的資料源,後端會自動生成自定義的DDL。這樣使用者隻需編寫DML定義處理邏輯就可以了,無需編寫複雜的DDL,提升了使用者的開發效率。最右側是對任務中涉及的資料源的管理功能。
另外我們也提供了SQL的文法校驗功能,使用antlr4自定義文法檔案解析和校驗DDL語句,使用calcite解析和校驗DML語句,能夠做到線上驗證SQL文法。 此外,我們也提供了檢視執行計劃,和線上Debug等功能,能夠大大提升使用者的開發效率和debug效率。
任務運維
目前貝殼的所有實時任務是統一托管在Hermes實時計算平台上的,而SQL 2.0在平台上隻是一個特殊的場景任務。整體的任務運維和監控報警由平台統一管控。做到任務名額上報和監控,任務心跳監控,任務失敗自動拉起等功能。
挑戰 && 規劃
最後是我們對未來工作的一些思考和規劃。
資料血緣管理,可以直覺清晰的幫助我們了解實時資料和任務之間的依賴關系。
資料流量監控,采集和監控資料流量資訊,做好資料量增長的預警工作,同時資料流量的采集資料也能為我們對資源的使用情況,以及任務的資源評估做一個依據。
資源自适應,可以根據上遊流量資訊動态估算一個任務大概需要多少資源
大狀态優化和管理,對于flink任務中可能出現的大任務狀态的管理和查詢功能
實時數倉,基于Flink-SQL建構的實時數倉及其相關生态的工具鍊
原文位址
https://my.oschina.net/nalenwind/blog/3286519