1.4版本: Flink1.4 視窗函數 在定義 視窗配置設定器
之後,我們需要在每個視窗上指定我們要執行的計算。這是視窗函數的責任,一旦系統确定視窗準備好處理資料,視窗函數就處理每個視窗中的元素。
視窗函數可以是
ReduceFunction
, FoldFunction
或 WindowFunction
其中之一。前兩個函數執行更有效率,因為Flink可以在每個視窗中元素到達時增量地聚合。 WindowFunction
将獲得一個包含在視窗中所有元素的疊代器以及元素所在視窗的附加元資訊(gets an Iterable for all the elements contained in a window and additional meta information about the window to which the elements belong)。
使用
WindowFunction
的視窗轉換不能像其他函數那麼有效率,是因為Flink在調用函數之前必須在内部緩存視窗中的所有元素。這可以通過将 WindowFunction
與 ReduceFunction
FoldFunction
組合使用來緩解,以獲得視窗元素的增量聚合以及 WindowFunction
接收的附加視窗中繼資料。 1. ReduceFunction http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E5%9B%9B%20%E7%AA%97%E5%8F%A3%E5%87%BD%E6%95%B0.md#1-reducefunction
ReduceFunction
指定如何組合輸入資料的兩個元素以産生相同類型的輸出元素。Flink使用
ReduceFunction
增量聚合視窗的元素。
ReduceFunction
可以如下定義和使用:
Java版本:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
Scala版本:
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
上述示例獲得視窗中的所有元素元組的第二個字段之和。
2. FoldFunction http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E5%9B%9B%20%E7%AA%97%E5%8F%A3%E5%87%BD%E6%95%B0.md#2-foldfunction
FoldFunction
指定視窗的輸入元素如何與輸出類型的元素合并。
FoldFunction
會被每一個加入到視窗中的元素和目前的輸出值增量地調用(The FoldFunction is incrementally called for each element that is added to the window and the current output value),第一個元素與一個預定義的輸出類型的初始值合并。
FoldFunction
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
上述示例将所有輸入元素的Long值追加到初始化為空的字元串中。
備注:
fold()不能應用于會話視窗或者其他可合并的視窗中。
3. WindowFunction的一般用法 http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E5%9B%9B%20%E7%AA%97%E5%8F%A3%E5%87%BD%E6%95%B0.md#3-windowfunction
WindowFunction
将獲得一個包含視窗中所有元素的疊代器,并且擁有所有視窗函數的最大靈活性。但是這些是以性能和資源消耗為代價的,因為元素不能增量聚合,相反還需要在内部緩存,直到視窗做好準備處理。
WindowFunction
的接口如下所示:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
WindowFunction
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
/* ... */
public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
void apply(String key, TimeWindow window, Iterable<Tuple<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple<String, Long> in: input) {
count++;
}
out.collect("Window: " + window + "count: " + count);
}
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
/* ... */
class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): () = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window $window count: $count")
}
}
該示例展示
WindowFunction
計算視窗中元素個數。 此外,視窗函數将視窗相關資訊添加到輸出中。
使用WindowFunction做簡單的聚合操作如計數操作,性能是相當差的。下一章節我們将展示如何将ReduceFunction跟WindowFunction結合起來,來擷取增量聚合以及WindowFunction的添加資訊。
4. ProcessWindowFunction http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E5%9B%9B%20%E7%AA%97%E5%8F%A3%E5%87%BD%E6%95%B0.md#4-processwindowfunction
在可以使用
WindowFunction
的地方,你也可以使用
ProcessWindowFunction
。這個與
WindowFunction
非常相似,隻是該接口允許查詢更多關于context的資訊,context是視窗執行的地方。
ProcessWindowFunction
的接口如下:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* The context holding window metadata
*/
public abstract class Context {
/**
* @return The window that is being evaluated.
*/
public abstract W window();
}
}
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
@throws[Exception]
def process(
key: KEY,
context: Context,
elements: Iterable[IN],
out: Collector[OUT])
/**
* The context holding window metadata
*/
abstract class Context {
/**
* @return The window that is being evaluated.
*/
def window: W
}
}
ProcessWindowFunction
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.process(new MyProcessWindowFunction());
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.process(new MyProcessWindowFunction())
5. 帶有增量聚合的WindowFunction http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E5%9B%9B%20%E7%AA%97%E5%8F%A3%E5%87%BD%E6%95%B0.md#5-windowfunction
WindowFunction
可以與
ReduceFunction
FoldFunction
組合使用,以便在元素到達視窗時增量聚合元素。當視窗關閉時,
WindowFunction
提供聚合結果。這允許在通路
WindowFunction
的額外視窗元資訊的同時增量計算視窗(This allows to incrementally compute windows while having access to the additional window meta information of the WindowFunction.)。
你也可以使用ProcessWindowFunction而不是WindowFunction進行增量視窗聚合。
5.1 使用FoldFunction增量視窗聚合 http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E5%9B%9B%20%E7%AA%97%E5%8F%A3%E5%87%BD%E6%95%B0.md#5-1-foldfunction
以下示例展現了增量
FoldFunction
如何與
WindowFunction
組合以提取視窗中的事件數,并傳回視窗的key和結束時間。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction())
// Function definitions
private static class MyFoldFunction
implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
Integer cur = acc.getField(2);
acc.setField(2, cur + 1);
return acc;
}
}
private static class MyWindowFunction
implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
public void apply(String key,
TimeWindow window,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(),count));
}
}
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
( key: String,
window: TimeWindow,
counts: Iterable[(String, Long, Int)],
out: Collector[(String, Long, Int)] ) =>
{
val count = counts.iterator.next()
out.collect((key, window.getEnd, count._3))
}
)
5.2 使用ReduceFunction增量視窗聚合 http://gitlab.corp.qunar.com/jifeng.si/learningnotes/blob/master/IT/%E5%A4%A7%E6%95%B0%E6%8D%AE/Flink/%5BFlink%5DFlink1.3%20Stream%E6%8C%87%E5%8D%97%E5%9B%9B%20%E7%AA%97%E5%8F%A3%E5%87%BD%E6%95%B0.md#5-2-reducefunction
以下示例展現了如何将增量
ReduceFunction
WindowFunction
組合以傳回視窗中的最小事件以及視窗的開始時間。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.reduce(new MyReduceFunction(), new MyWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyWindowFunction
implements WindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void apply(String key,
TimeWindow window,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
}
}
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
window: TimeWindow,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((window.getStart, min))
}
)
Flink版本:1.3
原文:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-functions