天天看點

Flink批處理優化器之成本估算成本估算

在基于成本的優化器中,成本估算非常重要,它直接影響着候選計劃的生成。在Flink中成本估算依賴于每個不同的運算符所提供的自己的“預算”,本篇我們将分析什麼是成本、運算符如何提供自己的預算以及如何基于預算估算成本。

Flink以類Costs來定義成本,它封裝了一些成本估算的因素同時提供了一些針對成本對象的計算方法(加、減、乘、除)以及對這些因素未知值的認定與校驗。

“cost”一詞也有譯作:開銷、代價,将其視為同義即可。

Flink目前将成本估算的因素劃分為兩大類:

可量化的成本估算因素:指代通過跟蹤一個可量化的測量名額可以計算出的成本估算因素(比如網絡或I/O的位元組數);

啟發式的成本估算因素:指代那些不可定量計算的成本估算因素,是以隻能給出一些定性的經驗值;

目前被納入成本估算的因素如下:

網絡成本;

磁盤I/O成本;

CPU成本;

啟發式網絡成本;

啟發式磁盤成本;

啟發式CPU成本;

可量化的成本估算因素可能經常會被設定為未知的(UNKNOWN,在Costs中以字面常量值-1表示)。當可量化的成本估算因素被置為未知時,所有操作的成本都将變成未知的,是以這将導緻在進行優化裁剪期間,無法決策出哪個偏向的操作。在這種情況下,啟發式的成本估算因素必須能發揮作用,它應該包含一個值來確定以不同政策執行的運算符是可比較的(甚至在無法估算的情況下)。

成本的估算借助于成本估算器(CostEstimator),CostEstimator定義了一系列增加成本的方法,這些方法有待具體的估算器實作,它們大緻分為三大類:

增加傳輸政策的成本;

增加本地政策的成本;

增加屏障的成本;

CostEstimator借助于以上這幾類方法,可完成對一個運算符總成本的計算,具體的計算邏輯封裝在方法costOperator中,該方法接收一個計劃節點(PlanNode)參數,然後按照傳輸政策和本地政策分别進行枚舉與計算。完整的方法如下:

DefaultCostEstimator繼承自CostEstimator,作為預設的(也是唯一的)成本估算器。它實作了上面計算成本邏輯中調用的一系列增加成本的addXXX方法。這些方法中的絕大部分,又依賴于預算提供器(EstimateProvider)所提供的預算資料,然後根據不同的增加成本的算法邏輯,利用這些預算資料做計算。比如我們以新增廣播成本的addBroadcastCost方法為示例,其實廣播傳輸方式說白了就是将資料複制到目前運算符的所有輸出通道中,是以這裡對成本的計算取決于複制因子,代碼如下:

前面我們談論了如何通過CostEstimator來估算成本,但其實CostEstimator是在已獲得預算資料的基礎上應用相關的算法來算出成本的,而用來估算成本的預算資料其實是來自預算提供者(EstimateProvider)。Flink批進行中所有的運算符都有一個基于優化器的内部表示,我們可以稱它們為優化器運算符,這些運算符建立于優化操作之前,且它們都必須實作EstimateProvider接口。各個優化器運算符根據自己的實作以及語義将成本估算相關的資訊暴露給外部查詢。目前被納入預算的資訊有:

輸出的資料流大小:由接口方法getEstimatedOutputSize提供;

輸出的記錄數:由接口方法getEstimatedNumRecords提供;

單個輸出記錄的平均位元組數:由接口方法getEstimatedAvgWidthPerOutputRecord提供;

在dag包下,EstimateProvider接口的繼承關系圖如下:

其中,OptimizerNode是所有被優化的運算符繼承的基類,是以所有優化器運算符都是預算提供者。OptimizerNode為絕大部分的優化器運算符提供了統一的預算計算方法computeOutputEstimates。

為什麼說是絕大部分運算符呢?因為有些運算符是特殊的,比如雙輸入端union運算符BinaryUnionNode以及疊代相關的運算符。

所有的運算符都會在優化時被周遊,Flink提供了一個編号及預算周遊器(IdAndEstimatesVisitor)來對所有運算符進行逐個周遊并計算預算,這一點展現在Optimizer的compile方法的下面這行代碼中:

而在IdAndEstimatesVisitor的postVisit方法中即調用computeOutputEstimates方法來計算預算。下面,我們來分析一下預算是如何計算得出的,總得來說computeOutputEstimates的邏輯被分為兩部分:

各個具體的運算符計算它們特定的預算;

根據編譯提示(CompilerHints)覆寫原有的預算計算;

OptimizerNode将特定運算符的預算計算定義成名為computeOperatorSpecificDefaultEstimates的抽象方法開放給派生類根據自身的特定邏輯實作。然後,如果該運算符如果設定有CompilerHints的話,将會根據CompilerHints覆寫原有的預算結果。

所謂CompilerHints,它是封裝了描述使用者函數行為的編譯提示,它可用于改進優化器對計劃的選擇。如果給某個運算符設定編譯提示的話,那麼在計算預算時,将會用它來覆寫運算符自身給出的中間結果的預算。目前,CompilerHints在優化器中沒有得到太大的機會發揮。

因為CompilerHints沒有被廣泛應用,是以預算的計算還是依賴各個運算符具體提供,是以我們關注一下computeOperatorSpecificDefaultEstimates方法。該方法完全是按照具體運算符的語義特征來實作的,我們選擇看其中的幾個實作:

二進制union運算符的預算就是累加其兩個輸入端:

Cross運算符的處理方式是:

從上面的兩個運算符對預算的計算可見,它們大都依賴上遊運算符的輸出預算。而最初的預算肯定由source運算符決定,因為隻有source才能知道資料的具體規模。

是以,我們來看一下DataSourceNode,很明顯它作為資料的輸入源,是最有可能了解初始資料集大小的運算符,為此Flink定義了一個專門用于統計的對象BaseStatistics,它用于統計對接外部的資料源的預算資訊。但并非每個資料源的資訊都能被統計到,而Flink目前也隻實作了以檔案為輸入的FileInputFormat的預算統計FileBaseStatistics。

原文釋出時間為:2017-03-28

本文作者:vinoYang

繼續閱讀