flink時間系統系列篇幅目錄:
一、時間系統概述介紹
二、Processing Time源碼分析
三、Event Time源碼分析
四、時間系統在視窗函數中的應用分析
五、ProcessFunction 使用分析
六、執行個體講解:如何做定時輸出
ProcessFunction 是flink 提供面向使用者low-level 層級的api,通過ProcessFunction可以通路state、注冊處理時間/事件時間定時器來幫助我們完成一些比較複雜的操作,但是其有一個限制那就是隻用使用在keyedStream中,是由于根據getRuntimeContext 得到的StreamingRuntimeContext 隻提供了KeyedStateStore的通路權限,是以隻能通路keyd state, 另外根據前面的分析可知,注冊的定時器必須是與key相關,也就解釋了在ProcessFunction中隻能在keyedStream做定時器注冊。目前在flink中,提供了ProcessFunction與KeyedProcessFunction 這兩個面向使用者的api,但是ProcessFunction卻無法幫助我們注冊定時器,透過源碼(ProcessOperator)可以發現,注冊時會主動抛出UnsupportedOperationException異常。今天重點在于分析KeyedProcessFunction 是如何完成定時功能。
首先以官方文檔為例來了解其用法,完成單詞計數,并且定時輸出功能,文檔裡面是定義了一個繼承ProcessFunction 的的類,猜想這裡應該是很早之前的版本文檔。

做一個簡單的代碼流程分析:首先得到一個Tuple2[String,String]類型的資料流,然後按照第一個位置的字段進行分組,那麼相同的字段發送到下遊相同的節點,後面使用繼承ProcessFunction 的CountWithTimeoutFunction 這麼個函數,在内部定義了一個名為myState 類型為ValueState的狀态,資料處理邏輯processElement:當一條資料流入,首先判斷myState中是否存在該key的資料,不存在則計數1,存在則+1, 然後更新到myState,然後通過ctx.timerService.registerEventTimeTimer注冊一個事件時間往後推遲60s 的定時用,當達到觸發條件(watermark大于等于注冊的時間)就會觸發定時任務執行onTimer 方法,然後執行判斷并且輸出。
接下來從源碼角度去了解ProcessFunction是如何實作這個功能的,從KeyedStream 為入口檢視:
我們重點所要分析的類就是KeyedProcessOperator,它繼承了AbstractUdfStreamOperator并且實作了Triggerable接口,而AbstractUdfStreamOperator 又繼承了AbstractStreamOperator,
該operator在初始化open中定義了一個名為user-timers 的InternalTimerService服務,然後包裝在TimerService對象中,提供給ContextImpl對象與OnTimerContextImpl對象,
在看其processElement方法,将ContextImpl對象最為參數傳給了使用者ProcessFunction函數的processElement方法中,也就為使用者api層級提供了通路時間、注冊定時器的入口,
接下來看下定時器的執行邏輯,在open初始化方法中初始化InternalTimerService傳入了一個Triggerable 類型的this對象,也就是目前KeyedProcessOperator對象,由之前的分析可知最後定時調用會調用onEventTime或者onProcessingTime方法,
都會調用invokeUserFunction,