天天看點

Flink:Window Api

基本概念

視窗 window

Flink:Window Api

一般真實的流都是無界的,怎樣處理無界的資料?

可以把無限的資料流進行切分,得到有限的資料集進行處理 —— 也就是得到有界流

視窗(window)就是将無限流切割為有限流的一種方式,它會将流資料分發到有限大小的桶(bucket)中進行分析

window類型:

  • 時間視窗:按照時間生成 Window。
    • 滾動時間視窗
    • 滑動時間視窗
    • 會話視窗
  • 計數視窗:視窗(window)就是将無限流切割為有限流的一種方式,它會将流 資料分發到有限大小的桶(bucket)中進行分析
    • 滾動計數視窗
    • 滑動計數視窗

滾動視窗-Tumbling Windows

将資料依據固定的視窗長度對資料進行切片。

特點:時間對齊,視窗長度固定,沒有重疊。

滾動視窗配置設定器将每個元素配置設定到一個指定視窗大小的視窗中,滾動視窗有一個固定的大小,并且不會出現重疊。例如:如果你指定了一個 5 分鐘大小的滾動窗 口,視窗的建立如下圖所示:

Flink:Window Api

适用場景:适合做 BI 統計等(做每個時間段的聚合計算)。

滑動視窗-Sliding Windows

滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組成。

特點:時間對齊,視窗長度固定,可以有重疊。

滑動視窗配置設定器将元素配置設定到固定長度的視窗中,與滾動視窗類似,視窗的大小由視窗大小參數來配置,另一個視窗滑動參數控制滑動視窗開始的頻率。是以,滑動視窗如果滑動參數小于視窗大小的話,視窗是可以重疊的,在這種情況下元素會被配置設定到多個視窗中

例如,你有 10 分鐘的視窗和 5 分鐘的滑動,那麼每個視窗中 5 分鐘的視窗裡包 含着上個 10 分鐘産生的資料,如下圖所示:

Flink:Window Api

适用場景:對最近一個時間段内的統計(求某接口最近 5min 的失敗率來決定是否要報警)。

會話視窗-Session Windows

由一系列事件組合一個指定時間長度的 timeout 間隙組成,類似于 web 應用的 session,也就是一段時間沒有接收到新資料就會生成新的視窗。

特點:時間無對齊。

session 視窗配置設定器通過 session 活動來對元素進行分組,session 視窗跟滾動窗 口和滑動視窗相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間周期内不再收到元素,即非活動間隔産生,那個這個視窗就會關閉。一個 session 視窗通過一個 session 間隔來配置,這個 session 間隔定義了非活躍周期的長度,當這個非活躍周期産生,那麼目前的 session 将關閉并且後續的元素将被配置設定到新的 session 視窗中去。

Flink:Window Api

視窗配置設定器

視窗配置設定器 —— window() 方法

我們可以用 .window() 來定義一個視窗,然後基于這個 window 去做一些聚合或者其它處理操作。注意 window () 方法必須在 keyBy 之後才能用。

Flink 提供了更加簡單的 .timeWindow 和 .countWindow 方法,用于定義時間視窗和計數視窗。

  • window() 方法接收的輸入參數是一個 WindowAssigner
  • WindowAssigner 負責将每條輸入的資料分發到正确的 window 中
  • Flink 提供了通用的 WindowAssigner
    • 滾動視窗--tumbling window
    • 滑動視窗--sliding window
    • 會話視窗--session window
    • 全局視窗--global window

建立不同類型的視窗

  • 滾動時間視窗(tumbling time window)
Flink:Window Api
  • 滑動時間視窗(sliding time window)
Flink:Window Api
  • 會話視窗(session window)
Flink:Window Api
  • 滾動計數視窗(tumbling count window)
Flink:Window Api
  • 滑動計數視窗(sliding count window)
Flink:Window Api

視窗函數

window function 定義了要對視窗中收集的資料做的計算操作

可以分為兩類

  • 增量聚合函數(incremental aggregation functions)
    • ReduceFunction, AggregateFunction
  • 全視窗函數(full window functions)
    • 先把視窗所有資料收集起來,等到計算的時候會周遊所有資料
    • ProcessWindowFunction,WindowFunction

時間視窗增量聚合

下面計算每三秒中資料的個數:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<String> inputStream = env.socketTextStream("192.168.1.77", 7777);

        DataStream<SensorReading> mapStream = inputStream.map((str) -> {
            String[] split = str.split(" ");
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        DataStream<Integer> resultStream = mapStream.keyBy("id")
            	//開一個時間視窗
                .timeWindow(Time.seconds(3))
            	//聚合
                .aggregate(new MyAggregateFun());

        resultStream.print();

        env.execute();
    }

    private static class MyAggregateFun implements AggregateFunction<SensorReading, Integer, Integer>{

        //建立一個累加器
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(SensorReading value, Integer accumulator) {
            //累加操作
            return accumulator + 1;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            return  a + b;
        }
    }
           

測試效果:

Flink:Window Api

全視窗聚合

代碼:

//id 結束時間 個數
        DataStream<Tuple3<String, Long, Integer>> resultStream = mapStream.keyBy("id")
                .timeWindow(Time.seconds(3))
                .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) {
                        String id = tuple.getField(0);
                        long windowEnd = window.getEnd();
                        int count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });
           

效果:

Flink:Window Api

計數視窗測試

滑動計數視窗測試:

DataStream<Double> resultStream = mapStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgFunc());
        resultStream.print();
           

MyAvgFunc.java

/**
     * @author wen.jie
     * @date 2021/9/3 11:00
     * 求平均溫度
     */
    public static class MyAvgFunc implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double>{

        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }

        @Override
        public Tuple2<Double, Integer> add(SensorReading value, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.getTemperature(), accumulator.f1 + 1);
        }

        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }

        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0+b.f0, a.f1+b.f1);
        }
    }
           

效果:每兩條資料滑動一次

Flink:Window Api

其他可選API

  • .trigger() —— 觸發器
    • 定義 window 什麼時候關閉,觸發計算并輸出結果
  • .evictor() —— 移除器
    • 定義移除某些資料的邏輯
  • .allowedLateness() —— 允許處理遲到的資料
  • .sideOutputLateData() —— 将遲到的資料放入側輸出流(旁路輸出)
  • .getSideOutput() —— 擷取側輸出流(旁路輸出)

同過allowedLateness可以處理遲到資料。

在使用“事件時間”視窗時,可能會發生元素遲到的情況,具體表現是,Flink用于跟蹤“事件時間”進度的水位線已經超過了元素所屬視窗的結束時間戳。

//标記旁路輸出
final OutputTag<T> tag = new OutputTag<>("later-data");
//建立源資料
DataStream<T> input = .......;
SingleOutputStreamOperator<T> sumStream = input
    //keyBy:鍵控流轉換算子
    .keyBy("id")
    //視窗轉換算子
    .timeWindow(Time.seconds(15))
    //運作延遲時間
    .allowedLateness(Time.minutes(1))
    //将遲到的資料發送到用OutputTag辨別的旁路輸出流中
    .sideOutputLateData(tag)
    .sum("temperature");

//加載旁路輸出資料
sumStream.getSideOutput(tag).print();