天天看點

OpenMLDB: 一文了解視窗傾斜優化技術細節

​​openmldb​​是針對ai場景優化的開源資料庫項目,實作了資料與計算一緻性的離線mpp場景和線上oltp場景計算引擎。mpp引擎可基于spark實作,并通過拓展spark源碼實作數倍性能提升。本文主要解釋​​openmldb​​如何基于spark來解決視窗資料的傾斜問題。

資料傾斜是在大資料處理場景下常見的一種現象,它由某一分區資料量過大造成。資料傾斜會導緻傾斜分區與其他分區的運算時間産生巨大差距,換句話說就是傾斜資料分區的計算任務與其cpu資源嚴重不比對。最終會造成多等一的情況——多個小資料量的分區計算完畢後等待傾斜的大資料量分區,隻有傾斜分區計算完畢才能輸出結果。這對效率來說是巨大的災難。

在機器學習的特征計算中,涉及到很多的視窗計算。在視窗計算下,如果出現單一key資料量過大,也會導緻某一分區資料過多,進而産生資料傾斜問題。而傳統資料傾斜中分區優化的方案,如:資料加字首再分區,是不适合視窗計算場景的。它會導緻視窗計算場景下最終計算結果錯誤。是以openmldb提出了一種基于spark的視窗資料傾斜分區優化方案——在擴充視窗資料後,再根據分區鍵以及時間片對傾斜資料進行再分區。

OpenMLDB: 一文了解視窗傾斜優化技術細節

在上圖的資料中,因為主鍵“gender”隻有兩個值,離線計算最好情況下隻能将資料劃分到兩個partition,即并行度隻有2。此時同樣的分區資源,計算任務的資料量差距卻很大。在後續的計算中,“male”所在的分區計算的時間必然比“female”所在分區計算的時間大。當傾斜分區資料量變大的時候,這個時間差距還會被不斷拉大。且由于spark的底層執行裡每個partition隻有一個thread,這使得整個stage周期裡隻有兩個thread在工作,還有很多其他的thread一直處于空閑狀态,這也會導緻嚴重的性能浪費。

對于傾斜資料的優化,解決根本問題的方法就是對傾斜資料進行再分區,把原本一個傾斜分區内龐大的資料塊,分散成多個小的資料分區。以此來達到對大資料進行拆分進而提高計算效率的目的。

在常見的資料再分區政策中,有通過分區鍵加上不同字首進而進行再分區的政策,也有通過多加幾列作為分區鍵進行再分區的政策。但是這些簡單的再分區方案,在視窗計算中,都會造成計算錯誤。

如果采用資料加字首再分區的簡單分區優化方案,原本同一個partition下的資料會被拆分到不同的partition。而視窗計算涉及到資料之間滑動取值的情況,是以如果隻是簡單的将分區内的資料再拆分,視窗計算将無法取到原本相鄰的資料,這會導緻最終計算結果的錯誤。

OpenMLDB: 一文了解視窗傾斜優化技術細節

整體思路

我們的方案總體思路是在上述傾斜資料再分區的基礎上,進一步保證各個再分區的資料塊在視窗計算時結果正确。方案裡采用的方式是在每個再分區的資料塊中,根據視窗需要滑動的資料條數,進行一定的視窗資料擴充。

在優化中,總體上采用的就是再分區+視窗補充的repartition政策來對資料進行分區。思路是采用空間換時間的政策,優點是計算時間短性能高,缺點是補充的視窗資料會造成一定的資料備援,導緻占用更多記憶體。

下面詳細介紹本方案的技術細節,傾斜優化方案具體的實作主要分為五步,以下面sql為例。

OpenMLDB: 一文了解視窗傾斜優化技術細節

這一步需要對總體的資料做一個評估,統計出一些相關的名額,比如資料劃分的分界線,以及partition内資料的條數等。參數介紹如下。

<col>

參數名

解釋

quantile

對于資料的拆分是通過傳入的“quantile”參數來确定的,并且我們采用的是n等分的機制,quantile = 4代表了四等分(不一定能保證嚴格四等分)。

根據“quantile”參數,我們就可以劃分出來不同值的分界線“percentile_i”,根據資料相對于分界線的值可以劃分出不同的資料塊。

precentile

根據(“time“)列(sql中視窗裡order by的值)劃分資料塊的分界線,percentile_i為第i條分界線,(”time“)列符合(percentile_i,percentile_i+1] 的資料為第i個資料塊。

特殊情況:第一個資料塊為(0,percentile_1],最後一塊為(percentile_n,無窮大)

總體來說,第一步的資料評估是對資料各項名額進行統計和計算,并在統計後,對資料進行判斷以及處理,但由于涉及到全量資料的周遊,會比較耗時。對此我們也有一個額外的優化,我們支援通過讀取提前預處理好的distribution表來跳過第一步中統計的部分。這樣就可以在淩晨或者不需要處理業務時,執行統計任務,将資料結果統計完成,來避免使用者需要執行處理邏輯時,在第一步等待時間太久。

OpenMLDB: 一文了解視窗傾斜優化技術細節
OpenMLDB: 一文了解視窗傾斜優化技術細節

這一步根據distribution table中對資料的統計結果,來對資料進行劃分,并對劃分後的資料打上(“part_id”)和(“expanded_row"),作為不同資料塊重分區後的分區标号以及是否為擴充資料的标記。

在最開始的join中,我們采用了broadcast join,來提升join時的效率。broadcast join是spark中一種可以避免shuffle的join,一般一張大表和一張小表進行join時可以使用broadcast join,它是通過将小表的資料廣播到每個executor計算節點上,再通過map聚合的方式,來避免了資料的shuffle。在我們的表中,distribution table比input table小很多,是以剛好可以采用broadcast join。

在join之後,可以得到資料分界線,且當percentile_i為第i條分界線時,符合(percentile_i,percentile_i+1] 的資料就為第i個資料塊,采用固定政策劃分完結果之後。就可以根據劃分結果,生成新的分區标号——“part_id”。表資料介紹如下。

列名

part_id

代表了再分區的id,在addcolumntable中,“part_id”+分區鍵相同的行,就同屬于一個新的partition,如id = 1和id = 3這兩行同屬于一個分區。

expanded_row

代表了目前行是否是擴充的視窗資料,預設值為false。在下述步驟中,新擴充的視窗資料此列的值為true。

對視窗資料進行擴充是openmldb關于視窗傾斜優化中,比較核心的部分。由于資料較多,為了便于了解,下面隻展示“male”部分資料。

具體實作時,我們對每個需要擴充的資料塊進行全體視窗資料的擴充,即通過周遊,對每個需要擴充資料的重分區資料塊都擴充到第一條資料。過程圖解如下,深色代表目前周遊的分區,淺色代表目前分區需要補充的視窗資料。

1.過濾出需要擴充的資料

對于time為1和3,“part_id" = 1的第一個重分區資料塊,由于是時間最先的資料塊,上面已經沒有資料可以給他們補充了,是以會跳過。

OpenMLDB: 一文了解視窗傾斜優化技術細節

對于time為5,“part_id" = 2的第二個重分區資料塊,會将所有時間比目前資料塊前的資料都取出來,也就是“part_id" = 1的資料塊。

OpenMLDB: 一文了解視窗傾斜優化技術細節

對于time為7,“part_id" = 3的第三個重分區資料塊同理,将所有時間比目前資料塊前的資料都取出來,也就是取第一個和第二個資料塊作為擴充的視窗資料。

OpenMLDB: 一文了解視窗傾斜優化技術細節

後續第四個重分區資料塊也同上,将所有需要的資料取出,是以不再贅述。

2.更改過濾資料的id并進行union

将資料取出來之後,我們還需要将(“expanded_row")改成true,代表是擴充的視窗資料。改完(“expanded_row")之後,隻需要不斷的和原來的addcolumn table進行union,我們就完成了一個資料塊的視窗資料擴充。以第二塊資料塊為例子,下圖union table中,不同顔色代表不同的重分區資料塊,可以看到經過filter和union,第二塊資料塊已經擴充好了資料。

OpenMLDB: 一文了解視窗傾斜優化技術細節

對于其他資料塊視窗擴充的方式和第二塊資料塊方式的思路一樣,在過濾以及擴充完後,再和之前的union表進行unoin即可。

下面展示最終第四塊資料塊擴充完視窗上資料後得到的最終union table。

OpenMLDB: 一文了解視窗傾斜優化技術細節

雖然之前我們通過不同色塊來标記不同的再分區資料,但實際上,到了第四步,我們才真正的對資料進行了重分區,底層我們依賴了spark中的repartition函數進行資料重分區。在第三步後,我們可以得到最終的union table,此時隻需要根據分區鍵(“gender”)和(“part_id")進行repartition,就可以将資料拆分到不同的executor上。

OpenMLDB: 一文了解視窗傾斜優化技術細節

在第三步中,我們知道那些"expanded_row" = false的資料列是新補充進來的視窗資料,而且在實際計算中,他們是不需要參與計算的。是以隻需要對"expanded_row" = true的資料進行視窗計算,最終便可得到計算結果。

OpenMLDB: 一文了解視窗傾斜優化技術細節

值得特别說明的是,由于openmldb底層處理引擎是自主研發設計的,是以視窗計算的内部邏輯也是由openmldb實作的。下面貼出相關代碼進行講解。

對于第四步生成的repartitiondf,我們在外層調用了spark的mappartitionswithindex方法。之後對于每個分區,openmldb都建構一個computer計算單元,用來處理接下來的視窗計算。之後則是正式進行視窗計算,調用windowaggiter方法。

在windowaggiter方法裡,我們對傳進來的疊代器inputiter進行了flatmap操作,之後再檢查是否分區内資料有沒有分錯,如果有分錯的row,則會對window進行重新設定。接下來檢查orderkey沒有問題後,會對expandedflag也就是上圖中的(“expanded_row")作判斷,如果為true,則證明目前row是擴充的資料,是以computer計算單元隻進行bufferowonly操作,緩存擴充的視窗資料進記憶體,為之後真實需要計算的資料使用。如果為false,此時expandedflag也為false,computer計算單元就進行真正的計算compute,在compute方法裡會讀取之前緩存的資料并進行計算,之後會傳回處理完成的row。compute方法内部是由c實作的,有興趣的同學可以去檢視​​openmldb​​裡相關源碼。

benchmark性能測試使用kaggle公開資料集,也就是new york city taxi trip duration競賽的資料集,使用測試的sql語句如下:

對比開源版本sparksql以及開源版本openmldb進行測試,測試結果如下。

計算引擎

計算耗時

sparksql(spark 3.0.0)

950.98s

openmldb,未開啟傾斜優化

224.76s

openmldb,開啟傾斜優化,傾斜分區數2

140.74s

openmldb,開啟傾斜優化,傾斜分區數4

94.44s

可以看到,openmldb引擎即使在不開啟傾斜優化的情況下,在不同的傾斜比例中,相對于spark引擎,仍然有4倍以上的性能提升,這種性能提升主要是通過openmldb底層高效的引擎實作來保證的。而openmldb在開啟了視窗傾斜優化之後,通過調整不同的再分區數,相比openmldb不開啟傾斜優化也還能提升大約60%~140%的性能。

openmldb通過擴充視窗資料加上資料再分區的政策,實作了視窗計算下資料傾斜的優化。政策總體上采用了空間換時間的思想,即将原本集中在一個分區中的傾斜資料,在存儲空間上進行視窗資料的擴充,之後再将資料分散至多個分區并行計算,進而增加計算的并行度,來換取更短的計算時間,并在最終實作了效率的大幅提升。此外在資料測試中,我們發現越在極端的傾斜分布下,openmldb越有更好的表現。總的來說,對于視窗計算下的資料傾斜場景,openmldb實作的資料傾斜優化有着不錯的效果。

本文介紹了常見的滑動視窗資料傾斜問題,并且剖析了openmldb解決資料傾斜的實作方案以及展示最終的性能優化結果。如果你對spark優化、大規模特征計算、openmldb資料庫等感興趣,我們會分享更多類似的技術文章,歡迎大家繼續關注 ​​openmldb專欄​​ 。

繼續閱讀