天天看點

flink watermark介紹及watermark的視窗觸發機制

作者:Java機械師

Flink的三種時間

在談watermark之前,首先需要了解flink的三種時間概念。在flink中,有三種時間戳概念:Event Time 、Processing Time 和 Ingestion Time。其中watermark隻對Event Time類型的時間戳有用。這三種時間概念分别表示:

Processing time

處理時間,指執行算子操作的機器的目前時間。當基于處理時間運作時,所有關于時間的操作(如時間視窗)都将使用執行算子操作的主機的本地時間。例如,當時間視窗為一小時,如果應用程式在9:15 am開始運作,則第一個視窗将包括在9:15 am到10:00 am之間被處理的事件,下一個視窗将包含在10:00 am到11:00 am之間被處理的事件,依此類推。

處理時間是最簡單的時間概念,不需要流和機器之間的協調。它提供了最佳的性能和最低的延遲。但是,在分布式和異步環境中,處理時間不能提供确定性,因為它容易受到上流系統(例如從消息隊列)到達Flink的速度、flink内部operators之間互動的速度,以及中斷(排程或其他情況)等因素的影響。

Event Time

事件時間,是每個event在其生産裝置上産生的時間,即元素在到達flink之前,本身就自帶的時間戳。

是以說,Event Time的時間戳取決于資料,而與其他時間無關。使用Event Time,必須在從執行環境中先引入EventTime的時間屬性。如:

java複制代碼val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調用時刻開始給env建立的每一個stream追加時間特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
           

然後通過Dstream的assignTimestampsAndWatermarks方法指定event time時間戳,具體操作不做贅述。

在理想情況下,事件時間是有序的。但實際上,由于分布式操作,以及網絡延遲等原因,event可能不是按照event time的順序到達的。是以flink對處理亂序資料的方案是提供一個允許延遲時間,在允許延遲時間内到達的元素将會重新觸發一次計算。這個延遲時間時相對event time而不是其他時間的,而event time不是由flink決定的,那麼如何判斷目前的event time到底時多少呢?flink通過一個watermark來确定與維護目前event time的最大值。這也是本文将會在後面重點解釋的。

Ingestion time

Ingestion time是event進入Flink的時間,即執行source操作時的時間。

Ingestion time從概念上講介于Event Time和Processing time之間。

與Processing time相比 ,它花費的資源會稍微多一些,但結果卻更可預測。由于 Ingestion time使用穩定的時間戳(僅在addSource處配置設定了一次),是以對記錄的不同視窗操作将引用相同的時間戳,而在Processing time中,每個視窗操作都會更新事件的Processing time,是以可能一個上遊視窗中的記錄會配置設定給不同的下遊視窗(基于本地系統時鐘和任何可能的延誤)。

與Event Time相比,Ingestion time程式無法處理任何亂序事件或遲到的資料,但是程式不必指定如何生成watermarks。

下圖為三種時間語義的圖解:

flink watermark介紹及watermark的視窗觸發機制

watermark

用我自己的語言總結,在flink的視窗計算中,的watermark就是觸發視窗計算的一種機制。 那麼,watermark到底是以怎樣的一種形式存在的呢?實際上,watermark就是一種特殊的event,它被參雜在Dstream中,watermark由flink的某個操作生成後,就在整個程式中随event一同流轉,如下圖所示:

flink watermark介紹及watermark的視窗觸發機制

以下是watermark的代碼,可以看出watermark的就是一個流元素,僅包含一個時間戳屬性:

java複制代碼public final class Watermark extends StreamElement {
 
	/** The watermark that signifies end-of-event-time. */
	public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
 
	// ------------------------------------------------------------------------
 
	/** The timestamp of the watermark in milliseconds. */
	private final long timestamp;
 
	/**
	 * Creates a new watermark with the given timestamp in milliseconds.
	 */
	public Watermark(long timestamp) {
		this.timestamp = timestamp;
	}
 
           

watermark的視窗觸發機制

watermark會根據資料流中event的時間戳發生變化。通常情況下,event都是亂序的,不按時間排序的。watermark的計算邏輯為:目前最大的 event time - 最大允許延遲時間(MaxOutOfOrderness)。在同一個分區内部,當watermark大于或者等于視窗的結束時間時,才能觸發該視窗的計算,即watermark>=windows endtime。如下圖所示:

flink watermark介紹及watermark的視窗觸發機制

根據上圖分析: MaxOutOfOrderness = 5s,視窗的大小為:10s。 watermark分别為:12:08、12:15、12:30 計算邏輯為:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s

  • 對于 [12:00,12:10) 視窗,需要在WM=12:15時,才能被觸發計算,參與計算的event為:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不參與計算,因為還未到視窗時間,也就是event time 為 [12:00,12:10] 視窗内的event才能參與計算。 注意,如果過了這個視窗期,再收到 [12:00,12:10] 視窗内的event,就算超過了最大允許延遲時間(MaxOutOfOrderness),不會再參與計算,也就是資料被強制丢掉了。
  • 對于 [12:10,12:20] 和 [12:20,12:30] 視窗,會在WM=12:30時,被同時觸發計算,參與**[12:10,12:20]** 視窗計算的event為:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);參與 [12:20,12:30] 視窗計算的event為:event(12:20)/event(12:20);在這個過程中event(12:05)會被丢棄,不會參與計算,因為已經超了最大允許延遲時間(MaxOutOfOrderness)

遲到的事件

在介紹watermark時,提到了現實中往往處理的是亂序event,即當event處于某些原因而延後到達時,往往會發生該event time < watermark的情況,是以flink對處理亂序event的watermark有一個允許延遲的機制,這個機制就是最大允許延遲時間(MaxOutOfOrderness),允許在一定時間内遲到的event仍然視為有效event。

并行流的Watermarks

watermark可以在source處生成(也可以在source後通過其他算子生成,如map、filter等),如果source有多個并行度,那麼每個并行度會單獨生成一個watermark,這些watermark定義了各分區的event time。 當并行度發生變化(即上遊的一個分區可能被下遊多個分區使用時),每個分區的watermark是會廣播至下遊的每個分區的,如一些聚合多個流的操作,如 keyBy(…) 或者partition(…),此類操作的watermark是在所有輸入流中取最小的watermark。當帶有watermark的流通過此類算子時,會根據每個分區的watermark來更新watermark。

舉個例子:當上遊并行度數為4,下遊的某個分區的視窗中的watermark如下:

flink watermark介紹及watermark的視窗觸發機制
  1. 當已到達的watermark分别為2、4、3、6時,視窗中的watermark為2,觸發watermark為2的對應視窗計算,并将watermark=2廣播至下遊。
  2. 當第一個視窗的watermark被更新為4時,所有分區中已到達最小的watermark是3,則将視窗的watermark更新為3,觸發對應視窗的計算,并将watermark=3廣播至下遊。
  3. 當第二個分區的watermark被更新為7,所有分區中已到達最小的watermark還是3,不做處理。
  4. 當第三個分區的watermark被更新為6,所有分區中已到達最小的watermark是4,則将視窗的watermark更新為4,觸發對應視窗的計算,并将watermark=4廣播至下遊。

下圖顯示了event和watermark在一個并行流的示例,以及算子如何跟蹤事件時間的:

flink watermark介紹及watermark的視窗觸發機制

watermark配置設定器

當watermark完全基于event time時,如果沒有元素到達,則watermark不會被更新,這就說明,當一段時間沒有元素到達,則在這個時間間隙内,watermark不會增加,那麼也不會觸發視窗計算。顯然,如果這段時間很長的話,那麼該視窗中已經到達的元素将會等待很久才會被輸出計算結果。

為了避免這種情況,可以使用周期性的watermark配置設定器(AssignerWithPeriodicWatermarks 下面馬上提到),這些配置設定器不僅僅基于event time進行配置設定。比如,可以使用一個配置設定器,當一段時間沒有接收到新的event時,則将目前時間作為watermark。

watermark的兩種配置設定器,flink生成watermark有兩種機制:

  • AssignerWithPeriodicWatermarks :配置設定時間戳并定期生成watermark(可以取決于event time,或基于處理時間)。
  • AssignerWithPunctuatedWatermarks:配置設定時間戳并根據每一個元素生成watermark(每來一個元素都進行一次判斷,相更消耗性能)

通常情況下會使用第一種機制,原因除了更節省性能外,在上面的配置設定器中也有提到。

下面分别對兩種機制進行介紹。

AssignerWithPeriodicWatermarks

對每個元素都調用extractTimestamp方法擷取時間戳,并維護一個最大時間戳。通過ExecutionConfig.setAutoWatermarkInterval(...)定義生成watermark的間隔(每n毫秒) 。根據這個間隔,周期性調用配置設定器的getCurrentWatermark()方法,為watermark配置設定值。

在flink自帶的BoundedOutOfOrdernessGenerator配置設定器中, getCurrentWatermark是定期将目前watermark更新為最大時間戳減去允許延遲時間的值。

以下是兩個使用AssignerWithPeriodicWatermarks 生成的時間戳配置設定器的簡單示例:

java複制代碼/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
 
    val maxOutOfOrderness = 3500L // 3.5 seconds
 
    var currentMaxTimestamp: Long = _
 
    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }
 
    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}
 
/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
 
    val maxTimeLag = 5000L // 5 seconds
 
    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }
 
    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}
           

AssignerWithPunctuatedWatermarks

根據每個元素的event time生成watermark,通過extractTimestamp(...)方法為元素配置設定時間戳,通過checkAndGetNextWatermark(...)檢查元素的watermark并更新watermark。

checkAndGetNextWatermark(...)方法的第二個參數是extractTimestamp(...) 傳回的時間戳,根據這個時間戳決定是否要生成watermark。每當checkAndGetNextWatermark(...) 方法傳回一個非空watermark,并且該watermark大于上一個watermark時,就會更新watermark。

java複制代碼class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
 
	override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
		element.getCreationTime
	}
 
	override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
		if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
	}
}           

繼續閱讀