作者|任慶盛
關于Flink Connector的詳解,本文将通過四部分展開介紹:
- 連接配接器
- Source API
- Sink API
- Collector的未來發展
一. 連接配接器Connecter的概述-Flink與外部系統的橋梁
1. 連接配接器 Connector

Flink的資料重要的來源和去向
連接配接器是Flink與外部系統間溝通的橋梁。如:我們需要從Kafka裡讀取資料,在Flink裡把資料處理之後再重新寫回到HIVE、elastic search這樣的外部系統裡去。
處理流程中的事件控制:事件處理水印(watermark),檢查點對齊記錄
負載均衡:根據不同并發的負載對資料分區進行合理的配置設定
資料解析與序列化:我們的資料在外部系統裡可能是以二進制的形式存儲的,在資料庫裡可能是以各種列的形式來存儲的。我們再把它讀到Flink裡後,需要對他進行一個解析,之後才能夠進行後面的資料處理。是以我們同樣在寫回外部系統的時候也需要對資料進行一個序列化的操作-把它轉換成外部系統裡對應的存儲格式來進行存儲。
上圖顯示的是一項十分典型的例子。
我們首先從kafka裡通過Source讀取其中的部分記錄。然後把這些記錄送到Flink當中的一些算子進行對應的運算,再通過Sink寫出到elastic search當中去,是以Source和Sink在這個Flink作業的兩端起到了一個接口的作用。
二. Source API- Flink資料的入口
1. Source 接口演進
Source在Flink 1.10版本之前是左側的這兩個接口:SourceFunction API(用來處理流式資料),InputFormat API( 用來處理批式資料)。在Flink 1.10之後,社群引入了一個新的Source API,對整個的Source進行了重構。那麼為什麼我們社群要做這樣的一個工作呢?
批流實作不一緻:生态不斷壯大的過程中,舊的API暴露出來一些問題。其中最直覺的問題就是批流實作的不一緻。
接口簡單但實作複雜:之前的API可能接口實作比較簡單,但實際上對于開發者來講,在實作這個接口的時候,所有的邏輯、所有的操作實作起來是非常複雜的,對于開發者來講也不夠友好。
是以,基于這些問題,在FLIP-27中提出了一個新的Source API的設計。其特點有二:
批流統一:流式資料處理和批式資料處理不需要再維護兩套代碼,用一套代碼就夠了。
實作簡單:Source API定義了很多概念上的抽象,雖然說這些抽象看起來會比較複雜,但是實際上是簡化了開發者操作的開發者開發工作。
2. 核心抽象
1) 記錄分片(Split)
有編号的記錄集合
以Kafka來舉例子。Kafka的分片既可以定義成一整個分區,也可以定義成一個分區裡的某一部分。比如說我從 offset 為 100的資料開始消費,到200号之間我們定義為一個分片;201~300定義成另外一個分片,這樣也是可以的。隻要他是一個記錄的集合、我們給他一個唯一的編号,我們就可以定義這樣的一個記錄分片。
進度可追蹤
我們需要在這個分片當中記錄現在處理到了哪一個位置,我們在記錄檢查點的時候需要知道目前處理了哪些東西,便于一旦出現了故障,可以直接從故障中恢複起來。
記錄分片的所有資訊
以Kafka舉例來講,一個分區的起始和終止位點等資訊是都要包含在整個記錄分片裡的。因為我們在做Checkpoint的時候也是以記錄分片為機關的,是以說記錄分辨裡的資訊也應該是自洽的。
2) 記錄分片枚舉器(Split Enumerator)
發現記錄分片:檢測外部系統中所存在的分片
配置設定記錄分片:Enumerator是處于一個協調者的角色存在的。它需要給我們的Source讀取器配置設定任務。
協調Source讀取器:例如某些讀取器的進度可能太快了,此時便要告訴他稍微慢一點兒來保證watermark大緻是一緻的。
3) Source讀取器(Source Reader)
從記錄分片讀取資料:根據枚舉器配置設定的記錄分片來讀取資料
事件時間水印處理:需要從我們從外部系統中讀下來的資料裡提取事件時間,然後做出對應的水印發送的操作。
資料解析:對從外部系統中讀取到的資料進行反序列化,發送至下遊算子
3. 枚舉器-讀取器架構
分片枚舉器是運作在Job Master上面的,Source讀取器是運作在Task Executor上面的。是以,枚舉器是上司者、協調者的角色,讀取器是執行者的角色。
他們的檢查點存儲也是各自分開的,但之間會存在一些通信。比如說枚舉器是需要給讀取器來配置設定任務,也要通知讀取器後續沒有更多的分片需要處理。由于一個運作環境不一樣,他們兩個之間也不可避免地會存在一些網絡通信。便有了如下通訊棧的定義。
這個通訊棧上面确定了一些event來提供給開發者進行自己的實作。
首先,最上面這層是Source Event,留給開發者自己去定義一些客戶化的操作。比如假使現在設計的一個Source,可能reader在某些條件下可能要暫停讀取,那麼SplitEnumerator可以通過這種Source event的方式發送給Source Reader。
其次,再下面一層分别是叫Operator Coordinator,算子的協調者。它和真正去執行任務的算子通過Operator Event算子事件進行溝通的。我們已經事先定義好了一些算子事件,如添加分片、通知我們的leader沒有新的分片了等。這些對于所有的Source都通用的事件,是在Operator Event這一層來進行抽象的。
Address Lookup是用來定位消息應該發送給哪一個Operator的。因為Flink整個作業執行起來後會有一個加一個有向無環圖的。不同的算子可能運作在不同的Task Manager上面,那麼怎麼去找到對應的task、對應的算子便是這一層的任務。
由于網絡通信的存在,Job Master和Task Executor之間有一個RPC Gateway。所有的Event最終都會通過RPC Gateway、通過RPC調用的方式來進行網絡傳輸。
4. Source讀取器設計
為了簡化Source讀取器實踐步驟,減少開發者工作,社群已經為大家提供了SourceReaderBase。使用者在開發的時候可以直接繼承SourceReaderBase類,進而大大簡化開發者的一些開發工作。那麼我們接下來對 SourceReaderBase進行分析。看上去好像這張圖裡有非常多的元件,但實際上我們可以把它拆成兩部分來了解。
以中間elementQueue隊列作為界限,隊列左側用藍色标出來的部分是需要和外部系統打交道的元件,在elementQueue的右側用橙色标出來的部分是和Flink的引擎側打交道的部分。
首先,左側是由一個或者是多個分片的讀取器構成的,每一個reader通過一個Fetcher來驅動,多個Fetcher會統一由一個Fetcher Manager來管理。這裡的實作也有非常多種,比如說可以隻開一個線程、隻開這一個SplitReader,通過這一個讀取器來消費多個分區。此外,我們也可以根據需求,開多個線程-一個線程運作一個feature,進行一個reader,每個reader負責一個分區來并行的去消費資料。這些完全取決于使用者的實作、選擇。
出于性能考慮,每次SplitReader會從外部系統中取一批資料,把它們放到elementQueue裡。如圖所示,在這個藍色框子裡的是每次取下來的一批資料,而後橙色框是這一批資料下面的每條資料。
其次,elementQueue的右側是由RecordEmitter和SourceOutput組成的。RecordEmitter把每條記錄發送給下遊的另外一個SourceOutput會把記錄輸出出去。每次RecordEmitter會從中間elementQueue裡拿一批資料下來,把它們一條一條發送到下遊。由于RecordEmitter是由主線程來驅動的,該主線程現在的設計裡是用了一個無鎖的mailbox模型,它會把需要執行工作分成一個一個mail,每次工作線程從mailbox裡取出來一個mail然後來進行工作,是以我們應該注意,這裡的實作一定要是無阻塞的。
RecordEmitter每次往下遊發送資料的同時會向下遊彙報-後面會不會還有後續的資料需要處理。與此同時呢,我們也會把目前這個分片的處理進度記錄在SplitStates當中,記錄它目前的狀态、處理到了什麼位置。
當SplitEnumerator在外部系統當中發現了新的分片,它需要通過RPC調用addSplits方法将新的分片添加讀取器。在SplitFetchermanager這一側會根據之前使用者已經標明的線程模型把新分片配置設定出去(如隻有一個線程,那便會給這個線程配置設定一個新任務,再讓reader去讀取這個新的分片。如果整體是多線程的實作的,那便建立一個線程,建立一個reader來單獨去處理分片。同樣我們也要在SplitStates中記錄目前處理的這個進度是怎麼樣的。
5. 建立檢查點
接下來我們來看一下在新的Source API當中是怎麼處理檢查點的。
首先,左側我們的協調者,分片枚舉器。圖中所示,它目前手中還有一個分片(Split_5)沒有配置設定出去。中間箭頭部分是正在傳輸路上的一些分片。虛線是這個檢查點的邊界。我們可以看到二号分片已經在檢查點前面了,四号分片在檢查點後面,最下方的reader正在向SplitEnumerator請求一個新的分片。再看reader,三個reader分别已經配置設定到了某一些Split、也進行了一些處理,已經有Position了。那我們分别來看一下枚舉器和讀取器需要在檢查點的時候存儲哪些東西
· 枚舉器:未配置設定記錄分片(Split_5),已配置設定未存入檢查點記錄分片(Split_4)
· 讀取器:已被配置設定記錄分片(Split_0,1,3),記錄配置設定狀态(Split_2)
6. 三步簡單實作Source
1) Split/SplitState
- Split:外部系統分片
- SplitSerializer:序列化/反序列化Split傳遞給SourceReader
- SplitState:Split狀态,用于Checkpoint與恢複
2) SplitEnumerator
- 發現與訂閱Split
- EnumState:Enumerator的狀态,用于Checkpoint與恢複
- EnumStateSerializer:序列化/反序列化EnumState
3) SourceReader
- SplitReader:與外部系統進行資料互動的接口
- FetcherManager:選擇線程模型(目前已有)
- RecordEmitter:轉換消息類型與處理事件時間
如果我們仔細去想一下就會發現,其實這些東西絕大多數都是和外部系統打交道的,也就是說和Flink引擎本身打交道的部分很少,使用者不再需要去擔心 checkpoint 鎖的問題,多線程的問題等等,能夠把更多的開發精力來集中在開發和外部系統互動的部分上。是以說,新的Source API是通過這些抽象來大大的簡化了開發者的開發。
三. Sink API- Flink資料的出口
如果對Flink有一定的了解的話會發現它可以做到精确一次的語義,資料既不重複也不丢失。那麼為了實作這個“精确一次”Flink也做了很多的工作,其中非常重要的一點就是在Sink端實作了二階段送出。
1. 預送出階段
在預送出階段裡,由于我們的這個分布式系統一般是存在這種“協調者1+執行者n”的模式,那麼在預送出的預送出階段裡,首先我們的協調者是需要請求送出的,也就是說他需要給所有的執行者來發送請求送出的消息,進而來開始整個的二階段送出。
當執行者收到了請求送出的消息,他會做一些送出的準備工作。在所有的準備工作都做完之後,他所有的執行者會向這個協調者回複說明現在已經準備好進行下一步的送出工作了。當協調者收到了所有執行者的“可繼續”請求後,預送出階段結束,進入我們送出第二階段-送出執行階段。
2. 送出執行階段
送出者會向執行者發送決定送出的消息,執行者會把剛剛準備好的送出相關的東西來進行一個處理,來真正的去執行一個送出的動作。在完成之後會向協調者彙報一個回複的結果,回報送出是否正常執行。
一旦協調者決定進入第二個送出執行階段,所有的執行者必須要不打折扣地把指令執行下去。也就是說如果某個協調者在這一階段出了問題的話,他在恢複起來之後還是要把這個決定執行下去的。也就是說一旦決定送出,執行者便必須要把送出這一個動作貫徹下去。
如果在預送出階段某一個執行者準備送出的時候可能出現了一些故障等、沒有做正确的送出動作,那麼他可能向協調者會回應了一個錯誤,比如網絡斷了,也可能經過一段時間逾時之後協調者沒有收到這個三号執行者的回應請求,那麼協調者就會觸發第二階段的復原動作。也就是會告訴所有的執行者“這次送出嘗試失敗了,需要大家復原到之前的狀态”。而後我們的執行者便會出現一個復原動作,撤銷上一步操作。
3. 二階段送出在Flink中的做法
1)預送出階段
以這個檔案系統的Sink來舉個例子。
檔案系統的Sink在接收到了檢查點邊界之後做預送出動作(把目前的資料落盤寫到硬碟上的某一個臨時檔案裡),當預送出階段完成之後,所有的operator會向我們的協調者回複 “已經準備好進行送出”的資訊。
2) 送出執行階段
第二個階段,送出執行階段開啟。JobManager會向所有的算子發送送出執行的指令,Sink在接收到這個指令之後,便會真正的去做最後的送出動作。
我們還是以檔案系統來來舉例子,那麼剛剛我們已經說過了,在預送出的階段資料被寫到了一個臨時檔案裡,那麼在真正的進行送出的時候,臨時檔案會被按照我們事先定義好的這個名字規範重命名,相當于實作了送出。
這裡要注意,臨時檔案這一設定并非無用,它對後續可能發生的復原等狀況具有鋪墊性的作用。我們是巧妙利用了二階段送出的機制來保障精确一次的語義。
4. Sink模型
1) Writer:負責在寫入或預送出的階段,把上遊源源不斷的資料寫到中間的某一個狀态裡去。
2) Committable:上述所說的“中間的狀态”,是可以進行這個送出操作的元件。
3) Committer:把Committable真正的去送出上去
4) Global Commiter:全局送出器。這個元件是可選的、取決于你的外部系統。例:Iceberg。
四. 未來發展
- 完善新Source
因為Source和Sink剛剛推出不久,是以說相對來講還是存在一些問題的。有些開發者可能會有一些新的需求、需要新的更新與提升。目前已經算一個相對穩定的狀态,但還是需要去不斷地完善。
- 遷移現有連接配接器至新API
随着流批一體連接配接器的不斷推進,所有的連接配接器會遷移到新的API上。
- 連接配接器測試架構
連接配接器測試架構嘗試去給所有的connector提供一個相對來講比較一緻、統一的測試标準。測試開發者不再需要去自己寫一些case、考慮各種各樣的測試環境、測試場景等等。讓我們的開發者能夠像搭積木一樣快速的用不同的場景,不同的用例來測試自己的代碼,進而把更多的開發精力集中在開發這個本身的邏輯上面,大大減少開發者的測試負擔。這也是Source API,Sink API和後續的framework研發的一緻目标。是為了讓連接配接器開發更加簡單、門檻更低,進而吸引更多的開發者為Flink生态做貢獻。
活動推薦:
僅需99元即可體驗阿裡雲基于 Apache Flink 建構的企業級産品-實時計算 Flink 版!點選下方連結了解活動詳情:
https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506