天天看點

【建議收藏】Flink watermark分析實戰(上)

作者:CTO修煉之路

摘要

Apache Flink是一個架構和分布式處理引擎,用于對無界和有界資料流進行有狀态計算 flink中提供了時間窗的相關算子計算區域時間内的資料 本次分享基于flink 1.14 此次分享内容中,api示範與舊版略有不同,概念并無不同 本次分享需要對流式資料處理計算有一定的了解

概念篇

Flink時間語義概念簡介

  • 在flink的流式進行中,會涉及到時間的不同概念
Processing Time 處理時間Event Time 事件時間Ingestion Time 注入時間
  • Processing Time 處理時間
每一個執行基于時間操作的算子的本地系統時間,與機器相關
  • Event Time 事件時間
事件發生的時間,通常由資料中的某個字段進行提供。
  • Ingestion Time 注入時間
資料進入flink的事件
【建議收藏】Flink watermark分析實戰(上)

時間語義

  • 就1.14版本而言,根據時間推進和時間判斷的不同标準,一共由兩種時間語義
以process time為依據:處理時間語義以event time為依據:事件時間語義

對于事件時間的重要性和應用場景衆所周知

需求和問題

需求

目前時間13:10,計算[13:00,13:10)分鐘内訂單數量/活躍使用者數量

已知

flink中提供了時間窗的相關算子計算區域時間内的資料.

問題

由于網絡波動或者網絡傳輸的時間消耗, 一條由13:09分産生的資料,在13:11分才進入計算邏輯, 那麼是否要将此資料計入到計算結果中?

在分布式運算中,不同節點的運算速度不同, 時間視窗先接收到一個并發中發送的13:10:00:000的資料, 時間視窗後接收到一個并發中發送的13:09:59:999的資料, 那麼是否要将後接收到的這條資料計入到計算結果中?

分享者回答

如果是以事件時間進行處理的話,應當計入計算結果 如果是以處理時間進行處理的話,可以不計入計算結果 flink1.14不使用注入時間

再次提問

在業務場景中,我們很多需求都是要使用事件時間來作為依據的, 我想要按照時間事件來完成這個需求, 并且将遲到的資料也納入到計算結果中, 應該如何解決?

問題總結

使用EventTime所要面對的問題

資料延遲

網絡延遲性能延遲...亂序

資料源資料相對于時間本身就無序分布式場景下本身有序的資料也難以保持被讀取時有序...

分享者回答

如果使用類似于hive、doris、clinkhous之類的olap資料倉庫, 我們可以等待到一個合适的時間(資料完全到達之後)再進行排序/計算, 而在flink中,提供了一個叫做watermark的機制來完成這個需求,應對這些問題。

watermark

背景

在流式資料中,雖然資料本身是按照時間順序向下遊推送的, 但在網絡環境、分布式等因素下, 導緻到達時間窗中的順序并不是按照原本發送的順序。有時資料發送的本身就不是按照嚴格的事件時間進行推送的

什麼是watermark

以前我對watermark了解不夠深的時候,我以為watermark是flink的時間等待機制, 後來我才知道,watermark是flink的事件時間推進機制,事件時間等待機制,隻是他的一部分。

watermark是解決資料亂序到達的,也可以了解為解決資料延遲到達,watermark在解決上述問題時,要結合flink的window(時間窗)機制,flink中的window(時間窗)是由watermark來觸發的,這就意味着視窗觸發時,資料中timeStamp<=watermark的,均已到達時間窗

watermark 事件時間推進機制

  • 特點
watermark本身也會是上遊向下遊發送資料時,附帶的一個記錄watermark必須是單調遞增的,保證任務的時間一直在往前推進,不可後退watermark由資料中的時間戳來更新

watermark的生命周期

env.getConfig().setAutoWatermarkInterval(200)//(預設值200ms)
如果要禁用watermark機制,可以通過設定watermark生成頻率來實作
env.getConfig().setAutoWatermarkInterval(0)//(預設值200ms)
AssignerWithPeriodicWatermarks  (已過期)周期性生成watermark
AssignerWithPunctuatedWatermarks(已過期) 按照指定标記性事件生成watermark           

watermark的更新機制

當flink開啟watermark時,在所有的并發中的資料首先經過watermark管理,

source算子每200ms從資料中擷取一次時間戳,并更新自己的maxTimeStamp,并廣播到下遊

下遊的算子拿到資料時,并不會根據數中時間進行更新watermark,而是根據上遊發送過來的資料中攜帶的maxTimeStamp來更新自身watermark的值

【建議收藏】Flink watermark分析實戰(上)

而是根據watermark廣播到下遊的maxTimeStamp值進行觸發和結束,計算,

下遊每200ms對比各個并發發送的maxTimeStamp,并根據最小值,重新整理自身的maxTimeStamp并廣播到下遊

當上遊有多個watermark發來的maxTimeStamp值,下遊更新自身maxTimeStamp時取最小值 以最小值為基準,較大值到達時可以分發到他應該到的時間分桶中, 如果收到超出時間窗之外的未來資料,會建立此資料應有的時間窗,并開始緩存,時間窗(桶)的數量時沒有限制的 如果以最大值為基準,會導緻時間窗提前結束,maxTimeStamp較小的被抛棄掉

【建議收藏】Flink watermark分析實戰(上)

問題/需求解決

watermark是如何解決我們讨論之處提出的問題的呢? 我們也是時候上代碼了!!!

完整的watermark使用代碼

WatermarkStrategy<Bean> beanWatermarkStrategy = WatermarkStrategy
               .forGenerator(new WatermarkGeneratorSupplier<Bean>() {
                   @Override
                   public WatermarkGenerator<Bean> createWatermarkGenerator(Context context) {
                      return new WatermarkGenerator<Bean>() {
                          /** 最大時間戳. */
                           private long maxTimestamp;
                          /** 水印生成的最大無序度 */
                           private final long outOfOrdernessMillis = 0;
                           //watermark比較器
                           @Override
                           public void onEvent(Bean event, long eventTimestamp, WatermarkOutput output) {
                               maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                           }
                           //watermark生成和發送
                           @Override
                           public void onPeriodicEmit(WatermarkOutput output) {
                               output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
                           }
                       };
                   }
               })
//                .noWatermarks()  //建立完全不生成水印的水印政策。這在執行純處理基于時間的流處理的場景中可能很有用。
//                .forMonotonousTimestamps()    //緊跟最大時間時間,完全不容忍亂序
//                .<Bean>forBoundedOutOfOrderness(Duration.ofMillis(0))  //允許亂序的生成政策   最大時間時間-容錯時間
               .withIdleness(Duration.ofSeconds(5))    //當某一并發遲遲沒有資料進來時,多長時間發送一次watermark值
               .withTimestampAssigner(new SerializableTimestampAssigner<Bean>() {
                   @Override
                   public long extractTimestamp(Bean element, long recordTimestamp) {
                       return element.getEventTime();
                   }
               })//watermark提取政策(從資料中)           

小延遲 - watermark推後機制 - BoundedOutOfOrderness政策

  • BoundedOutOfOrderness政策
用wartermark容錯,減慢時間的推進,在遲到資料到達時,讓下遊認為他還沒有遲到

說句人話,實際上就是用已經擷取到的時間戳-允許遲到的時間=watermark值

繼續閱讀