天天看點

StreamOperator源碼簡析

StreamOperator是任務執行過程中實際處理類,上層由StreamTask調用,下層調用UserFunction,列舉一些常見的StreamOperator

  • env.addSource對應StreamSource
  • dataStream.map 對應StreamMap
  • dataStrem.window對應WindowOperator
  • dataStream.addSink對應StreamSink
  • dataStream.keyBy(..).process對應KeyedProcessOperator

StreamOperator涉及資料處理、checkpoint、狀态存儲、定時調用等,本篇幅将從源碼角度分析StreamOperator所涉及的核心調用流程。

StreamOperator層級結構

StreamOperator源碼簡析

 最頂層是一個StreamOperator的接口,定義了其生命周期一些方法,繼承接口如下:

  • CheckpointListener接口,notifyCheckpointComplete表示checkpoint完成後的回調方法
  • KeyContext接口,用于目前key的切換,使用在KeyedStream中state的key設定
  • Disposable接口,dispose方法定義了資源釋放
  • Serializable序列化接口

AbstractStreamOperator是StreamOperator的基礎抽象實作類,所有的operator都必須繼承該抽象類;

AbstractUdfStreamOperator 是繼承AbstractStreamOperator的抽象實作類,其内部包含了userFunction, 在Task的生命周期都會調用userFunction中對應的方法;

OneInputStreamOperator、TwoInputStreamOperator都是繼承StreamOperator的接口,分别表示處理一個輸入、兩個輸入的Operator,包含了processElement/processWatermark/processLatencyMarker方法;

  • OneInputStreamOperator實作類StreamMap、WindowOperator、KeyedProcessOperator等單流入處理operator
  • TwoInputStreamOperator實作類CoStreamMap、KeyedCoProcessOperator、IntervalJoinOperator等多流處理operator

StreamSource表示的source端的operator,其既沒有實作OneInputStreamOperator接口也沒有實作TwoInputStreamOperator接口,由于其為流處理的源頭,不需要接受輸入

AbstractStreamOperator/AbstractUdfStreamOperator分析

AbstractStreamOperator是所有operator的基礎抽象類,而AbstractUdfStreamOperator則是面向userFunction調用,接下來分析一下這兩個類,其大部分方法都是由StreamTask觸發調用,用于初始化或者資源釋放等操作,以StreamTask.invoke方法為入口來分析裡面的方法:

  • initializeState狀态初始化,會調用到StreamOperator的initializeState方法,初始化operatorStateBackend/keyedStateBackend狀态後端,定時器恢複初始化,對于KeyedState來說會自動初始化恢複,但是operatorState則需要手動初始化恢複,是以在其繼承的AbstractUdfStreamOperator會調用userFunction的initializeState方法,前提是該userFunction需要實作CheckpointedFunction接口;
  • open初始化方法,在AbstractStreamOperator中是一個空的實作,通常可以在userFunction重寫open方法完成一些使用者初始化工作,例如建立資源連結
  • run方法,在任務正常情況下一直執行的方法,根據收到的不同資料類型調用AbstractStreamOperator不同的方法
  1. 如果是watermark,會調用其processWatermark方法,在該方法裡面做一些定時觸發的判斷與調用
  2. 如果是LatencyMarker,其表示的是一個延時标記,同于統計資料從source到下遊operator耗時,會調用 processLatencyMarker方法,在該方法裡面會上報Histogram類型的metric, 在預設情況下該功能是關閉的
  3. 如果是StreamRecord,也就是處理的業務資料,首先會調用setKeyContextElement方法,用于切換 KeyedStream類型的的statebackend的目前key, 然後調用processElement具體的資料處理流程
  4. 如果是CheckpointBarrier,表示的是需要checkpoint,首先會調用prepareSnapshotPreBarrier方法,在AbstractStreamOperator中是一個空實作doNothing,然後調用snapshotState方法,在AbstractUdfStreamOperator會調用userFunction的snapshotState方法,前提是該userFunction需要實作CheckpointedFunction接口;
  • close方法,任務正常結束調用方法,在AbstractStreamOperator中是一個空的實作,通常可以在userFunction重寫close方法完成一些資源釋放;
  • dispose方法,任務正常結束或者異常結束調用的方法,如果是異常結束那麼就會調用到close方法,正常結束不會重複調用,在dispose裡面完成一些狀态最終資源的釋放;