天天看點

滾動監聽_Flink實戰之滾動視窗、滑動視窗WindowsAPI使用示例

Flink實戰之視窗WindowsAPI使用示例

介紹

Fink的時間視窗(Window)可以分成兩類:

1、CountWindow:按照指定的資料條數生成一個 Window,與時間無關。

2、TimeWindow:按照時間生成 Window。

TimeWindow,可以根據視窗實作原理的不同分成三類:滾動視窗(Tumbling

Window)、滑動視窗(Sliding Window)和會話視窗(Session Window)。

本文介紹滾動視窗(TumblingWindow)、滑動視窗(Sliding Window),并通過例子說明如何使用這些視窗。

接下來文章介紹CountWindow視窗,歡迎關注。

滾動視窗(Tumbling Windows)使用例子

滾動視窗是将資料依據固定的視窗長度對資料進行切片,特點是時間對齊,視窗長度固定,沒有重疊。

滾動視窗配置設定器将每個元素配置設定到一個指定視窗大小的視窗中,如:如果你指定了一個 15 分鐘大小的滾動視窗。

視窗的建立如下圖所示:

滾動監聽_Flink實戰之滾動視窗、滑動視窗WindowsAPI使用示例

示例說明:

進行分組聚合(keyBy:将key相同的分到一個組中) ,定義一個1分鐘的翻滾視窗,每分鐘統計一次單詞出現的次數。

示例代碼如下:

// 建立流處理的執行環境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //2.使用StreamExecutionEnvironment建立DataStream        //Source(可以有多個Source)        //Socket 監聽本地端口8888        // 接收一個socket文本流        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);        // Transformation(s) 對資料進行轉換處理統計,先分詞,再按照word進行分組,最後進行聚合統計        DataStreamString, Integer>> windowCount = lines.flatMap(            public void flatMap(String line, CollectorString, Integer>> collector) throws Exception {                String[] words = line.split(" ");                for (String word : words) {                    //将每個單詞與 1 組合,形成一個元組                    Tuple2<String, Integer> tp = Tuple2.of(word, 1);                    //将組成的Tuple放入到 Collector 集合,并輸出                    collector.collect(tp);                }            }        });        // 1. 滾動視窗(Tumbling Windows)使用例子        //進行分組聚合(keyBy:将key相同的分到一個組中) //定義一個1分鐘的翻滾視窗,每分鐘統計一次        DataStreamString, Integer>> windowStream = windowCount.keyBy(               .timeWindow(Time.minutes(1))                .sum(1);        // 調用Sink (Sink必須調用)        windowStream.print("windows: ").setParallelism(1);        //timePoint+=30;        //啟動(這個異常不建議try...catch... 捕獲,因為它會抛給上層flink,flink根據異常來做相應的重新開機政策等處理)        env.execute("StreamWordCount");
           

在終端通過指令nc -lk 8888 輸入一些資料

第一分鐘輸入

hello world
hello flink
           

第二分鐘輸入

hello spark
           

檢視一下效果

滾動監聽_Flink實戰之滾動視窗、滑動視窗WindowsAPI使用示例

滑動視窗(Sliding Windows)使用例子

滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組成。

特點:時間對齊,視窗長度固定,有重疊。

滾動監聽_Flink實戰之滾動視窗、滑動視窗WindowsAPI使用示例

示例說明:

進行分組聚合(keyBy:将key相同的分到一個組中) ,定義1分鐘的滑動視窗,每30秒滑動一次統計一次單詞出現的次數。

注:1分鐘的視窗和30秒的滑動,30秒滑動幅度是包含前後30秒的輸入資料,即1分鐘産生的資料。

示例代碼如下:

其它代碼與上面滾動視窗一樣

DataStream>> sumed = windowCount.keyBy(                .timeWindow(Time.minutes(1), Time.seconds(30))                .sum(1);
           

在終端通過指令nc -lk 8888 輸入一些資料

第一30秒内輸入

hello world
hello flink
           

第二30秒内輸入

hello spark
           

檢視一下效果

滾動監聽_Flink實戰之滾動視窗、滑動視窗WindowsAPI使用示例

如果覺得文章能幫到您,歡迎關注微信公衆号:大資料技術天涯,共同進步!

持續分享java微服務技術,大資料、人工智能等科技類原創文章。

滾動監聽_Flink實戰之滾動視窗、滑動視窗WindowsAPI使用示例