天天看點

Flink的window機制

Window在流式計算中很重要,因為”流”是一個無終點的持續輸入,是以通過window機制來分塊,進行聚合等各種處理

Keyed vs Non-Keyed Windows

Non-key window是在整個流上進行分塊,沒法并行處理

Window Assigners

分為4種視窗類型,分别是:

Tumbling Windows;Sliding Windows;Session Windows;Global Windows

Window Functions

注意區分可以累加計算的Function,和必須把一個window内的所有值都緩存起來最後計算的Fucntion

ReduceFunction;AggregateFunction;FoldFunction

ProcessWindowFunction(需要緩存整個window的資料後再計算)

ProcessWindowFunction with IncrementalAggregation

Using per-window state inProcessWindowFunction

Triggers

什麼時候觸發window的計算?一般是window結束的時候,涉及到系統時鐘時間、watermark等,也可以自定義trigger。

Fire and Purge;Default Triggers of WindowAssigners;Built-inand Custom Triggers

Evictors

用來在window的trigger觸發後,排除一些值

Allowed Lateness

一般違反watermark規則的逾時資料被丢棄,如果實際場景特殊,也可以設定不丢棄逾時情況。不丢棄就意味着對之前的結果做更改,需要緩存之前的結果,是以有一些性能開銷。下面是些注意事項,具體細節可以參考官方文檔

Gettinglate data as a side output

Lateelements considerations

Workingwith window results

Interactionof watermarks and windows

Consecutivewindowed operations

Useful state size considerations

接下來通過一個實際例子來說明window的使用方法:

##滴滴司機每小時的最高收入

我們乘坐滴滴打車的時候,每次付款都會向滴滴公司發送一個消息,裡面包括行程情況和所付的費用,如果滴滴想了解每小時司機的收入,最高收入是多少,可以通過下面的代碼實作:

// configure thedata source
DataStream<DidiFare> fares = xxxx;

// compute tipsper hour for each driver
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((DidiFare fare) ->fare.driverId).timeWindow(Time.hours(1))
.aggregate(new HourlyTipsSolution.AddTips(), newHourlyTipsSolution.WrapWithWindowInfo());

// find thehighest total tips in each hour
DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
    .timeWindowAll(Time.hours(1)).maxBy(2);

// print theresult on stdout
printOrTest(hourlyMax);
           

上面的代碼首先計算每個司機,每隔一小時的收入之和;然後基于上一步的資料(上一步的資料形成一個新的流),每隔一小時計算出收入最大的那個。

    public static class AddTips implements AggregateFunction<
        DidiFare, // input type
        Float,    // accumulator type
        Float     // output type
        >

    {
        @Override
        public Float createAccumulator() {
            return 0F;
        }
        @Override
        public Float add(DidiFare fare, Float aFloat) {
            return fare.tip + aFloat;
        }
        @Override
        public Float getResult(Float aFloat) {
            return aFloat;
        }
        @Override
        public Float merge(Float aFloat, Float accumulator) {
            return aFloat + accumulator;
        }
    }
           

上面是個自定義的aggregate函數,在window中不必緩存資料,任一時刻隻有一個值。

    public static class WrapWithWindowInfo extends ProcessWindowFunction<
        Float, Tuple3<Long, Long, Float>, Long, TimeWindow> {
        @Override
        public void process(Long key, Context context, Iterable<Float> elements,
            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
            Float sumOfTips = elements.iterator().next();//after aggregation, there is only one value
            out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
        }
    }
           

上面的代碼是把window結果,結合window的時間熟悉,拼成一個新的流的元素。

歡迎閱讀,有問題可以通過郵件kaiyuan0989愛特163.com一起探讨。

繼續閱讀