天天看點

StreamOperator源碼簡析

聲明:本系列部落格部分是根據SGG的視訊整理而成,非常适合大家入門學習。部分文章是通過爬蟲等技術手段采集的,目的是學習分享,如果有版權問題請留言,随時删除。

StreamOperator是任務執行過程中實際處理類,上層由StreamTask調用,下層調用UserFunction,列舉一些常見的StreamOperator

  • env.addSource對應StreamSource
  • dataStream.map 對應StreamMap
  • dataStrem.window對應WindowOperator
  • dataStream.addSink對應StreamSink
  • dataStream.keyBy(..).process對應KeyedProcessOperator

StreamOperator涉及資料處理、checkpoint、狀态存儲、定時調用等,本篇幅将從源碼角度分析StreamOperator所涉及的核心調用流程。

StreamOperator層級結構

StreamOperator源碼簡析

最頂層是一個StreamOperator的接口,定義了其生命周期一些方法,繼承接口如下:

  • CheckpointListener接口,notifyCheckpointComplete表示checkpoint完成後的回調方法
  • KeyContext接口,用于目前key的切換,使用在KeyedStream中state的key設定
  • Disposable接口,dispose方法定義了資源釋放
  • Serializable序列化接口

AbstractStreamOperator是StreamOperator的基礎抽象實作類,所有的operator都必須繼承該抽象類;

AbstractUdfStreamOperator 是繼承AbstractStreamOperator的抽象實作類,其内部包含了userFunction, 在Task的生命周期都會調用userFunction中對應的方法;

OneInputStreamOperator、TwoInputStreamOperator都是繼承StreamOperator的接口,分别表示處理一個輸入、兩個輸入的Operator,包含了processElement/processWatermark/processLatencyMarker方法;

  • OneInputStreamOperator實作類StreamMap、WindowOperator、KeyedProcessOperator等單流入處理operator
  • TwoInputStreamOperator實作類CoStreamMap、KeyedCoProcessOperator、IntervalJoinOperator等多流處理operator

StreamSource表示的source端的operator,其既沒有實作OneInputStreamOperator接口也沒有實作TwoInputStreamOperator接口,由于其為流處理的源頭,不需要接受輸入

AbstractStreamOperator/AbstractUdfStreamOperator分析

  • initializeState狀态初始化,會調用到StreamOperator的initializeState方法,初始化operatorStateBackend/keyedStateBackend狀态後端,定時器恢複初始化,對于KeyedState來說會自動初始化恢複,但是operatorState則需要手動初始化恢複,是以在其繼承的AbstractUdfStreamOperator會調用userFunction的initializeState方法,前提是該userFunction需要實作CheckpointedFunction接口;
  • open初始化方法,在AbstractStreamOperator中是一個空的實作,通常可以在userFunction重寫open方法完成一些使用者初始化工作,例如建立資源連結
  • run方法,在任務正常情況下一直執行的方法,根據收到的不同資料類型調用AbstractStreamOperator不同的方法 1.如果是watermark,會調用其processWatermark方法,在該方法裡面做一些定時觸發的判斷與調用

    2.如果是LatencyMarker,其表示的是一個延時标記,同于統計資料從source到下遊operator耗時,會調用 processLatencyMarker方法,在該方法裡面會上報Histogram類型的metric, 在預設情況下該功能是關閉的

    3.如果是StreamRecord,也就是處理的業務資料,首先會調用setKeyContextElement方法,用于切換 KeyedStream類型的的statebackend的目前key, 然後調用processElement具體的資料處理流程

    4.如果是CheckpointBarrier,表示的是需要checkpoint,首先會調用prepareSnapshotPreBarrier方法,在AbstractStreamOperator中是一個空實作doNothing,然後調用snapshotState方法,在AbstractUdfStreamOperator會調用userFunction的snapshotState方法,前提是該userFunction需要實作CheckpointedFunction接口;

  • close方法,任務正常結束調用方法,在AbstractStreamOperator中是一個空的實作,通常可以在userFunction重寫close方法完成一些資源釋放;
  • dispose方法,任務正常結束或者異常結束調用的方法,如果是異常結束那麼就會調用到close方法,正常結束不會重複調用,在dispose裡面完成一些狀态最終資源的釋放;
  • setup方法,初始化做一些參數配置
  • notifyCheckpointComplete方法,在checkpoint完成時調用的方法,面向使用者實作的userFunction需要實作CheckpointListener接口

繼續閱讀