作者:陶克路,花名敵琺,阿裡巴巴技術專家。Apache Pulsar 等開源軟體 Contributor。技術領域包括大資料和雲原生技術棧,目前緻力于建構大資料領域業界領先的 APM 産品。 原文連結 距離 Google 的上一篇 F1 論文,也就是 F1: A Distributed SQL Database That Scales 已經 5 年過去了,Google 在今年的 VLDB 上終于釋出了 F1 的新版本 F1 Query: Declarative Querying at Scale,我們今天就來看一下這篇論文。安利一下,在 PingCAP 的 paper party 上,黃東旭, 首頁連結
大神對這篇論文的講解非常精彩,文章中也部分引用了他的觀點,在此鳴謝。
2013 年的 F1 是基于 Spanner,主要提供 OLTP 服務,而新的 F1 則定位則是大一統:旨在處理 OLTP/OLAP/ETL 等多種不同的 workload。但是這篇新的 F1 論文對 OLTP 的讨論則是少之又少,據八卦是 Spanner 開始原生支援之前 F1 的部分功能,導緻 F1 對 OLTP 的領地被吞并了。下面看一下論文的具體内容,疏漏之處歡迎指正。
0. 摘要
F1 Query 是一個大一統的 SQL 查詢處理平台,可以處理存儲在 Google 内部不同存儲媒體(Bigtable, Spanner, Google Spreadsheet)上面的不同格式檔案。簡單來說,F1 Query 可以同時支援如下功能:OLTP 查詢,低延遲 OLAP 查詢,ETL 工作流。F1 Query 的特性包括:
- 為不同資料源的資料提供統一視圖
- 利用資料中心的資源提供高吞吐和低延遲的查詢
- High Scalability
- Extensibility
1. 背景
在 Google 内部的資料處理和分析的 use case 非常複雜,對很多方面都有不同的要求,比如資料大小、延遲、資料源以及業務邏輯支援。結果導緻了許多資料處理系統都隻 focus 在一個點上,比如事務式查詢、OLAP 查詢、ETL 工作流。這些不同的系統往往具有不同的特性,不管是使用還是開發上都會有極大的不便利。
F1 Query 就在這個背景下誕生了,用論文中的話說就是
we present F1 Query, an SQL query engine that is unique not because of its focus on doing one thing well, but instead because it aims to cover all corners of the requirements space for enterprise data processing and analysis.
F1 Query 旨在覆寫資料處理和分析的所有方面。F1 Query 在内部已經應用到了多個産品線,比如 Advertising, Shopping, Analytics 和 Payment。
在 F1 Query 的系統設計過程中,下面幾點考量具有非常關鍵的作用。
- Data Fragmentation: Google 内部的資料由于本身的特性不同,會被存儲到不同的存儲系統中。這樣會導緻一個應用程式依賴的資料可能橫跨多個資料存儲系統中,甚至以不同的檔案格式。對于這個問題,F1 Query 對于這些資料提供一個統一的資料視圖。
- Datacenter Architecture: F1 Query 的目标是多資料中心,這個和傳統的 shared nothing 架構的資料處理系統不同相同。傳統的模式為了降低延遲,往往需要考慮 locality,也就是資料和計算越近越好。由于 Google 内部的網絡環境優勢,locality 的優勢顯得不是那麼重要。是以 F1 Query 更強調計算和存儲分離,這樣計算節點和存儲節點的擴充性(scalability)都會更好。畢竟 Google 内部的系統,scalability 才是第一法則。還有一點值得一提的是,由于使用了 GFS 的更強版本: Colossue File System,磁盤不會成為瓶頸。
- Scalability: 在 F1 Query 中,short query 會在單個節點上執行,larger query 會以分布式的模式執行,largest query 以批處理 MapReduce 模式執行。對于這些模式,F1 Query 可以通過增加運算的并行度來優化。
- Extensibility: 對于那些無法用 SQL 語義來表達的查詢需求,F1 通過提供 user-defined functions (UDF)、user-defined aggregate functions (UDAs) 和 table-valued functions (TVF) 來支援。
2. 架構
F1 的架構圖如下所示:

下面的方框裡面是每個 Datacenter 一套。關于各個元件的介紹如下:
- 使用者通過 client libary 和 F1 Server 互動
- F1 Master 負責 query 的狀态的運作時監控和其他元件的管理
- F1 Server 收到使用者請求,對于 short query 直接單機執行查詢;對于 larger query 轉發到多台 worker 上并行執行查詢。最後再彙總結果傳回給 client。
- F1 Worker 負責具體查詢執行
- F1 Server 和 Worker 都是無狀态的,友善擴充
2.1 query 執行
使用者通過 client libary 送出 query 到 F1 Server 上,F1 Server 首先解析和分析 SQL,然後将涉及到的資料源提取出來,如果某些資料源在目前 datacenter 不存在,則直接将 query 傳回給 client 并告知哪些 F1 Server 距離哪些資料源更近。這裡直接将請求傳回給業務層,由業務層去 retry,設計的也是非常的簡單。盡管前面說到要将存儲和計算分離,但是這個地方的設計還是考慮到了 locality,datacenter 級别的 locality,畢竟 locality 對查詢延遲的影響還是巨大的。
F1 Server 将 query 解析并優化成 DAG,然後由執行層來執行,具體執行模式(interactive 還是 batch)由使用者指定。原文是: Based on a client- specified execution mode preference, F1 Query executes queries on F1 servers and workers in an interactive mode or in a batch mode.
對于互動式查詢模式(interactive mode)有單節點集中執行模式和多節點分布式執行模式,query 優化會根據啟發式的算法來決定采用哪種模式。集中式下,F1 Server 解析分析 query,然後在目前節點上直接執行并接收查詢結果。分布式下,接收 query 的 F1 Server 充當一個 query coordinator 的角色,将 query 拆解并下發給 worker。互動式查詢在資料量不太大的情況下往往具有不錯的性能和高效的資源使用率。
除了互動式查詢還有一種模式是批處理模式(batch mode)。批處理模式使用 MapReduce 架構異步送出執行執行,相比互動式這種 long-running 方式,批處理模式的可靠性(reliabitly)更高。
2.2 資料源
資料查詢支援跨 datacenter。存儲計算分離模式使得多資料源的支援更加簡單,比如 Spanner, Bigtable, CSV, columnar file 等。為了支援多資料源,F1 Query 在他們之上抽象出了一層,讓資料看起來都是存儲在關系型表裡面。而各個資料源的中繼資料就存儲在 catalog service 裡面。
對于沒有存儲到 catalog service 裡面的表資料,隻要提供一個DEFINE TABLE即可查詢。
DEFINE TABLE People(
format = ‘csv’,
path = ‘/path/to/peoplefile’,
columns = ‘name:STRING,
DateOfBirth:DATE’);
SELECT Name, DateOfBirth FROM People
WHERE Name = ‘John Doe’;
論文中沒有提到的是單看這個 DEFINE TABLE 可以表現力不夠,所說這些資訊并不足以表現出資料的行為:
- 是否支援 partition?
- 是否支援 邏輯下推?
- 是否支援索引?
- 是否支援多種 掃描模式?
- 對于新資料源的支援可以通過 Table-Valued Function (TVF) 的方式來支援。
2.3 Data Sink
query 的結果可以直接傳回給 client,也可以插入到另外一個表裡面。
2.4 SQL
SQL 2011。之是以是 2011 是因為其他老的系統使用的是 2011。
3. 互動式查詢
互動式查詢模式是預設的查詢模式。如前所述,互動式查詢有集中式和分布式,具體使用哪種由優化器分析 client 的 query 然後決定。
3.1 Single Threaded Execution Kernel
集中式的查詢如下圖所示,是一種 pull-based 的單線程執行方式。
3.2 Distributed Execution
如前所述,由優化器分析完 query 決定是否采用分布式模式。在分布式這種模式下接收到 query 的 F1 Server 充當一個 coordinator 的角色,将執行 plan 推給 worker。worker 是多線程的,可以并發執行單個 query 的無依賴的 fragment。Fragment 是執行計劃切分出來的執行計劃片段,非常像 MR 或者 Spark 中的 stage。Fragment 之間通過 Exchange Operator (資料重分布) 連接配接。
Fragment 的切分過程如下:優化器使用一種基于資料分布依賴的 bottom-up 政策。具體來說每個算子對于輸入資料的分布都有要求,比如 hash 或者依賴其他字段的分布。典型的例子有 group by key 和 hash join。如果目前的資料分布滿足前後兩個算子的要求,則兩個算子就被放到一個 Fragment 裡面,否則就被分到兩個 Fragment 裡面,然後通過 Exchange Operator 來連接配接。
下一步就是計算每個 Fragment 的并行度,Fragment 之間并行度互相獨立。葉子節點的 Fragment 的底層 table scan 決定最初的并行度,然後上層通過 width calculator 逐漸計算。比如 hash-join 的底層兩個 Fragment 分别是 100-worker 和 50-worker,則 hash-join 這個 Fragment 會使用 100-worker 的并行度。下面是一個具體的例子。
SELECT Clicks.Region, COUNT(*) ClickCount
FROM Ads JOIN Clicks USING (AdId)
WHERE Ads.StartDate > ‘2018-05-14’ AND
Clicks.OS = ‘Chrome OS’
GROUP BY Clicks.Region
ORDER BY ClickCount DESC;
上面 SQL 對應的 Fragment 和一種可能 worker 并行度如下圖所示:
3.3 Partitioning Strategy
資料重分布也就是 Fragment 之間的 Exchange Operator,對于每條資料,資料發送者通過分區函數來計算資料的目的分區數,每個分區數對應一個 worker。Exchange Operator 通過 RPC 調用,擴充可以支援到每個 Fragment 千級的 partion 并發。要求再高就需要使用 batch mode。
查詢優化器将 scan 操作作為執行計劃的葉子節點和 N 個 worker 節點并發。為了并發執行 scan 操作,資料必須要被并發分布,然後由所有 worker 一起産生輸出結果。有時候資料的 partition 會超過 N,而 scan 并發度為 N,多餘的 partition 就交由空閑的 worker 去處理,這樣可以避免資料傾斜。
3.4 Performance Considerations
F1 Query 的主要性能問題在于資料傾斜和通路模式不佳。Hash join 對于 hot key 尤為敏感。當 hot key 被 worker 載入到記憶體的時候可能會因為資料量太大而寫入磁盤,進而導緻性能下降。
論文中舉了一個 lookup join 的例子,這裡不打算詳述了。
對于這種資料傾斜的問題,F1 Query 的解決方案是 Dynamic Key Range,但是論文中對其描述還是不夠詳細。
F1 Query 對于互動式查詢采用存記憶體計算,而且沒有 check point。因為是記憶體計算,是以速度非常的快,但是由于沒有 checkpoint 等 failover 的機制,隻能依賴于業務層的重試。
4. 批處理
像 ETL,都是通過 Batch Mode 來處理的。Google 以前都是通過 MapReduce 或者 FlumeJava 來開發的,開發成本一般比較高。相比 SQL 這種方式,不能有效複用 SQL 優化,是以 F1 Query 選擇使用 SQL 來做。
如前所述,互動式查詢不适合處理 worker failure,而 batch mode,也就是批處理這種模式特别适合處理 failover(每一個 stage 結果落盤)。批處理模式複用互動式 SQL query 的一些特性,比如 query 優化,執行計劃生成。互動式模式和批處理模式的核心差別在于排程方式不同:互動式模式是同步的,而批處理模式是異步的。
4.1 Batch Execution Framework
批處理使用的架構是 MapReduce,Fragment 被抽象成 MapReduce 中的 stage,stage 的輸出結果被存儲到 Colossus file system (GFS 二代)。
在 Fragment 映射有一點值得注意的是嚴格來說,Fragment 的 DAG 映射到 mr 是 map-reduce-reduce,對這種模式做一個簡單的變通變成:map-reduce-map<identity>-reduce,如下圖:
關于 MapReduce 的更詳細資訊可以參考 Google 03 年那篇論文。
4.2 Batch Service Framework
Framework 會對 batch mode query 的執行進行編排。具體包括:query 注冊,query 分發,排程已經監控 mr 作業的執行。當 F1 Server 接收到一個 batch mode query,它會先生成執行計劃并将 query 注冊到 Query Registry,全局唯一的 Spanner db,用來 track batch mode query。Query Distributor 然後将 query 分發給 datacenter。Query Scheduler 會定期從 Registry 拿到 query,然後生成執行計劃并交給 Query Executor 來處理。
Service Framework 的健壯性非常好:Query Distributor 是選主(master-elect)模式;Query Scheduler 在每個 datacenter 有多個。query 的所有執行狀态都是儲存在 Query Registry,這就保證其他的元件是無狀态的。容錯處理:MapReduce 的 stage 會被重試,如果 datacenter 出問題,query 會被配置設定到新的 datacenter 上重新執行。
5. 優化器
SQL 優化器類似 Spark Catalyst,架構如下圖,不細說了。
6. EXTENSIBILITY
對于很多複雜業務邏輯無法用 SQL 來描述,F1 針對這種情況提供了一種使用者自定義函數的方法,包括 UDF (user-define functions),UDA (aggrega- tion functions) 和 TVF (table-valued functions)。對于簡單的UDF需求,通常直接以SQL或者LUA的形式作為query的一部分;對于更複雜或者性能要求高的UDF需求,則可以用其它進階語言以UDF Server的形式實作。
UDF Server 和 F1 Query 是 RPC 調用關系,有 client 單獨部署在同一個 datacenter。udf server 完全有 client 來控制,無狀态,基本可以無限擴充。
6.1 Scalar Functions
UDF 并不是新的概念,UDF Server 這種部署方式看上去還算新穎一點。但是 UDF Server 這種單獨部署模式一個可能的問題是延遲問題,這裡通過批量流水線的方式來減少延遲。下面是 UDF 的一個例子。
local function string2unixtime(value)
local y,m,d = match("(%d+)%-(%d+)%-(%d+)")
return os.time({year=y, month=m, day=d})
end
6.2 Aggregate Functions
UDA 是對多行輸入産生一個單一的輸出,要實作 UDA,使用者需要實作算子 Initialize, Accumulate, and Finalize。另外如要要對多個 UDA 的子聚合結果進行再聚合,使用者可以實作 Reaccumulate。
6.3 Table-Valued Functions
TVF 的輸入是一個 table,輸出是另外一個 table。這種在機器學習的模型訓練場景下比較有用。下面是論文中的具體的一個例子:EventsFromPastDays 就是一個 TVF。
SELECT * FROM EventsFromPastDays(
3, TABLE Clicks);
當然 TVF 也支援用 SQL 來描述,如下。
CREATE TABLE FUNCTION EventsFromPastDays(
num_days INT64, events ANY TABLE) AS
SELECT * FROM events
WHERE date >= DATE_SUB(
CURRENT_DATE(),
INTERVAL num_days DAY);
7. Production Metric
下面是 F1 Query 在 Production 環境下的幾個 metrics。
8. 總結
回過頭來看 F1 Query 最新的這篇論文給人最大的啟發就是大一統的思想,這個很有可能是行業發展趨勢。回想一下 MapReduce 論文由 Google 于 2003 年發表,開源實作 Hadoop 于 2005 問世。不妨期待了一下未來的 3 到 5 年的 F1 Query 的開源産品。
阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。