天天看點

Presto查詢優化拾遺

作者:小蝦好望角

Grouped Execution

為了友善大家了解 Grouped Execution 的原理,我們先來介紹兩個概念:分桶 和 Hash Join。

1.1 分桶

  • 其實 Hive 表中桶的概念就是 MapReduce 的分區的概念,兩者完全相同。實體上每個桶就是目錄裡的一個檔案,一個作業産生的桶(輸出檔案)數量和reduce任務個數相同。
  • 而分區表的概念,則是新的概念。分區代表了資料的倉庫,也就是檔案夾目錄。每個檔案夾下面可以放不同的資料檔案,通過檔案夾可以查詢裡面存放的檔案,但檔案夾本身和資料的内容毫無關系。
  • 桶則是按照資料内容的某個值進行分桶,把一個大檔案散列稱為一個個小檔案。
Presto查詢優化拾遺

1.2 Hash Join

主要分為兩個階段:建立階段(build phase)和探測階段(probe phase)

  • Bulid Phase:選擇一個表(一般情況下是較小的那個表,以減少建立哈希表的時間和空間),對其中每個元組上的連接配接屬性(join attribute)采用哈希函數得到哈希值,進而建立一個哈希表。
  • Probe Phase:對另一個表,掃描它的每一行并計算連接配接屬性的哈希值,與bulid phase建立的哈希表對比,若有落在同一個 bucket 的,如果滿足連接配接謂詞(predicate)則連接配接成新的表。
Presto查詢優化拾遺

1.3 Grouped Execution 原理

所謂的 Grouped Execution 針對的是那種資料擺放上進行了 bucketed 存儲的資料查詢的一種優化手段。

  • 比如兩個表 JOIN,在沒有 Grouped Execution 的時候做 hash join,一般會把小表作為 build 表發到每個 worker 上,然後針對大表的資料做 probe,這樣記憶體占用由整個小表資料量的大小決定
Presto查詢優化拾遺
  • 而如果這兩個表是 bucketed 表,而且 bucket 的字段就是這個 JOIN 的字段,那麼不同 bucket 之間的資料天然就 JOIN 不上,那麼兩個表之間的 JOIN 可以轉變為相對應的 bucket 之間的 JOIN,最後再做個彙總即可,這樣每個 worker 上的記憶體量會大大降低。

Recoverable Grouped Execution

單個 bucket 的 JOIN 如果失敗是可以單獨重試的,這也就引出了 Facebook 做的第二個優化:Lifespan 重試。通過 Lifespan 級别的重試可以提高大查詢的成功率。

Presto查詢優化拾遺

Exchange Materialization

上面兩種優化手段的應用場景是很受限的,必須要 bucketed table,而且要用 bucketed 字段 JOIN 才能有用。确實,如果故事隻到這裡就結束就很沒意思了,是以 Facebook 提出了第三個優化手段: Exchange Materialization。

我們知道 Presto 的 Exchange 本來是流式的,上遊把資料通過 HTTP 發給下遊,下遊如果處理不過來會反壓上遊,中間是沒有資料落盤的,如下圖所示:

Presto查詢優化拾遺

Exchange Materialization 則是要把資料落到盤上,并且按照 JOIN 的 key 組織成 bucketed table,那麼從這個 Exchange 節點開始往後就可以應用上面的優化了,如下圖所示:

Presto查詢優化拾遺

Exchange Materialization 啟用參數:

SET SESSION exchange_materialization_strategy='ALL';
SET SESSION partitioning_provider_catalog='hive';
SET SESSION hash_partition_count = 4096;           

Spill to Disk

預設情況下,如果查詢執行所請求的記憶體超過會話屬性 query_max_memory 或 query_max_memory_per_node,Presto 就會終止查詢。這種機制確定了查詢配置設定記憶體的公平性,防止了記憶體配置設定導緻的死鎖。當叢集中有很多小查詢時,它是有效的,但會導緻殺死不符合限制的大型查詢。

4.1 原理簡介

為了克服這種低效率,Presto 引入了可撤銷記憶體的概念。Query 可以請求不計入限制的記憶體,但記憶體管理器可以在任何時候撤銷這些記憶體。當記憶體被撤銷時,查詢運作程式将中間資料從記憶體溢出到磁盤,然後繼續處理它。

在實踐中,當叢集空閑且所有記憶體都可用時,記憶體密集型查詢可能會使用叢集中的所有記憶體。另一方面,當叢集沒有太多空閑記憶體時,同一個查詢可能被迫使用磁盤作為中間資料的存儲。與完全在記憶體中運作的查詢相比,強制溢出到磁盤的查詢的執行時間可能要長幾個數量級。

請注意,啟用 spill-to-disk 并不能保證能夠執行所有記憶體密集型的查詢。查詢運作程式仍然有可能無法将中間資料劃分為足夠小的塊,使每個塊都能裝入記憶體,進而導緻從磁盤加載資料時出現記憶體不足錯誤。

4.2 支援的操作

1、Joins

  • 當任務并發大于 1 時,将對 build 表進行分區,分區的數量等于 task.concurrency 配置值
  • 在對 build 表進行分區時,spill-to-disk 機制可以減少連接配接操作所需的峰值記憶體使用。當查詢接近記憶體限制時,build 表分區的一個子集将溢出到磁盤,join 另一側表中的記錄也會落到相同的分區。
  • 然後,就可以 one-by-one 讀取溢出的分區資料以完成連接配接操作。
  • 需要注意的是 join_spill_enabled 預設值是 false,可通過 set session join_spill_enabled=true; 啟用

2、Aggregations

  • 如果正在聚合的 group 中數量很大,則可能需要大量的記憶體。
  • 當啟用 spill-to-disk 時,如果沒有足夠的記憶體,則将中間累積聚合結果寫入磁盤。
  • 當記憶體可用時,它們被加載回來進行合并。
  • 開啟 spill_enabled 參數後無需單獨設定其他參數

Dynamic Filtering

早在2005年,Oracle 資料庫就支援比較豐富的 dynamic filtering 功能,而 Spark 和 Presto 在最近版本才開始支援這個功能。為了讓大家快速了解這個功能,我們先來看一個 Spark 的栗子:

5.1 Spark 動态分區裁減

SELECT * FROM fact_iteblog
JOIN dim_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol) 
WHERE dim_iteblog.othercol > 10           

通過 Spark 的動态分區裁減,可以将執行計劃修改成如下形式:

可見,在掃描 fact_iteblog 表時,如果能自動加上了類似于 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 的過濾條件,那麼當 fact_iteblog 表有大量的分區,而 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 查詢語句隻傳回少量的分區,這樣可以大大提升查詢性能。

5.2 Presto 動态過濾

Presto 也支援上面的功能,這個功能稱為動态過濾(Dynamic Filtering)。事實上,Presto 的動态過濾比 Spark 的動态分區裁減要豐富。因為 Spark 動态分區裁減隻能用于分區表,而 Presto 的動态過濾支援分區表和非分區表,Presto 的動态分區包含 Partition Pruning(分區表) 以及 Row filtering(非分區表)。直到 Presto 0.241,這個功能正式加入到 master 分支。

注意事項:

  • 目前 PrestoDB 的實作中隻有 Hive 資料源支援動态過濾
  • 而且非分區表動态過濾隻支援 ORC 資料格式
  • 另外,Presto 的 dynamic filtering 和 grouped_execution 不能同時使用
  • 并且需要設定以下參數
set session enable_dynamic_filtering=true;
set session hive.pushdown_filter_enabled=true;           

Alluxio Cache Service

提高 Presto 查詢延遲的一個常見優化是緩存工作集,以避免來自遠端資料源或通過慢速網絡的不必要的 I/O。Presto 利用 Alluxio 作為緩存層,主要有兩種使用方式:Alluxio File System 和 Alluxio Structured Data Service。

注意:目前我們公司 Presto 叢集尚未加入 Alluxio 緩存服務,這個優化手段可以作為一個知識儲備。

6.1 Alluxio File System

Alluxio File System 将 Presto Hive Connector 作為一個獨立的分布式緩存檔案系統服務于 HDFS 或 AWS S3、GCP、Azure blob store 等對象存儲之上。使用者可以通過檔案系統接口明确地了解緩存的使用情況并控制緩存。例如,可以預加載 Alluxio 目錄中的所有檔案,為 Presto 查詢預熱緩存,并設定緩存資料的 TTL(生存時間),以回收緩存容量。舉個栗子:

Presto查詢優化拾遺
Presto查詢優化拾遺

注意對比 Hive 表的 location,字首換成了 “alluxio://”

6.2 Alluxio Structured Data Service

Alluxio Structured Data Service 通過基于 Alluxio File System 的目錄和緩存檔案系統與 Presto 進行互動。這種方式有額外的優勢,在不用修改 Hive 表的 location 的前提下,就能無縫通路現有的 Hive 表,并通過合并小檔案或轉換輸入檔案的格式進一步性能優化。具體做法如下:

Presto查詢優化拾遺

繼續閱讀