天天看點

原了解析 | 深入了解 Apache Flink 的網絡協定棧

作者:Nico Kruber

翻譯:曹英傑

Flink 的網絡協定棧是組成 flink-runtime 子產品的核心元件之一,是每個 Flink 作業的核心。它連接配接所有 TaskManager 的各個子任務(Subtask),是以,對于 Flink 作業的性能包括吞吐與延遲都至關重要。與 TaskManager 和 JobManager 之間通過基于 Akka 的 RPC 通信的控制通道不同,TaskManager 之間的網絡協定棧依賴于更加底層的 Netty API。

本文将首先介紹 Flink 暴露給流算子(Stream operator)的高層抽象,然後詳細介紹 Flink 網絡協定棧的實體實作和各種優化、優化的效果以及 Flink 在吞吐量和延遲之間的權衡。

1.邏輯視圖

Flink 的網絡協定棧為彼此通信的子任務提供以下邏輯視圖,例如在 A 通過 keyBy() 操作進行資料 Shuffle :

這一過程建立在以下三種基本概念的基礎上:

▼ 子任務輸出類型(ResultPartitionType):

Pipelined(有限的或無限的):一旦産生資料就可以持續向下遊發送有限資料流或無限資料流。

Blocking:僅在生成完整結果後向下遊發送資料。

▼ 排程政策:

同時排程所有任務(Eager):同時部署作業的所有子任務(用于流作業)。

上遊産生第一條記錄部署下遊(Lazy):一旦任何生産者生成任何輸出,就立即部署下遊任務。

上遊産生完整資料部署下遊:當任何或所有生産者生成完整資料後,部署下遊任務。

▼ 資料傳輸:

高吞吐:Flink 不是一個一個地發送每條記錄,而是将若幹記錄緩沖到其網絡緩沖區中并一次性發送它們。這降低了每條記錄的發送成本是以提高了吞吐量。

低延遲:當網絡緩沖區超過一定的時間未被填滿時會觸發逾時發送,通過減小逾時時間,可以通過犧牲一定的吞吐來擷取更低的延遲。

我們将在下面深入 Flink 網絡協定棧的實體實作時看到關于吞吐延遲的優化。對于這一部分,讓我們詳細說明輸出類型與排程政策。首先,需要知道的是子任務的輸出類型和排程政策是緊密關聯的,隻有兩者的一些特定組合才是有效的。

Pipelined 結果是流式輸出,需要目标 Subtask 正在運作以便接收資料。是以需要在上遊 Task 産生資料之前或者産生第一條資料的時候排程下遊目标 Task 運作。批處理作業生成有界結果資料,而流式處理作業産生無限結果資料。

批處理作業也可能以阻塞方式産生結果,具體取決于所使用的算子和連接配接模式。在這種情況下,必須等待上遊 Task 先生成完整的結果,然後才能排程下遊的接收 Task 運作。這能夠提高批處理作業的效率并且占用更少的資源。

下表總結了 Task 輸出類型以及排程政策的有效組合:

注釋:

[1]目前 Flink 未使用

[2]批處理 / 流計算統一完成後,可能适用于流式作業

此外,對于具有多個輸入的子任務,排程以兩種方式啟動:當所有或者任何上遊任務産生第一條資料或者産生完整資料時排程任務運作。要調整批處理作業中的輸出類型和排程政策,可以參考 ExecutionConfig#setExecutionMode()——尤其是 ExecutionMode,以及 ExecutionConfig#setDefaultInputDependencyConstraint()。

2.實體資料傳輸

為了了解實體資料連接配接,請回想一下,在 Flink 中,不同的任務可以通過 Slotsharing group 共享相同 Slot。TaskManager 還可以提供多個 Slot,以允許将同一任務的多個子任務排程到同一個 TaskManager 上。

對于下圖所示的示例,我們假設 2 個并發為 4 的任務部署在 2 個 TaskManager 上,每個 TaskManager 有兩個 Slot。TaskManager 1 執行子任務 A.1,A.2,B.1 和 B.2,TaskManager 2 執行子任務 A.3,A.4,B.3 和 B.4。在 A 和 B 之間是 Shuffle 連接配接類型,比如來自于 A 的 keyBy() 操作,在每個 TaskManager 上會有 2x4 個邏輯連接配接,其中一些是本地的,另一些是遠端的:

不同任務(遠端)之間的每個網絡連接配接将在 Flink 的網絡堆棧中獲得自己的 TCP 通道。但是,如果同一任務的不同子任務被排程到同一個 TaskManager,則它們與同一個 TaskManager 的網絡連接配接将多路複用并共享同一個 TCP 信道以減少資源使用。在我們的例子中,這适用于 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,如下圖所示:

每個子任務的輸出結果稱為 ResultPartition,每個 ResultPartition 被分成多個單獨的 ResultSubpartition- 每個邏輯通道一個。Flink 的網絡協定棧在這一點的處理上,不再處理單個記錄,而是将一組序列化的記錄填充到網絡緩沖區中進行處理。每個子任務本地緩沖區中最多可用 Buffer 數目為(每個發送方和接收方各一個):

#channels  * buffers-per-channel + floating-buffers-per-gate           

單個 TaskManager 上的網絡層 Buffer 總數通常不需要配置。有關如何在需要時進行配置的詳細資訊,請參閱配置網絡緩沖區的文檔。

▼ 造成反壓(1)

每當子任務的資料發送緩沖區耗盡時——資料駐留在 Subpartition 的緩沖區隊列中或位于更底層的基于 Netty 的網絡堆棧内,生産者就會被阻塞,無法繼續發送資料,而受到反壓。接收端以類似的方式工作:Netty 收到任何資料都需要通過網絡 Buffer 傳遞給 Flink。如果相應子任務的網絡緩沖區中沒有足夠可用的網絡 Buffer,Flink 将停止從該通道讀取,直到 Buffer 可用。這将反壓該多路複用上的所有發送子任務,是以也限制了其他接收子任務。下圖說明了過載的子任務 B.4,它會導緻多路複用的反壓,也會導緻子任務 B.3 無法接受和處理資料,即使是 B.3 還有足夠的處理能力。

為了防止這種情況發生,Flink 1.5 引入了自己的流量控制機制。

3.Credit-based 流量控制

Credit-based 流量控制可確定發送端已經發送的任何資料,接收端都具有足夠的能力(Buffer)來接收。新的流量控制機制基于網絡緩沖區的可用性,作為 Flink 之前機制的自然延伸。每個遠端輸入通道(RemoteInputChannel)現在都有自己的一組獨占緩沖區(Exclusive buffer),而不是隻有一個共享的本地緩沖池(LocalBufferPool)。與之前不同,本地緩沖池中的緩沖區稱為流動緩沖區(Floating buffer),因為它們會在輸出通道間流動并且可用于每個輸入通道。

資料接收方會将自身的可用 Buffer 作為 Credit 告知資料發送方(1 buffer = 1 credit)。每個 Subpartition 會跟蹤下遊接收端的 Credit(也就是可用于接收資料的 Buffer 數目)。隻有在相應的通道(Channel)有 Credit 的時候 Flink 才會向更底層的網絡協定棧發送資料(以 Buffer 為粒度),并且每發送一個 Buffer 的資料,相應的通道上的 Credit 會減 1。除了發送資料本身外,資料發送端還會發送相應 Subpartition 中有多少正在排隊發送的 Buffer 數(稱之為 Backlog)給下遊。資料接收端會利用這一資訊(Backlog)去申請合适數量的 Floating buffer 用于接收發送端的資料,這可以加快發送端堆積資料的處理。接收端會首先申請和 Backlog 數量相等的 Buffer,但可能無法申請到全部,甚至一個都申請不到,這時接收端會利用已經申請到的 Buffer 進行資料接收,并監聽是否有新的 Buffer 可用。

Credit-based 的流控使用 Buffers-per-channel 來指定每個 Channel 有多少獨占的 Buffer,使用 Floating-buffers-per-gate 來指定共享的本地緩沖池(Local buffer pool)大小(可選3),通過共享本地緩沖池,Credit-based 流控可以使用的 Buffer 數目可以達到與原來非 Credit-based 流控同樣的大小。這兩個參數的預設值是被精心選取的,以保證新的 Credit-based 流控在網絡健康延遲正常的情況下至少可以達到與原政策相同的吞吐。可以根據實際的網絡 RRT (round-trip-time)和帶寬對這兩個參數進行調整。

注釋3:如果沒有足夠的 Buffer 可用,則每個緩沖池将獲得全局可用 Buffer 的相同份額(±1)。

▼ 造成反壓(2)

與沒有流量控制的接收端反壓機制不同,Credit 提供了更直接的控制:如果接收端的處理速度跟不上,最終它的 Credit 會減少成 0,此時發送端就不會在向網絡中發送資料(資料會被序列化到 Buffer 中并緩存在發送端)。由于反壓隻發生在邏輯鍊路上,是以沒必要阻斷從多路複用的 TCP 連接配接中讀取資料,也就不會影響其他的接收者接收和處理資料。

▼ Credit-based 的優勢與問題

由于通過 Credit-based 流控機制,多路複用中的一個信道不會由于反壓阻塞其他邏輯信道,是以整體資源使用率會增加。此外,通過完全控制正在發送的資料量,我們還能夠加快 Checkpoint alignment:如果沒有流量控制,通道需要一段時間才能填滿網絡協定棧的内部緩沖區并表明接收端不再讀取資料了。在這段時間裡,大量的 Buffer 不會被處理。任何 Checkpoint barrier(觸發 Checkpoint 的消息)都必須在這些資料 Buffer 後排隊,是以必須等到所有這些資料都被處理後才能夠觸發 Checkpoint(“Barrier 不會在資料之前被處理!”)。

但是,來自接收方的附加通告消息(向發送端通知 Credit)可能會産生一些額外的開銷,尤其是在使用 SSL 加密信道的場景中。此外,單個輸入通道( Input channel)不能使用緩沖池中的所有 Buffer,因為存在無法共享的 Exclusive buffer。新的流控協定也有可能無法做到立即發送盡可能多的資料(如果生成資料的速度快于接收端回報 Credit 的速度),這時則可能增長發送資料的時間。雖然這可能會影響作業的性能,但由于其所有優點,通常新的流量控制會表現得更好。可能會通過增加單個通道的獨占 Buffer 數量,這會增大記憶體開銷。然而,與先前實作相比,總體記憶體使用可能仍然會降低,因為底層的網絡協定棧不再需要緩存大量資料,因為我們總是可以立即将其傳輸到 Flink(一定會有相應的 Buffer 接收資料)。

在使用新的 Credit-based 流量控制時,可能還會注意到另一件事:由于我們在發送方和接收方之間緩沖較少的資料,反壓可能會更早的到來。然而,這是我們所期望的,因為緩存更多資料并沒有真正獲得任何好處。如果要緩存更多的資料并且保留 Credit-based 流量控制,可以考慮通過增加單個輸入共享 Buffer 的數量。

注意:如果需要關閉 Credit-based 流量控制,可以将這個配置添加到 flink-conf.yaml 中:taskmanager.network.credit-model:false。但是,此參數已過時,最終将與非 Credit-based 流控制代碼一起删除。

4.序列号與反序列化

下圖從上面的擴充了更進階别的視圖,其中包含網絡協定棧及其周圍元件的更多詳細資訊,從發送算子發送記錄(Record)到接收算子擷取它:

在生成 Record 并将其傳遞出去之後,例如通過 Collector#collect(),它被傳遞給 RecordWriter,RecordWriter 會将 Java 對象序列化為位元組序列,最終存儲在 Buffer 中按照上面所描述的在網絡協定棧中進行處理。RecordWriter 首先使用 SpanningRecordSerializer 将 Record 序列化為靈活的堆上位元組數組。然後,它嘗試将這些位元組寫入目标網絡 Channel 的 Buffer 中。我們将在下面的章節回到這一部分。

在接收方,底層網絡協定棧(Netty)将接收到的 Buffer 寫入相應的輸入通道(Channel)。流任務的線程最終從這些隊列中讀取并嘗試在 RecordReader 的幫助下通過 SpillingAdaptiveSpanningRecordDeserializer 将累積的位元組反序列化為 Java 對象。與序列化器類似,這個反序列化器還必須處理特殊情況,例如跨越多個網絡 Buffer 的 Record,或者因為記錄本身比網絡緩沖區大(預設情況下為32KB,通過 taskmanager.memory.segment-size 設定)或者因為序列化 Record 時,目标 Buffer 中已經沒有足夠的剩餘空間儲存序列化後的位元組資料,在這種情況下,Flink 将使用這些位元組空間并繼續将其餘位元組寫入新的網絡 Buffer 中。

4.1 将網絡 Buffer 寫入 Netty

在上圖中,Credit-based 流控制機制實際上位于“Netty Server”(和“Netty Client”)元件内部,RecordWriter 寫入的 Buffer 始終以空狀态(無資料)添加到 Subpartition 中,然後逐漸向其中填寫序列化後的記錄。但是 Netty 在什麼時候真正的擷取并發送這些 Buffer 呢?顯然,不能是 Buffer 中隻要有資料就發送,因為跨線程(寫線程與發送線程)的資料交換與同步會造成大量的額外開銷,并且會造成緩存本身失去意義(如果是這樣的話,不如直接将将序列化後的位元組發到網絡上而不必引入中間的 Buffer)。

在 Flink 中,有三種情況可以使 Netty 服務端使用(發送)網絡 Buffer:

  • 寫入 Record 時 Buffer 變滿,或者
  • Buffer 逾時未被發送,或
  • 發送特殊消息,例如 Checkpoint barrier。

▼ 在 Buffer 滿後發送

RecordWriter 将 Record 序列化到本地的序列化緩沖區中,并将這些序列化後的位元組逐漸寫入位于相應 Result subpartition 隊列中的一個或多個網絡 Buffer中。雖然單個 RecordWriter 可以處理多個 Subpartition,但每個 Subpartition 隻會有一個 RecordWriter 向其寫入資料。另一方面,Netty 服務端線程會從多個 Result subpartition 中讀取并像上面所說的那樣将資料寫入适當的多路複用信道。這是一個典型的生産者 - 消費者模式,網絡緩沖區位于生産者與消費者之間,如下圖所示。在(1)序列化和(2)将資料寫入 Buffer 之後,RecordWriter 會相應地更新緩沖區的寫入索引。一旦 Buffer 完全填滿,RecordWriter 會(3)為目前 Record 剩餘的位元組或者下一個 Record 從其本地緩沖池中擷取新的 Buffer,并将新的 Buffer 添加到相應 Subpartition 的隊列中。這将(4)通知 Netty服務端線程有新的資料可發送(如果 Netty 還不知道有可用的資料的話4)。每當 Netty 有能力處理這些通知時,它将(5)從隊列中擷取可用 Buffer 并通過适當的 TCP 通道發送它。

注釋4:如果隊列中有更多已完成的 Buffer,我們可以假設 Netty 已經收到通知。

▼ 在 Buffer 逾時後發送

為了支援低延遲應用,我們不能隻等到 Buffer 滿了才向下遊發送資料。因為可能存在這種情況,某種通信信道沒有太多資料,等到 Buffer 滿了在發送會不必要地增加這些少量 Record 的處理延遲。是以,Flink 提供了一個定期 Flush 線程(the output flusher)每隔一段時間會将任何緩存的資料全部寫出。可以通過 StreamExecutionEnvironment#setBufferTimeout 配置 Flush 的間隔,并作為延遲5的上限(對于低吞吐量通道)。下圖顯示了它與其他元件的互動方式:RecordWriter 如前所述序列化資料并寫入網絡 Buffer,但同時,如果 Netty 還不知道有資料可以發送,Output flusher 會(3,4)通知 Netty 服務端線程資料可讀(類似與上面的“buffer已滿”的場景)。當 Netty 處理此通知(5)時,它将消費(擷取并發送)Buffer 中的可用資料并更新 Buffer 的讀取索引。Buffer 會保留在隊列中——從 Netty 服務端對此 Buffer 的任何進一步操作将在下次從讀取索引繼續讀取。

注釋5:嚴格來說,Output flusher 不提供任何保證——它隻向 Netty 發送通知,而 Netty 線程會按照能力與意願進行處理。這也意味着如果存在反壓,則 Output flusher 是無效的。

▼ 特殊消息後發送

一些特殊的消息如果通過 RecordWriter 發送,也會觸發立即 Flush 緩存的資料。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,這些事件應該盡快被發送,而不應該等待 Buffer 被填滿或者 Output flusher 的下一次 Flush。

▼ 進一步的讨論

與小于 1.5 版本的 Flink 不同,請注意(a)網絡 Buffer 現在會被直接放在 Subpartition 的隊列中,(b)網絡 Buffer 不會在 Flush 之後被關閉。這給我們帶來了一些好處:

  • 同步開銷較少(Output flusher 和 RecordWriter 是互相獨立的)
  • 在高負荷情況下,Netty 是瓶頸(直接的網絡瓶頸或反壓),我們仍然可以在未完成的 Buffer 中填充資料
  • Netty 通知顯著減少

但是,在低負載情況下,可能會出現 CPU 使用率和 TCP 資料包速率的增加。這是因為,Flink 将使用任何可用的 CPU 計算能力來嘗試維持所需的延遲。一旦負載增加,Flink 将通過填充更多的 Buffer 進行自我調整。由于同步開銷減少,高負載場景不會受到影響,甚至可以實作更高的吞吐。

4.2 BufferBuilder 和 BufferConsumer

更深入地了解 Flink 中是如何實作生産者 - 消費者機制,需要仔細檢視 Flink 1.5 中引入的 BufferBuilder 和 BufferConsumer 類。雖然讀取是以 Buffer 為粒度,但寫入它是按 Record 進行的,是以是 Flink 中所有網絡通信的核心路徑。是以,我們需要在任務線程(Task thread)和 Netty 線程之間實作輕量級連接配接,這意味着盡量小的同步開銷。你可以通過檢視源代碼擷取更加詳細的資訊。

5. 延遲與吞吐

引入網絡 Buffer 的目是獲得更高的資源使用率和更高的吞吐,代價是讓 Record 在 Buffer 中等待一段時間。雖然可以通過 Buffer 逾時給出此等待時間的上限,但可能很想知道有關這兩個次元(延遲和吞吐)之間權衡的更多資訊,顯然,無法兩者同時兼得。下圖顯示了不同的 Buffer 逾時時間下的吞吐,逾時時間從 0 開始(每個 Record 直接 Flush)到 100 毫秒(預設值),測試在具有 100 個節點每個節點 8 個 Slot 的群集上運作,每個節點運作沒有業務邏輯的 Task 是以隻用于測試網絡協定棧的能力。為了進行比較,我們還測試了低延遲改進(如上所述)之前的 Flink 1.4 版本。

如圖,使用 Flink 1.5+,即使是非常低的 Buffer 逾時(例如1ms)(對于低延遲場景)也提供高達逾時預設參數(100ms)75% 的最大吞吐,但會緩存更少的資料。

6.結論

了解 Result partition,批處理和流式計算的不同網絡連接配接以及排程類型,Credit-Based 流量控制以及 Flink 網絡協定棧内部的工作機理,有助于更好的了解網絡協定棧相關的參數以及作業的行為。後續我們會推出更多 Flink 網絡棧的相關内容,并深入更多細節,包括運維相關的監控名額(Metrics),進一步的網絡調優政策以及需要避免的常見錯誤等。

via:

https://flink.apache.org/2019/06/05/flink-network-stack.html

繼續閱讀