flink一緻性詳解
當在分布式系統中引入狀态時,自然也引入了一緻性問題。一緻性實際上是"正确性級别"的另一種說法,也就是說在成功處理故障并恢複之後得到的結果,與沒有發生任何故障時得到的結果相比,前者到底有多正确?舉例來說,假設要對最近一小時登入的使用者計數。在系統經曆故障之後,計數結果是多少?如果有偏差,是有漏掉的計數還是重複計數?
在流進行中,一緻性可以分為3個級别:
at-most-once: 這其實是沒有正确性保障的委婉說法——故障發生之後,計數結果可能丢失。同樣的還有udp。
at-least-once: 這表示計數結果可能大于正确值,但絕不會小于正确值。也就是說,計數程式在發生故障後可能多算,但是絕不會少算。
exactly-once: 這指的是系統保證在發生故障後得到的計數結果與正确值一緻。
flink的一個重大價值在于,它既保證了exactly-once,也具有低延遲和高吞吐的處理能力。
目前我們看到的一緻性保證都是由流處理器實作的,也就是說都是在 flink 流處理器内部保證的;而在真實應用中,流處理應用除了流處理器以外還包含了資料源(例如 kafka)和輸出到持久化系統。
端到端的一緻性保證,意味着結果的正确性貫穿了整個流處理應用的始終;每一個元件都保證了它自己的一緻性,整個端到端的一緻性級别取決于所有元件中一緻性最弱的元件。具體可以劃分如下:
内部保證 —— 依賴checkpoint
source 端 —— 需要外部源可重設資料的讀取位置
sink 端 —— 需要保證從故障恢複時,資料不會重複寫入外部系統
而對于sink端,又有兩種具體的實作方式:幂等(idempotent)寫入和事務性(transactional)寫入。
幂等寫入
所謂幂等操作,是說一個操作,可以重複執行很多次,但隻導緻一次結果更改,也就是說,後面再重複執行就不起作用了。
事務寫入
需要建構事務來寫入外部系統,建構的事務對應着 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應的結果寫入 sink 系統中。
不同source和sink的一緻性保證可用下表說明:
檢查點的代碼實踐
檢查點算法:
flink檢查點算法的正式名稱是異步分界線快照(asynchronous barrier snapshotting)。該算法大緻基于chandy-lamport分布式快照算法。
檢查點是flink最有價值的創新之一,因為它使flink可以保證exactly-once,并且不需要犧牲性能。
我們知道,端到端的狀态一緻性的實作,需要每一個元件都實作,對于flink + kafka的資料管道系統(kafka進、kafka出)而言,各元件怎樣保證exactly-once語義呢?利用checkpoint機制,把狀态存盤,發生故障的時候可以恢複,保證内部的狀态一緻性
source —— kafka consumer作為source,可以将偏移量儲存下來,如果後續任務出現了故障,恢複的時候可以由連接配接器重置偏移量,重新消費資料,保證一緻性
sink —— kafka producer作為sink,采用兩階段送出 sink,需要實作一個 twophasecommitsinkfunction
内部的checkpoint機制我們已經有了了解,那source和sink具體又是怎樣運作的呢?接下來我們逐漸做一個分析。
我們知道flink由jobmanager協調各個taskmanager進行checkpoint存儲,checkpoint儲存在 statebackend中,預設statebackend是記憶體級的,也可以改為檔案級的進行持久化儲存。
執行過程實際上是一個兩段式送出,每個算子執行完成,會進行“預送出”,直到執行完sink操作,會發起“确認送出”,如果執行失敗,預送出會放棄掉。
當 checkpoint 啟動時,jobmanager 會将檢查點分界線(barrier)注入資料流;barrier會在算子間傳遞下去。
每個算子會對目前的狀态做個快照,儲存到狀态後端。對于source任務而言,就會把目前的offset作為狀态儲存起來。下次從checkpoint恢複時,source任務可以重新送出偏移量,從上次儲存的位置開始重新消費資料。
具體的兩階段送出步驟總結如下:第一條資料來了之後,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日志但标記為未送出,這就是“預送出”。jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子将狀态存入狀态後端,并通知 jobmanager。sink 連接配接器收到 barrier,儲存目前狀态,存入 checkpoint,通知 jobmanager,并開啟下一階段的事務,用于送出下個檢查點的資料。jobmanager 收到所有任務的通知,發出确認資訊,表示 checkpoint 完成。sink 任務收到 jobmanager 的确認資訊,正式送出這段時間的資料。外部kafka關閉事務,送出的資料可以正常消費了。
第一條資料來了之後,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日志但标記為未送出,這就是“預送出”jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子将狀态存入狀态後端,并通知 jobmanager
sink 連接配接器收到 barrier,儲存目前狀态,存入 checkpoint,通知 jobmanager,并開啟下一階段的事務,用于送出下個檢查點的資料
jobmanager 收到所有任務的通知,發出确認資訊,表示 checkpoint 完成
sink 任務收到 jobmanager 的确認資訊,正式送出這段時間的資料
外部kafka關閉事務,送出的資料可以正常消費了。