天天看點

Flink SQL 性能優化:multiple input 詳解

作者|賀小令、翁才智

執行效率的優化一直是 Flink 追尋的目标。在大多數作業,特别是批作業中,資料通過網絡在 task 之間傳遞(稱為資料 shuffle)的代價較大。正常情況下一條資料經過網絡需要經過序列化、磁盤讀寫、socket 讀寫與反序列化等艱難險阻,才能從上遊 task 傳輸到下遊;而相同資料在記憶體中的傳輸,僅需要耗費幾個 CPU 周期傳輸一個八位元組指針即可。

Flink 在早期版本中已經通過 operator chaining 機制,将并發相同的相鄰單輸入算子整合進同一個 task 中,消除了單輸入算子之間不必要的網絡傳輸。然而,join 等多輸入算子之間同樣存在額外的資料 shuffle 問題,shuffle 資料量最大的 source 節點與多輸入算子之間的資料傳輸也無法利用 operator chaining 機制進行優化。

在 Flink 1.12 中,我們針對目前 operator chaining 無法覆寫的場景,推出了 multiple input operator 與 source chaining 優化。該優化将消除 Flink 作業中大多數備援 shuffle,進一步提高作業的執行效率。本文将以一個 SQL 作業為例介紹上述優化,并展示 Flink 1.12 在 TPC-DS 測試集上取得的成果。

優化案例解析:訂單量統計

我們将以 TPC-DS q96 為例子詳細介紹如何消除備援 shuffle,該 SQL 意在通過多路 join 篩選并統計符合特定條件的訂單量。

select count(*) 
from store_sales
    ,household_demographics 
    ,time_dim, store
where ss_sold_time_sk = time_dim.t_time_sk   
    and ss_hdemo_sk = household_demographics.hd_demo_sk 
    and ss_store_sk = s_store_sk
    and time_dim.t_hour = 8
    and time_dim.t_minute >= 30
    and household_demographics.hd_dep_count = 5
    and store.s_store_name = 'ese'           
Flink SQL 性能優化:multiple input 詳解

圖 1 - 初始執行計劃

備援 Shuffle 是如何産生的?

由于部分算子對輸入資料的分布有要求(如 hash join 算子要求同一并發内資料 join key 的 hash 值相同),資料在算子之間傳遞時可能需要經過重新排布與整理。與 map-reduce 的 shuffle 過程類似,Flink shuffle 将上遊 task 産生的中間結果進行整理,并按需發送給需要這些中間結果的下遊 task。但在一部分情況下,上遊産出的資料已經滿足了資料分布要求(如連續多個 join key 相同的 hash join 算子),此時對資料的整理便不再必要,由此産生的 shuffle 也就成為了備援 shuffle,在執行計劃中以 forward shuffle 表示。

圖 1 中的 hash join 算子是一種稱為 broadcast hash join 的特殊算子。以 store_sales join time_dim 為例,由于 time_dim 表資料量很小,此時通過 broadcast shuffle 将該表的全量資料發送給 hash join 的每個并發,就能讓任何并發接受 store_sales 表的任意資料而不影響 join 結果的正确性,同時提高 hash join 的執行效率。此時 store_sales 表向 join 算子的網絡傳輸也成為了備援 shuffle。同理幾個 join 之間的 shuffle 也是不必要的。

Flink SQL 性能優化:multiple input 詳解

圖 2 - 備援的shuffle(紅框标記)

除 hash join 與 broadcast hash join 外,産生備援 shuffle 的場景還有很多,例如 group key 與 join key 相同的 hash aggregate + hash join、group key 具有包含關系的多個 hash aggregate 等等,這裡不再展開描述。

Operator Chaining 能解決嗎?

對 Flink 優化過程有一定了解的讀者可能會知道,為了消除不必要的 forward shuffle,Flink 在早期就已經引入了 operator chaining 機制。該機制将并發相同的相鄰單輸入算子整合進同一個 task 中,并在同一個線程中一起運算。Operator chaining 機制在圖 1 中其實已經在發揮作用,如果沒有它,做 broadcast shuffle 的三個 Source 節點名稱中被“->”分隔的算子将會被拆分至多個不同的 task,産生備援的資料 shuffle。圖 3 為 Operator chaining 關閉是的執行計劃。

Flink SQL 性能優化:multiple input 詳解

圖 3 - Operator chaining關閉後的執行計劃

減少資料在 TM 之間通過網絡和檔案傳輸并将算子連結合并入 task 是非常有效的優化:它能減少線程之間的切換,減少消息的序列化與反序列化,減少資料在緩沖區的交換,并減少延遲的同時提高整體吞吐量。然而,operator chaining 對算子的整合有非常嚴格的限制,其中一條就是“下遊算子的入度為 1”,也就是說下遊算子隻能有一路輸入。這就将多路輸入的算子(如 join)排除在外。

多輸入算子的解決方案:Multiple Input Operator

如果我們能仿照 operator chaining 的優化思路,引入新的優化機制并滿足以下條件:

  1. 該機制可以組合多輸入的算子;
  2. 該機制支援多路輸入(為被組合的算子提供輸入)

我們就可以将用 forward shuffle 連接配接的的多輸入算子放到一個 task 裡執行,進而消除不必要的 shuffle。Flink 社群很早就關注到了 operator chaining 的不足,在 Flink 1.11 中引入了 streaming api 層的 MultipleInputTransformation 以及對應的 MultipleInputStreamTask。這些 api 滿足了上述條件 2,而 Flink 1.12 在此基礎上在 SQL 層中實作了滿足條件 1 的新算子——multiple input operator,可以參考 FLIP 文檔[1]。

Multiple input operator 是 table 層一個可插拔的優化。它位于 table 層優化的最後一步,周遊生成的執行計劃并将不被 exchange 阻隔的相鄰算子整合進一個 multiple input operator 中。圖 4 展示了該優化對原本 SQL 優化步驟的修改。

Flink SQL 性能優化:multiple input 詳解

圖 4 - 加入 multiple input operator 後的優化步驟

讀者可能會有疑問:為什麼不在現有的 operator chaining 上進行修改,而要另起爐竈呢?實際上,multiple input operator 除了要完成 operator chaining 的工作之外,還需要對各個輸入的優先級進行排序。這是因為一部分多輸入算子(如 hash join 與 nested loop join)對輸入有嚴格的順序限制,若輸入優先級排序不當很可能造成死鎖。由于算子輸入優先級的資訊僅在 table 層的算子中有描述,更加自然的方式是在 table 層引入該優化機制。

值得注意的是,multiple input operator 不同于管理多個 operator 的 operator chaining,其本身就是一整個大 operator,而其内部運算在外界看來就是一個黑盒。Multiple input operator 的内部結構在 operator name 中完全展現,讀者在運作包含該 operator 的作業時,可以從 operator name 看到哪些算子以怎樣的拓撲結構被組合進了 multiple input operator 中。

圖 5 展示了經過 multiple input 優化後的算子的拓撲圖以及 multiple input operator 的透視圖。圖中三個 hash join 算子之間的備援的 shuffle 被移除後,它們可以在一個 task 裡執行,隻不過 operator chaining 沒法處理這種多輸入的情況,将它們放到 multiple input operator 裡執行,由 multiple input operator 管理各個算子的輸入順序和算子之間的調用關系。

Flink SQL 性能優化:multiple input 詳解

圖 5 - 經過 multiple input 優化後的算子拓撲圖

Multiple input operator 的建構和運作過程較為複雜,對此細節有興趣的讀者可以參考設計文檔[2]。

Source 也不能遺漏:Source Chaining

經過 multiple input operator 的優化,我們将圖 1 中的執行計劃優化為圖 6,圖 3 經過 operator chaining 優化後就變為圖 6 的執行圖。

Flink SQL 性能優化:multiple input 詳解

圖 6 - 經過 multiple input operator 優化後的執行計劃

圖 6 中從 store_sales 表産生的 forward shuffle(如紅框所示)表示我們仍有優化空間。正如序言中所說,在大部分作業中,從 source 直接産生的資料由于沒有經過 join 等算子的篩選和加工,shuffle 的資料量是最大的。以 10T 資料下的 TPC-DS q96 為例,如果不進行進一步優化,包含 store_sales 源表的 task 将向網絡中傳輸 1.03T 的資料,而經過一次 join 的篩選後,資料量急速下降至 16.5G。如果我們能将源表的 forward shuffle 省去,作業整體執行效率又能前進一大步。

可惜的是,multiple input operator 也不能覆寫 source shuffle 的場景,這是因為 source 不同于其它任何算子,它沒有任何輸入。Flink 1.12 為此給 operator chaining 新增了 source chaining 功能,将不被 shuffle 阻隔的 source 合并到 operator chaining 中,省去了 source 與下遊算子之間的 forward shuffle。

目前僅有 FLIP-27 source 以及 multiple input operator 可以利用 source chaining 功能,不過這已經足夠解決本文中的優化場景。

結合 multiple input operator 與 source chaining 之後,圖 7 展示了本文優化案例的最終執行方案。

Flink SQL 性能優化:multiple input 詳解

圖 7 - 優化後的執行方案

TPC-DS 測試結果

Multiple input operator 與 source chaining 對大部分作業,特别是批作業有顯著的優化效果。我們利用 TPC-DS 測試集對 Flink 1.12 的整體性能進行了測試,與 Flink 1.10 公布的 12267s 總用時相比,Flink 1.12 的總用時僅為 8708s,縮短了近 30% 的運作時間!

Flink SQL 性能優化:multiple input 詳解

圖 8 - TPC-DS 測試集總用時對比

Flink SQL 性能優化:multiple input 詳解

圖 9 - TPC-DS 部分測試點用時對比

未來計劃

通過 TPC-DS 的測試效果看到,source chaining + multiple input 能夠給我們帶來很大的性能提升。目前整體架構已完成,常用批算子已支援消除備援 exchange 的推導邏輯,後續我們将支援更多的批算子和更精細的推導算法。

流作業的資料 shuffle 雖然不需要像批作業一樣将資料寫入磁盤,但将網絡傳輸變為記憶體傳輸帶來的性能提升也是非常可觀的,是以流作業支援 source chaining + multiple input 也是一個非常令人期待的優化。同時,在流作業上支援該優化還需要很多工作,例如流算子上消除備援 exchange 的推導邏輯暫未支援,一些算子需要重構以消除輸入資料是 binary 的要求等等,這也是為什麼 Flink 1.12 暫未在流作業中推出推出該優化的原因。後續版本我們将逐漸完成這些工作,也希望更多社群的力量加入我們一起盡早的将更多的優化落地。

另外,阿裡雲實時計算團隊圍繞 Apache Flink 為核心打造的實時大資料平台,在阿裡巴巴内部提供全集團範圍的流批一體資料分析服務,同時也通過阿裡雲向外界提供 Flink 企業級雲産品,服務廣大中小企業。我們的技術團隊圍繞開源大資料技術體系建構,包括來自 Apache Flink/Hadoop/HBase/Kafka/Hive/Druid 等多個頂級開源項目的衆多 PMC/Committer 成員,加入實時計算團隊将可以與衆多技術大神共同探索大資料技術世界,感興趣的同學請速聯系:[email protected]

參考連結:

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink

[2]

https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI/