本文所讨論的計算資源是指用來執行 task 的資源,是一個邏輯概念。本文會介紹 flink 計算資源相關的一些核心概念,如:slot、slotsharinggroup、colocationgroup、chain等。并會着重讨論 flink 如何對計算資源進行管理和隔離,如何将計算資源使用率最大化等等。了解 flink 中的計算資源對于了解 job 如何在叢集中運作的有很大的幫助,也有利于我們更透徹地了解 flink 原理,更快速地定位問題。
為了更高效地分布式執行,flink會盡可能地将operator的subtask連結(chain)在一起形成task。每個task在一個線程中執行。将operators連結成task是非常有效的優化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少資料在緩沖區的交換,減少了延遲的同時提高整體的吞吐量。
上圖中将keyaggregation和sink兩個operator進行了合并,因為這兩個合并後并不會改變整體的拓撲結構。但是,并不是任意兩個 operator 就能 chain 一起的。其條件還是很苛刻的:
上下遊的并行度一緻
下遊節點的入度為1 (也就是說下遊節點沒有來自其他節點的輸入)
上下遊節點都在同一個 slot group 中(下面會解釋 slot group)
下遊節點的 chain 政策為 always(可以與上下遊連結,map、flatmap、filter等預設是always)
上遊節點的 chain 政策為 always 或 head(隻能與下遊連結,不能與上遊連結,source預設是head)
使用者沒有禁用 chain
operator chain的行為可以通過程式設計api中進行指定。可以通過在datastream的operator後面(如<code>somestream.map(..)</code>)調用<code>startnewchain()</code>來訓示從該operator開始一個新的chain(與前面截斷,不會被chain到前面)。或者調用<code>disablechaining()</code>來訓示該operator不參與chaining(不會與前後的operator chain一起)。在底層,這兩個方法都是通過調整operator的 chain 政策(head、never)來實作的。另外,也可以通過調用<code>streamexecutionenvironment.disableoperatorchaining()</code>來全局禁用chaining。
那麼 flink 是如何将多個 operators chain在一起的呢?chain在一起的operators是如何作為一個整體被執行的呢?它們之間的資料流又是如何避免了序列化/反序列化以及網絡傳輸的呢?下圖展示了operators chain的内部實作:
如上圖所示,flink内部是通過<code>operatorchain</code>這個類來将多個operator鍊在一起形成一個新的operator。<code>operatorchain</code>形成的框框就像一個黑盒,flink 無需知道黑盒中有多少個chainoperator、資料在chain内部是怎麼流動的,隻需要将input資料交給 headoperator 就可以了,這就使得<code>operatorchain</code>在行為上與普通的operator無差别,上面的operaotrchain就可以看做是一個入度為1,出度為2的operator。是以在實作中,對外可見的隻有headoperator,以及與外部連通的實線輸出,這些輸出對應了jobgraph中的jobedge,在底層通過<code>recordwriteroutput</code>來實作。另外,框中的虛線是operator chain内部的資料流,這個流内的資料不會經過序列化/反序列化、網絡傳輸,而是直接将消息對象傳遞給下遊的 chainoperator 處理,這是性能提升的關鍵點,在底層是通過 <code>chainingoutput</code> 實作的,源碼如下方所示,
注:headoperator和chainoperator并不是具體的資料結構,前者指代chain中的第一個operator,後者指代chain中其餘的operator,它們實際上都是<code>streamoperator</code>。
flink 中的計算資源通過 task slot 來定義。每個 task slot 代表了 taskmanager 的一個固定大小的資源子集。例如,一個擁有3個slot的 taskmanager,會将其管理的記憶體平均分成三分分給各個 slot。将資源 slot 化意味着來自不同job的task不會為了記憶體而競争,而是每個task都擁有一定數量的記憶體儲備。需要注意的是,這裡不會涉及到cpu的隔離,slot目前僅僅用來隔離task的記憶體。
通過調整 task slot 的數量,使用者可以定義task之間是如何互相隔離的。每個 taskmanager 有一個slot,也就意味着每個task運作在獨立的 jvm 中。每個 taskmanager 有多個slot的話,也就是說多個task運作在同一個jvm中。而在同一個jvm程序中的task,可以共享tcp連接配接(基于多路複用)和心跳消息,可以減少資料的網絡傳輸。也能共享一些資料結構,一定程度上減少了每個task的消耗。
每一個 taskmanager 會擁有一個或多個的 task slot,每個 slot 都能跑由多個連續 task 組成的一個 pipeline,比如 mapfunction 的第n個并行執行個體和 reducefunction 的第n個并行執行個體可以組成一個 pipeline。
如上文所述的 wordcount 例子,5個task可能會在taskmanager的slots中如下圖分布,2個taskmanager,每個有3個slot:
預設情況下,flink 允許subtasks共享slot,條件是它們都來自同一個job的不同task的subtask。結果可能一個slot持有該job的整個pipeline。允許slot共享有以下兩點好處:
flink 叢集所需的task slots數與job中最高的并行度一緻。也就是說我們不需要再去計算一個程式總共會起多少個task了。
更容易獲得更充分的資源利用。如果沒有slot共享,那麼非密集型操作source/flatmap就會占用同密集型操作 keyaggregation/sink 一樣多的資源。如果有slot共享,将基線的2個并行度增加到6個,能充分利用slot資源,同時保證每個taskmanager能平均配置設定到重的subtasks。
我們将 wordcount 的并行度從之前的2個增加到6個(source并行度仍為1),并開啟slot共享(所有operator都在default共享組),将得到如上圖所示的slot分布圖。首先,我們不用去計算這個job會其多少個task,總之該任務最終會占用6個slots(最高并行度為6)。其次,我們可以看到密集型操作 keyaggregation/sink 被平均地配置設定到各個 taskmanager。
怎麼判斷operator屬于哪個 slot 共享組呢?預設情況下,所有的operator都屬于預設的共享組<code>default</code>,也就是說預設情況下所有的operator都是可以共享一個slot的。而當所有input operators具有相同的slot共享組時,該operator會繼承這個共享組。最後,為了防止不合理的共享,使用者也能通過api來強制指定operator的共享組,比如:<code>somestream.filter(...).slotsharinggroup("group1");</code>就強制指定了filter的slot共享組為<code>group1</code>。
那麼多個tasks(或者說operators)是如何共享slot的呢?
我們先來看一下用來定義計算資源的slot的類圖:
抽象類<code>slot</code>定義了該槽位屬于哪個taskmanager(<code>instance</code>)的第幾個槽位(<code>slotnumber</code>),屬于哪個job(<code>jobid</code>)等資訊。最簡單的情況下,一個slot隻持有一個task,也就是<code>simpleslot</code>的實作。複雜點的情況,一個slot能共享給多個task使用,也就是<code>sharedslot</code>的實作。sharedslot能包含其他的sharedslot,也能包含simpleslot。是以一個sharedslot能定義出一棵slots樹。
接下來我們來看看 flink 為subtask配置設定slot的過程。關于flink排程,有兩個非常重要的原則我們必須知道:(1)同一個operator的各個subtask是不能呆在同一個sharedslot中的,例如<code>flatmap[1]</code>和<code>flatmap[2]</code>是不能在同一個sharedslot中的。(2)flink是按照拓撲順序從source一個個排程到sink的。例如wordcount(source并行度為1,其他并行度為2),那麼排程的順序依次是:<code>source</code> -> <code>flatmap[1]</code> -> <code>flatmap[2]</code> -> <code>keyagg->sink[1]</code> -> <code>keyagg->sink[2]</code>。假設現在有2個taskmanager,每個隻有1個slot(為簡化問題),那麼配置設定slot的過程如圖所示:
注:圖中 sharedslot 與 simpleslot 後帶的括号中的數字代表槽位号(slotnumber)
為<code>source</code>配置設定slot。首先,我們從taskmanager1中配置設定出一個sharedslot。并從sharedslot中為<code>source</code>配置設定出一個simpleslot。如上圖中的①和②。
為<code>flatmap[1]</code>配置設定slot。目前已經有一個sharedslot,則從該sharedslot中配置設定出一個simpleslot用來部署<code>flatmap[1]</code>。如上圖中的③。
為<code>flatmap[2]</code>配置設定slot。由于taskmanager1的sharedslot中已經有同operator的<code>flatmap[1]</code>了,我們隻能配置設定到其他sharedslot中去。從taskmanager2中配置設定出一個sharedslot,并從該sharedslot中為<code>flatmap[2]</code>配置設定出一個simpleslot。如上圖的④和⑤。
為<code>key->sink[1]</code>配置設定slot。目前兩個sharedslot都符合條件,從taskmanager1的sharedslot中配置設定出一個simpleslot用來部署<code>key->sink[1]</code>。如上圖中的⑥。
為<code>key->sink[2]</code>配置設定slot。taskmanager1的sharedslot中已經有同operator的<code>key->sink[1]</code>了,則隻能選擇另一個sharedslot中配置設定出一個simpleslot用來部署<code>key->sink[2]</code>。如上圖中的⑦。
最後<code>source</code>、<code>flatmap[1]</code>、<code>key->sink[1]</code>這些subtask都會部署到taskmanager1的唯一一個slot中,并啟動對應的線程。<code>flatmap[2]</code>、<code>key->sink[2]</code>這些subtask都會被部署到taskmanager2的唯一一個slot中,并啟動對應的線程。進而實作了slot共享。
本文主要介紹了flink中計算資源的相關概念以及原理實作。最核心的是 task slot,每個slot能運作一個或多個task。為了拓撲更高效地運作,flink提出了chaining,盡可能地将operators chain在一起作為一個task來處理。為了資源更充分的利用,flink又提出了slotsharinggroup,盡可能地讓多個task共享一個slot。
<a href="https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html">flink: jobs and scheduling</a>
<a href="https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html">flink concepts</a>