Source Operator
Flink的API層級 為流式/批式處理應用程式的開發提供了不同級别的抽象
- 第一層是最底層的抽象為有狀态實時流處理,抽象實作是 Process Function,用于底層處理
- 第二層抽象是 Core APIs,許多應用程式不需要使用到上述最底層抽象的 API,而是使用 Core APIs 進行開發
- 例如各種形式的使用者自定義轉換(transformations)、聯接(joins)、聚合(aggregations)、視窗(windows)和狀态(state)操作等,此層 API 中處理的資料類型在每種程式設計語言中都有其對應的類。
- 第三層抽象是 Table API。 是以表Table為中心的聲明式程式設計API,Table API 使用起來很簡潔但是表達能力差
- 類似資料庫中關系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
- 允許使用者在編寫應用程式時将 Table API 與 DataStream/DataSet API 混合使用
- 第四層最頂層抽象是 SQL,這層程式表達式上都類似于 Table API,但是其程式實作都是 SQL 查詢表達式
- SQL 抽象與 Table API 抽象之間的關聯是非常緊密的
- 注意:Table和SQL層變動多,還在持續發展中,大緻知道即可,核心是第一和第二層
- Flink程式設計模型
Source來源
- 元素集合
- env.fromElements
- env.fromColletion
- env.fromSequence(start,end);
- 檔案/檔案系統
- env.readTextFile(本地檔案);
- env.readTextFile(HDFS檔案);
- 基于Socket
- env.socketTextStream(“ip”, 8888)
- 自定義Source,實作接口自定義資料源,rich相關的api更豐富
- 并行度為1
- SourceFunction
- RichSourceFunction
- 并行度大于1
- ParallelSourceFunction
- RichParallelSourceFunction
- 并行度為1
- Connectors與第三方系統進行對接(用于source或者sink都可以)
- Flink本身提供Connector例如kafka、RabbitMQ、ES等
- 注意:Flink程式打包一定要将相應的connetor相關類打包進去,不然就會失敗
- Apache Bahir連接配接器
- 裡面也有kafka、RabbitMQ、ES的連接配接器更多
Sink Operator
- Sink 輸出源
- 預定義
- writeAsText (過期)
- 自定義
- SinkFunction
- RichSinkFunction
- Rich相關的api更豐富,多了Open、Close方法,用于初始化連接配接等
- flink官方提供 Bundle Connector
- kafka、ES 等
- Apache Bahir
- kafka、ES、Redis等
- 預定義
Transformation
- Map和FlatMap
- KeyBy
- filter過濾
- sum
- reduce函數
- sum
視窗滑動
- 背景
- 資料流是一直源源不斷産生,業務需要聚合統計使用,比如每10秒統計過去5分鐘的點選量、成交額等
- Windows 就可以将無限的資料流拆分為有限大小的“桶 buckets”,然後程式可以對其視窗内的資料進行計算
- 視窗認為是Bucket桶,一個視窗段就是一個桶,比如8到9點是一個桶,9到10點是一個桶
- 分類
- time Window 時間視窗,即按照一定的時間規則作為視窗統計
- time-tumbling-window 時間滾動視窗 (用的多)
- time-sliding-window 時間滑動視窗 (用的多)
- session WIndow 會話視窗,即一個會話内的資料進行統計,相對少用
- count Window 數量視窗,即按照一定的資料量作為視窗統計,相對少用
- time Window 時間視窗,即按照一定的時間規則作為視窗統計
視窗屬性
- 滑動視窗 Sliding Windows
- 視窗具有固定大小
- 視窗資料有重疊
- 例子:每10s統計一次最近1min内的訂單數量
滾動視窗 Tumbling Windows
- 視窗具有固定大小
- 視窗資料不重疊
- 例子:每10s統計一次最近10s内的訂單數量
- 視窗大小size 和 滑動間隔 slide
- tumbling-window:滾動視窗: size=slide,如:每隔10s統計最近10s的資料
- sliding-window:滑動視窗: size>slide,如:每隔5s統計最近10s的資料
- size<slide的時候,如每隔15s統計最近10s的資料,那麼中間5s的資料會丢失,是以開發中不用
Flink的狀态State管理
- 什麼是State狀态
- 資料流處理離不開狀态管理,比如視窗聚合統計、去重、排序等
- 是一個Operator的運作的狀态/曆史值,是維護在記憶體中
- 流程:一個算子的子任務接收輸入流,擷取對應的狀态,計算新的結果,然後把結果更新到狀态裡面
- 有狀态和無狀态介紹
- 無狀态計算: 同個資料進到算子裡面多少次,都是一樣的輸出,比如 filter
- 有狀态計算:需要考慮曆史狀态,同個輸入會有不同的輸出,比如sum、reduce聚合操作
- 狀态管理分類
- ManagedState(用的多)
- Flink管理,自動存儲恢複
- 細分兩類
- Keyed State 鍵控狀态(用的多)
- 有KeyBy才用這個,僅限用在KeyStream中,每個key都有state ,是基于KeyedStream上的狀态
- 一般是用richFlatFunction,或者其他richfunction裡面,在open()聲明周期裡面進行初始化
- ValueState、ListState、MapState等資料結構
- Operator State 算子狀态(用的少,部分source會用)
- ListState、UnionListState、BroadcastState等資料結構
- Keyed State 鍵控狀态(用的多)
- RawState(用的少)
- 使用者自己管理和維護
- 存儲結構:二進制數組
- ManagedState(用的多)
- State資料結構(狀态值可能存在記憶體、磁盤、DB或者其他分布式存儲中)
- ValueState 簡單的存儲一個值(ThreadLocal / String)
- ValueState.value()
- ValueState.update(T value)
- ListState 清單
- ListState.add(T value)
- ListState.get() //得到一個Iterator
- MapState 映射類型
- MapState.get(key)
- MapState.put(key, value)
- ValueState 簡單的存儲一個值(ThreadLocal / String)
Flink的Checkpoint-SavePoint和端到端(end-to-end)狀态一緻性
- 什麼是Checkpoint 檢查點
- Flink中所有的Operator的目前State的全局快照
- 預設情況下 checkpoint 是禁用的
- Checkpoint是把State資料定時持久化存儲,防止丢失
- 手工調用checkpoint,叫 savepoint,主要是用于flink叢集維護更新等
- 底層使用了Chandy-Lamport 分布式快照算法,保證資料在分布式環境下的一緻性
- 開箱即用,Flink 捆綁了這些檢查點存儲類型:
- 作業管理器檢查點存儲 JobManagerCheckpointStorage
- 檔案系統檢查點存儲 FileSystemCheckpointStorage
- Savepoint 與 Checkpoint 的不同之處
- 類似于傳統資料庫中的備份與恢複日志之間的差異
- Checkpoint 的主要目的是為意外失敗的作業提供【重新開機恢複機制】,
- Checkpoint 的生命周期由 Flink 管理,即 Flink 建立,管理和删除 Checkpoint - 無需使用者互動
- Savepoint 由使用者建立,擁有和删除, 主要是【更新 Flink 版本】,調整使用者邏輯
- 除去概念上的差異,Checkpoint 和 Savepoint 的目前實作基本上使用相同的代碼并生成相同的格式