天天看點

使用flink實作一個簡單的wordcount一、背景二、需求三、前置條件四、實作步驟五、運作結果六、程式代碼

使用flink實作一個簡單的wordcount

  • 一、背景
  • 二、需求
  • 三、前置條件
    • 1、jdk版本要求
    • 2、maven版本要求
  • 四、實作步驟
    • 1、建立 flink 項目
    • 2、編寫程式步驟
      • 1、建立Stream執行上下文
      • 2、監聽系統的9999端口,建立一個socket資料源
      • 3、将擷取到的每一行資料以","分割,那麼每行資料就成了一個數組
      • 4、然後将上一步擷取到的數組資料組成 (詞,次數)這種格式
      • 5、然後在以詞進行分組
      • 6、統計10s鐘内出現的各個詞的個數
      • 7、進行求和
      • 8、執行程式
    • 3、程式代碼如下
  • 五、運作結果
    • 1、使用 nc -l 9999 開啟端口監聽
    • 2、運作java程式
  • 六、程式代碼

一、背景

最近在學習

flink

,此處記錄一下學習

flink

的第一個示例,寫一個簡單的

word count

程式。

二、需求

程式監聽系統中的

9999

端口,并統計出

該10秒鐘内

每個單詞出現的次數。每1行的詞由

逗号

進行分割。

三、前置條件

1、jdk版本要求

使用flink實作一個簡單的wordcount一、背景二、需求三、前置條件四、實作步驟五、運作結果六、程式代碼

注意:官方要求1.8及以上的版本

2、maven版本要求

使用flink實作一個簡單的wordcount一、背景二、需求三、前置條件四、實作步驟五、運作結果六、程式代碼

注意:官方要求3.0.4及以上的版本

四、實作步驟

1、建立 flink 項目

  1. 執行如下指令
mvn archetype:generate                               \
     -DarchetypeGroupId=org.apache.flink              \
     -DarchetypeArtifactId=flink-quickstart-java      \
     -DarchetypeVersion=1.9.0
           
  1. 執行上一步指令或會提示輸入自己項目的

    groupId

    ,

    artifactId

    version

    ,此處輸入自己像輸入的即可。
  2. 生成代碼如下截圖
    使用flink實作一個簡單的wordcount一、背景二、需求三、前置條件四、實作步驟五、運作結果六、程式代碼

2、編寫程式步驟

1、建立Stream執行上下文

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2、監聽系統的9999端口,建立一個socket資料源

env.socketTextStream("localhost", 9999)

3、将擷取到的每一行資料以","分割,那麼每行資料就成了一個數組

輸入資料 轉換後的結果 描述
a,b,c,d,e,e,f,g [a,b,c,d,e,e,f,g] 及以逗号分割轉成數組

4、然後将上一步擷取到的數組資料組成 (詞,次數)這種格式

輸入資料 轉換後的結果
[a,b,c,d,e,e,f,g] (a,1)… 即數組中的每個資料都轉換成了以自身作為key,且值是1的 Tuple2 格式

5、然後在以詞進行分組

資料 分組字段索引 描述
(a,1) 詞是Tuple2的第一個字段,即

索引

6、統計10s鐘内出現的各個詞的個數

此步需要設定一個 window

7、進行求和

8、執行程式

env.execute("StreamingJob");

3、程式代碼如下

/**
 * flink stream word count
 */
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // 建立一個stream執行上下文
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 建構一個socket資料源,用于從本地9999端口擷取資料
        env.socketTextStream("localhost", 9999)
                // 将每行資料以","分割,此時每行就是一個數組
                .flatMap(new FlatMapFunction<String, String[]>() {
                    @Override
                    public void flatMap(String input, Collector<String[]> collector) throws Exception {
                        collector.collect(input.split(","));
                    }
                })
                // 将每個詞包裝成 (詞,1) 這種格式
                .flatMap(new FlatMapFunction<String[], Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String[] words, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : words) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                // 根據第一個詞進行分組
                .keyBy(0)
                // 時間視窗為10s
                .timeWindow(Time.seconds(10))
                // 根據第二個字段進行統計
                .sum(1)
                // 資料統計資料
                .print();

        // 執行程式
        env.execute("StreamingJob");
    }
}
           

五、運作結果

1、使用 nc -l 9999 開啟端口監聽

2、運作java程式

使用flink實作一個簡單的wordcount一、背景二、需求三、前置條件四、實作步驟五、運作結果六、程式代碼

六、程式代碼

代碼 https://gitee.com/huan1993/flink-parent/tree/feature/wordcount/

繼續閱讀