天天看點

Flink随資料流動的四種StreamElement簡述

Flink為了完成不同的任務,在DataStream中流動的不止是我們的業務資料StreamRecord,還有其餘三種标記,分别是Watermark,StreamStatus,LatencyMarker。這四個類繼承自StreamElement。

(注:有些人可能有疑惑,checkpoint barrier不是插入了資料流中嗎?是的,checkpoint barrier也是随着資料流動,但是它不屬于StreamElement的體系,Flink對其進行單獨處理)

下面分别對四種類型進行介紹

StreamRecord

StreamRecord就是業務資料的一個包裝類,在其value屬性中存儲從資料源中擷取的資料。同時為這條資料附加了timestamp和hasTimestamp屬性。

Watermark

watermark大家都比較熟悉了,在使用事件時間語義時,watermark起到推進時間進度,觸發視窗計算的作用。

當某個watermark流動到某個operator時,表示小于watermark的資料都已經到達,可以觸發視窗計算,觸發定時器。也可以根據watermark處理遲到資料。如果出現watermark的值為Long.MAX_VALUE時,代表對應的資料源關閉了。

StreamStatus

當我們的程式中有多條輸入流時,由于watermark更新時會選擇其中的最小值,如果有一條輸入流沒有資料了,水印也就不會更新,那麼下遊就無法觸發計算,為了解決這個問題,引入了StreamStatus。

StreamStatus用來表示某一條資料流的狀态:

StreamStatus.ACTIVE_STATUS 表示目前資料流處于活動狀态

StreamStatus.IDLE_STATUS 表示目前輸入流處于閑置狀态

當一條輸入流處理IDLE狀态時,其不會再向下遊發送資料和水印,下遊在進行水印更新的時候,會忽略掉這條閑置流。這樣就不會影響正常的資料處理邏輯。下遊可以繼續工作。

當閑置的資料流有了新的資料,其就會轉變為ACTIVE_STATUS。因為水印是不能倒退的,是以隻有這條流的watermark大于等于上次觸發視窗計算的watermark時,才會參與本次的最小水印選擇。

LatencyMarker

Flink為了監控資料處理延遲,在source端周期性插入LatencyMarker,其内部的markedTime屬性表示了建立的時間。随着資料的流動,每當一個operator收到LatencyMarker時,就會用目前時間 - marker.getMarkedTime() 将結果報告給名額收集系統,然後把這個LatencyMarker向下遊轉發。

要注意使用LatencyMarker探測出的延遲并不是端到端延遲,因為其轉發沒有經過使用者邏輯。LatencyMarker會直接emit。由于業務資料處理和LatencyMarker的處理是同步的,随着資料流的流動,其結果和端到端延遲近似。

繼續閱讀