天天看點

Flink Interval Join源碼了解

參考: https://www.jianshu.com/p/179beca9f307

interval join :兩條資料流+between邊界+過期資料清理

demo:

Flink Interval Join源碼了解

 下面看下源碼實作

intervalJoin 屬于 KeyedStream,源碼部分也在KeyedStream中

Flink Interval Join源碼了解
  • KeyedStream的intervalJoin建立并傳回IntervalJoin
  • IntervalJoin提供了between操作,用于設定interval的lowerBound及upperBound,即between邊界區間(預設閉區間),between操作建立并傳回IntervalJoined
  • Flink Interval Join源碼了解
  •  IntervalJoined提供了process操作,process操作裡頭建立了IntervalJoinOperator
  • Flink Interval Join源碼了解

在這裡return的是 left.connect(right).keyBy(keySelector1, keySelector2) , 因為left.connect(right)傳回的是 ConnectedStreams, keySelector1/2是指demo中兩個資料流的keyBy條件,類似與flinksql join中的 on

換類了,現在是IntervalJoinOperator類中了,重點也就在IntervalJoinOperator中

Flink Interval Join源碼了解
  • IntervalJoinOperator繼承了AbstractUdfStreamOperator抽象類,實作了TwoInputStreamOperator及Triggerable接口
  • IntervalJoinOperator重寫了AbstractUdfStreamOperator(StreamOperator定義)的open、initializeState方法,在initializeState中,建立了leftBuffer和rightBuffer兩個MapState,key為Long表示時間時間戳,List<BufferEntry<T>>表示該時刻到來的資料記錄,當左流和右流有資料到達時,會分别調用processElement1()和processElement2()方法,它們都調用了processElement()方法
  • processElement方法中實作了 (1)根據資料時間戳和watermark判斷資料是否late,如果late就return,否則就添加到buffer中     (2)周遊otherBuffer,(也就是左右mapState,一個是buffer,另外個就是otherBuffer),判斷時間是否滿足要求,即判斷有資料在邊界内(ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound),如果在就調用 collect方法   (3)collect方法也就是我們demo中定義的process方法,對兩條流比對上的資料進行資料  (4)計算這條資料的cleanupTime,調用internalTimerService.registerEventTimeTimer注冊清理該資料的timer,到時候就把這條資料從state裡清除

所有intervalJoin的很多功能都是在IntervalJoinOperator中實作的

整體總結下來就是 IntervalJoin 裡設定上下邊界,後在IntervalJoined中進行雙流connect,然後在IntervalJoinOperator中實作資料的比對以及設定資料的過期定時器timer等等。

https://blog.csdn.net/qq_34864753/article/details/111183556 在這篇部落格裡介紹了intervalJoin的watermark生成以及狀态的清理機制,有興趣的可以看看

繼續閱讀