Window在流式計算中很重要,因為”流”是一個無終點的持續輸入,是以通過window機制來分塊,進行聚合等各種處理
Keyed vs Non-Keyed Windows
Non-key window是在整個流上進行分塊,沒法并行處理
Window Assigners
分為4種視窗類型,分别是:
Tumbling Windows;Sliding Windows;Session Windows;Global Windows
Window Functions
注意區分可以累加計算的Function,和必須把一個window内的所有值都緩存起來最後計算的Fucntion
ReduceFunction;AggregateFunction;FoldFunction
ProcessWindowFunction(需要緩存整個window的資料後再計算)
ProcessWindowFunction with IncrementalAggregation
Using per-window state inProcessWindowFunction
Triggers
什麼時候觸發window的計算?一般是window結束的時候,涉及到系統時鐘時間、watermark等,也可以自定義trigger。
Fire and Purge;Default Triggers of WindowAssigners;Built-inand Custom Triggers
Evictors
用來在window的trigger觸發後,排除一些值
Allowed Lateness
一般違反watermark規則的逾時資料被丢棄,如果實際場景特殊,也可以設定不丢棄逾時情況。不丢棄就意味着對之前的結果做更改,需要緩存之前的結果,是以有一些性能開銷。下面是些注意事項,具體細節可以參考官方文檔
Gettinglate data as a side output
Lateelements considerations
Workingwith window results
Interactionof watermarks and windows
Consecutivewindowed operations
Useful state size considerations
接下來通過一個實際例子來說明window的使用方法:
##滴滴司機每小時的最高收入
我們乘坐滴滴打車的時候,每次付款都會向滴滴公司發送一個消息,裡面包括行程情況和所付的費用,如果滴滴想了解每小時司機的收入,最高收入是多少,可以通過下面的代碼實作:
// configure thedata source
DataStream<DidiFare> fares = xxxx;
// compute tipsper hour for each driver
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((DidiFare fare) ->fare.driverId).timeWindow(Time.hours(1))
.aggregate(new HourlyTipsSolution.AddTips(), newHourlyTipsSolution.WrapWithWindowInfo());
// find thehighest total tips in each hour
DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
.timeWindowAll(Time.hours(1)).maxBy(2);
// print theresult on stdout
printOrTest(hourlyMax);
上面的代碼首先計算每個司機,每隔一小時的收入之和;然後基于上一步的資料(上一步的資料形成一個新的流),每隔一小時計算出收入最大的那個。
public static class AddTips implements AggregateFunction<
DidiFare, // input type
Float, // accumulator type
Float // output type
>
{
@Override
public Float createAccumulator() {
return 0F;
}
@Override
public Float add(DidiFare fare, Float aFloat) {
return fare.tip + aFloat;
}
@Override
public Float getResult(Float aFloat) {
return aFloat;
}
@Override
public Float merge(Float aFloat, Float accumulator) {
return aFloat + accumulator;
}
}
上面是個自定義的aggregate函數,在window中不必緩存資料,任一時刻隻有一個值。
public static class WrapWithWindowInfo extends ProcessWindowFunction<
Float, Tuple3<Long, Long, Float>, Long, TimeWindow> {
@Override
public void process(Long key, Context context, Iterable<Float> elements,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {
Float sumOfTips = elements.iterator().next();//after aggregation, there is only one value
out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
}
}
上面的代碼是把window結果,結合window的時間熟悉,拼成一個新的流的元素。
歡迎閱讀,有問題可以通過郵件kaiyuan0989愛特163.com一起探讨。