天天看點

Flink 算子簡介

Source Operator

Flink的API層級 為流式/批式處理應用程式的開發提供了不同級别的抽象

  • 第一層是最底層的抽象為有狀态實時流處理,抽象實作是 Process Function,用于底層處理
  • 第二層抽象是 Core APIs,許多應用程式不需要使用到上述最底層抽象的 API,而是使用 Core APIs 進行開發
    • 例如各種形式的使用者自定義轉換(transformations)、聯接(joins)、聚合(aggregations)、視窗(windows)和狀态(state)操作等,此層 API 中處理的資料類型在每種程式設計語言中都有其對應的類。
  • 第三層抽象是 Table API。 是以表Table為中心的聲明式程式設計API,Table API 使用起來很簡潔但是表達能力差
    • 類似資料庫中關系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
    • 允許使用者在編寫應用程式時将 Table API 與 DataStream/DataSet API 混合使用
  • 第四層最頂層抽象是 SQL,這層程式表達式上都類似于 Table API,但是其程式實作都是 SQL 查詢表達式
    • SQL 抽象與 Table API 抽象之間的關聯是非常緊密的
  • 注意:Table和SQL層變動多,還在持續發展中,大緻知道即可,核心是第一和第二層
Flink 算子簡介
  • Flink程式設計模型
Flink 算子簡介

Source來源

  • 元素集合
    • env.fromElements
    • env.fromColletion
    • env.fromSequence(start,end);
  • 檔案/檔案系統
    • env.readTextFile(本地檔案);
    • env.readTextFile(HDFS檔案);
  • 基于Socket
    • env.socketTextStream(“ip”, 8888)
  • 自定義Source,實作接口自定義資料源,rich相關的api更豐富
    • 并行度為1
      • SourceFunction
      • RichSourceFunction
    • 并行度大于1
      • ParallelSourceFunction
      • RichParallelSourceFunction
  • Connectors與第三方系統進行對接(用于source或者sink都可以)
    • Flink本身提供Connector例如kafka、RabbitMQ、ES等
    • 注意:Flink程式打包一定要将相應的connetor相關類打包進去,不然就會失敗
  • Apache Bahir連接配接器
    • 裡面也有kafka、RabbitMQ、ES的連接配接器更多

Sink Operator

  • Sink 輸出源
    • 預定義
      • print
      • writeAsText (過期)
    • 自定義
      • SinkFunction
      • RichSinkFunction
        • Rich相關的api更豐富,多了Open、Close方法,用于初始化連接配接等
    • flink官方提供 Bundle Connector
      • kafka、ES 等
    • Apache Bahir
      • kafka、ES、Redis等

Transformation

  • Map和FlatMap
  • KeyBy
  • filter過濾
  • sum
  • reduce函數
  • sum

視窗滑動

  • 背景
    • 資料流是一直源源不斷産生,業務需要聚合統計使用,比如每10秒統計過去5分鐘的點選量、成交額等
    • Windows 就可以将無限的資料流拆分為有限大小的“桶 buckets”,然後程式可以對其視窗内的資料進行計算
    • 視窗認為是Bucket桶,一個視窗段就是一個桶,比如8到9點是一個桶,9到10點是一個桶
  • 分類
    • time Window 時間視窗,即按照一定的時間規則作為視窗統計
      • time-tumbling-window 時間滾動視窗 (用的多)
      • time-sliding-window 時間滑動視窗 (用的多)
      • session WIndow 會話視窗,即一個會話内的資料進行統計,相對少用
    • count Window 數量視窗,即按照一定的資料量作為視窗統計,相對少用

視窗屬性

  • 滑動視窗 Sliding Windows
    • 視窗具有固定大小
    • 視窗資料有重疊
    • 例子:每10s統計一次最近1min内的訂單數量
Flink 算子簡介

滾動視窗 Tumbling Windows

  • 視窗具有固定大小
  • 視窗資料不重疊
  • 例子:每10s統計一次最近10s内的訂單數量
Flink 算子簡介
  • 視窗大小size 和 滑動間隔 slide
    • tumbling-window:滾動視窗: size=slide,如:每隔10s統計最近10s的資料
    • sliding-window:滑動視窗: size>slide,如:每隔5s統計最近10s的資料
    • size<slide的時候,如每隔15s統計最近10s的資料,那麼中間5s的資料會丢失,是以開發中不用

Flink的狀态State管理

  • 什麼是State狀态
    • 資料流處理離不開狀态管理,比如視窗聚合統計、去重、排序等
    • 是一個Operator的運作的狀态/曆史值,是維護在記憶體中
    • 流程:一個算子的子任務接收輸入流,擷取對應的狀态,計算新的結果,然後把結果更新到狀态裡面
Flink 算子簡介
  • 有狀态和無狀态介紹
    • 無狀态計算: 同個資料進到算子裡面多少次,都是一樣的輸出,比如 filter
    • 有狀态計算:需要考慮曆史狀态,同個輸入會有不同的輸出,比如sum、reduce聚合操作
  • 狀态管理分類
    • ManagedState(用的多)
      • Flink管理,自動存儲恢複
      • 細分兩類
        • Keyed State 鍵控狀态(用的多)
          • 有KeyBy才用這個,僅限用在KeyStream中,每個key都有state ,是基于KeyedStream上的狀态
          • 一般是用richFlatFunction,或者其他richfunction裡面,在open()聲明周期裡面進行初始化
          • ValueState、ListState、MapState等資料結構
        • Operator State 算子狀态(用的少,部分source會用)
          • ListState、UnionListState、BroadcastState等資料結構
    • RawState(用的少)
      • 使用者自己管理和維護
      • 存儲結構:二進制數組
  • State資料結構(狀态值可能存在記憶體、磁盤、DB或者其他分布式存儲中)
    • ValueState 簡單的存儲一個值(ThreadLocal / String)
      • ValueState.value()
      • ValueState.update(T value)
    • ListState 清單
      • ListState.add(T value)
      • ListState.get() //得到一個Iterator
    • MapState 映射類型
      • MapState.get(key)
      • MapState.put(key, value)

Flink的Checkpoint-SavePoint和端到端(end-to-end)狀态一緻性

  • 什麼是Checkpoint 檢查點
    • Flink中所有的Operator的目前State的全局快照
    • 預設情況下 checkpoint 是禁用的
    • Checkpoint是把State資料定時持久化存儲,防止丢失
    • 手工調用checkpoint,叫 savepoint,主要是用于flink叢集維護更新等
    • 底層使用了Chandy-Lamport 分布式快照算法,保證資料在分布式環境下的一緻性
  • 開箱即用,Flink 捆綁了這些檢查點存儲類型:
    • 作業管理器檢查點存儲 JobManagerCheckpointStorage
    • 檔案系統檢查點存儲 FileSystemCheckpointStorage
  • Savepoint 與 Checkpoint 的不同之處
    • 類似于傳統資料庫中的備份與恢複日志之間的差異
    • Checkpoint 的主要目的是為意外失敗的作業提供【重新開機恢複機制】,
    • Checkpoint 的生命周期由 Flink 管理,即 Flink 建立,管理和删除 Checkpoint - 無需使用者互動
    • Savepoint 由使用者建立,擁有和删除, 主要是【更新 Flink 版本】,調整使用者邏輯
    • 除去概念上的差異,Checkpoint 和 Savepoint 的目前實作基本上使用相同的代碼并生成相同的格式

繼續閱讀