參考: https://www.jianshu.com/p/179beca9f307
interval join :兩條資料流+between邊界+過期資料清理
demo:
下面看下源碼實作
intervalJoin 屬于 KeyedStream,源碼部分也在KeyedStream中
- KeyedStream的intervalJoin建立并傳回IntervalJoin
- IntervalJoin提供了between操作,用于設定interval的lowerBound及upperBound,即between邊界區間(預設閉區間),between操作建立并傳回IntervalJoined
- IntervalJoined提供了process操作,process操作裡頭建立了IntervalJoinOperator
在這裡return的是 left.connect(right).keyBy(keySelector1, keySelector2) , 因為left.connect(right)傳回的是 ConnectedStreams, keySelector1/2是指demo中兩個資料流的keyBy條件,類似與flinksql join中的 on
換類了,現在是IntervalJoinOperator類中了,重點也就在IntervalJoinOperator中
- IntervalJoinOperator繼承了AbstractUdfStreamOperator抽象類,實作了TwoInputStreamOperator及Triggerable接口
- IntervalJoinOperator重寫了AbstractUdfStreamOperator(StreamOperator定義)的open、initializeState方法,在initializeState中,建立了leftBuffer和rightBuffer兩個MapState,key為Long表示時間時間戳,List<BufferEntry<T>>表示該時刻到來的資料記錄,當左流和右流有資料到達時,會分别調用processElement1()和processElement2()方法,它們都調用了processElement()方法
- processElement方法中實作了 (1)根據資料時間戳和watermark判斷資料是否late,如果late就return,否則就添加到buffer中 (2)周遊otherBuffer,(也就是左右mapState,一個是buffer,另外個就是otherBuffer),判斷時間是否滿足要求,即判斷有資料在邊界内(ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound),如果在就調用 collect方法 (3)collect方法也就是我們demo中定義的process方法,對兩條流比對上的資料進行資料 (4)計算這條資料的cleanupTime,調用internalTimerService.registerEventTimeTimer注冊清理該資料的timer,到時候就把這條資料從state裡清除
所有intervalJoin的很多功能都是在IntervalJoinOperator中實作的
整體總結下來就是 IntervalJoin 裡設定上下邊界,後在IntervalJoined中進行雙流connect,然後在IntervalJoinOperator中實作資料的比對以及設定資料的過期定時器timer等等。
https://blog.csdn.net/qq_34864753/article/details/111183556 在這篇部落格裡介紹了intervalJoin的watermark生成以及狀态的清理機制,有興趣的可以看看