使用flink實作一個簡單的wordcount
- 一、背景
- 二、需求
- 三、前置條件
-
- 1、jdk版本要求
- 2、maven版本要求
- 四、實作步驟
-
- 1、建立 flink 項目
- 2、編寫程式步驟
-
- 1、建立Stream執行上下文
- 2、監聽系統的9999端口,建立一個socket資料源
- 3、将擷取到的每一行資料以","分割,那麼每行資料就成了一個數組
- 4、然後将上一步擷取到的數組資料組成 (詞,次數)這種格式
- 5、然後在以詞進行分組
- 6、統計10s鐘内出現的各個詞的個數
- 7、進行求和
- 8、執行程式
- 3、程式代碼如下
- 五、運作結果
-
- 1、使用 nc -l 9999 開啟端口監聽
- 2、運作java程式
- 六、程式代碼
一、背景
最近在學習
flink
,此處記錄一下學習
flink
的第一個示例,寫一個簡單的
word count
程式。
二、需求
程式監聽系統中的
9999
端口,并統計出
該10秒鐘内
每個單詞出現的次數。每1行的詞由
逗号
進行分割。
三、前置條件
1、jdk版本要求
注意:官方要求1.8及以上的版本
2、maven版本要求
注意:官方要求3.0.4及以上的版本
四、實作步驟
1、建立 flink 項目
- 執行如下指令
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0
- 執行上一步指令或會提示輸入自己項目的
,groupId
和artifactId
,此處輸入自己像輸入的即可。version
- 生成代碼如下截圖
2、編寫程式步驟
1、建立Stream執行上下文
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2、監聽系統的9999端口,建立一個socket資料源
env.socketTextStream("localhost", 9999)
3、将擷取到的每一行資料以","分割,那麼每行資料就成了一個數組
輸入資料 | 轉換後的結果 | 描述 |
---|---|---|
a,b,c,d,e,e,f,g | [a,b,c,d,e,e,f,g] | 及以逗号分割轉成數組 |
4、然後将上一步擷取到的數組資料組成 (詞,次數)這種格式
輸入資料 | 轉換後的結果 |
---|---|
[a,b,c,d,e,e,f,g] | (a,1)… 即數組中的每個資料都轉換成了以自身作為key,且值是1的 Tuple2 格式 |
5、然後在以詞進行分組
資料 | 分組字段索引 | 描述 |
---|---|---|
(a,1) | 詞是Tuple2的第一個字段,即 為 |
6、統計10s鐘内出現的各個詞的個數
此步需要設定一個 window
7、進行求和
8、執行程式
env.execute("StreamingJob");
3、程式代碼如下
/**
* flink stream word count
*/
public class StreamingJob {
public static void main(String[] args) throws Exception {
// 建立一個stream執行上下文
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 建構一個socket資料源,用于從本地9999端口擷取資料
env.socketTextStream("localhost", 9999)
// 将每行資料以","分割,此時每行就是一個數組
.flatMap(new FlatMapFunction<String, String[]>() {
@Override
public void flatMap(String input, Collector<String[]> collector) throws Exception {
collector.collect(input.split(","));
}
})
// 将每個詞包裝成 (詞,1) 這種格式
.flatMap(new FlatMapFunction<String[], Tuple2<String, Integer>>() {
@Override
public void flatMap(String[] words, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
})
// 根據第一個詞進行分組
.keyBy(0)
// 時間視窗為10s
.timeWindow(Time.seconds(10))
// 根據第二個字段進行統計
.sum(1)
// 資料統計資料
.print();
// 執行程式
env.execute("StreamingJob");
}
}
五、運作結果
1、使用 nc -l 9999 開啟端口監聽
2、運作java程式
六、程式代碼
代碼 https://gitee.com/huan1993/flink-parent/tree/feature/wordcount/