天天看點

windows環境下flink入門demo執行個體前言碎語Apache Flink是什麼?進入正題文末結語

前言碎語

為了應對凱京科技集團的飛速發展,凱京科技研發中心2019定下了資料中台的目标。資料處理我們選擇了批處理+流處理結合的大資料應用軟體新秀Apache Flink,前幾天阿裡又發出好資訊稱将開源Blink(Flink早期分支遷出疊代優化),是以今天來近距離感受下Flink。部落客之前沒接觸過大資料相關的東西,是以不細究其設計概念了。目标就是跑一個最簡單的流處理的例子,後面慢慢深入後在和大家分享具體的元件概念以及api設計。

Apache Flink是什麼?

Apache Flink 是一個分布式大資料處理引擎,可對有限資料流和無限資料流進行有狀态計算。可部署在各種叢集環境,對各種大小的資料規模進行快速計算。上面是非常官方的描述,說白了我們為什麼選擇Flink,是因為他在社群口碑非常不錯。在國内的話有阿裡這種大資料大流量的公司一直在輸出,當然像騰訊、華為、餓了麼、滴滴等也都有使用Apache Flink。

進入正題

本篇博文涉及到的軟體工具以及下載下傳位址:

Apache Flink :

https://flink.apache.org/downloads.html

Netcat:

https://eternallybored.org/misc/netcat/

Netcat是一個有“瑞士軍刀”美譽的網絡工具,這裡用來綁定端口等待Apache Flink的連接配接

第一步:啟動Flink

從上面的位址下載下傳Flink後是一個壓縮包,解壓後的目錄結構如下:

windows環境下flink入門demo執行個體前言碎語Apache Flink是什麼?進入正題文末結語

/conf/flink-conf.yaml裡有一些Flink的基本配置資訊,如,jobmanager、taskmanager的端口和jvm記憶體(預設1024M)大小,web控制台的端口(預設8081)等。我們可以不該任何配置,然後進入到bin下,執行start-cluster.bat。這裡要注意不是并不是flink.bat。flink.bat是用來送出job的。還有要確定相關的端口沒有被占用

windows環境下flink入門demo執行個體前言碎語Apache Flink是什麼?進入正題文末結語

運作成功後會有兩個java黑視窗(一個TaskManager、一個JobManager),如果隻有一個java黑視窗,很可能是你的TaskManager因為端口占用沒有啟動起來,成功後通路:

http://localhost:8081.

就會看到如下的web管理控制台了:

windows環境下flink入門demo執行個體前言碎語Apache Flink是什麼?進入正題文末結語

如果啟動失敗的話,上面箭頭所指向的地方應該是0.

第二步:job任務編寫

1.首先需要建立一個maven工程,然後導入Flink的接口依賴

<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.1</version>           
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.1</version>           
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.1</version>           

2.編寫具體的job,官方提供了一個單詞統計的demo

package com.kl;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.ReduceFunction;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

public static void main(String[] args) throws Exception {

// the host and the port to connect to
  final String hostname;
  final int port;
  try {
     final ParameterTool params = ParameterTool.fromArgs(args);
     hostname = params.has("hostname") ? params.get("hostname") : "localhost";
     port = params.has("port") ? params.getInt("port"):9000;
  } catch (Exception e) {
     System.err.println("No port specified. Please run 'SocketWindowWordCount " +
        "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
        "and port is the address of the text server");
     System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
        "type the input text into the command line");
     return;
  }
  // get the execution environment
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  // get input data by connecting to the socket
  DataStream<String> text = env.socketTextStream(hostname, port, "\\n");
  // parse the data, group it, window it, and aggregate the counts
  DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
           public void flatMap(String value, Collector<WordWithCount> out) {
              for (String word : value.split("\\\s")) {
                 out.collect(new WordWithCount(word, 1L));
              } }})
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<WordWithCount>() {
           public WordWithCount reduce(WordWithCount a, WordWithCount b) {
              return new WordWithCount(a.word, a.count + b.count);
           }});
  // print the results with a single thread, rather than in parallel
  windowCounts.print().setParallelism(1);
  env.execute("Socket Window WordCount");           

}

/**

\* Data type for words with count.
*/           

public static class WordWithCount {

public String word;
  public long count;
  public WordWithCount() {}
  public WordWithCount(String word, long count) {
     this.word = word;
     this.count = count;
  }
  @Override
  public String toString() {
     return word + " : " + count;
  }           

上面demo實作了從啟動參數中擷取ip和端口,然後連接配接從輸入流接收文本資訊,然後統計文本裡單詞出現的次數。因為要打成可運作的jar,是以,還需要引入maven的jar打包插件,如下:

<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>com.kl.SocketWindowWordCount</mainClass>
                        </transformer>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>
</plugins>           

mainClass标簽中就是你的main方法所在類全類名。然後mvn install就可以打出一個可運作的jar包了。

第三步:Netcat監聽端口,等待連接配接

從上面貼的位址下載下傳Netcat後,是一個壓縮包,有些安全軟體可能會報病毒,請忽略就好了。然後解壓檔案目錄如下:

windows環境下flink入門demo執行個體前言碎語Apache Flink是什麼?進入正題文末結語

進入到這個目錄,然後執行: nc64.exe -l -p 9000。相當于打開了9000端口,并監聽了入站資訊。最後實作的效果就是從這個視窗中輸入的資料,回車後會發送Apache Flink中我們送出的job中處理輸出,是以這裡的9000端口,要和我們等下啟動job的啟動參數端口一緻。

第四步:送出job運作

運作job有兩種方式:可以通過Flink.bat運作,也可以通過web控制台運作。

指令行運作:

flink run E:\flinkWorkingspce\flinkdemo\target\finlk-demo-1.0-SNAPSHOT.jar --port 9000

web控制台運作:

windows環境下flink入門demo執行個體前言碎語Apache Flink是什麼?進入正題文末結語

如上圖,點選Add New後選擇你的jar包然後上傳,上傳成功就會在清單裡列出來。然後選中你上傳的jar。就會出現如下圖的輸入框,可以輸入你的啟動參數,然後點選submit送出就可以了

windows環境下flink入門demo執行個體前言碎語Apache Flink是什麼?進入正題文末結語

第五步:驗證效果

送出後如果沒有問題,job的詳情頁面如下:

windows環境下flink入門demo執行個體前言碎語Apache Flink是什麼?進入正題文末結語

這個時候我們從Netcat的監聽的黑視窗中敲入一些長文本,就會在Flink的job裡統計輸出出來如:

windows環境下flink入門demo執行個體前言碎語Apache Flink是什麼?進入正題文末結語

文末結語

Flink的Windows環境入門執行個體還算順利,這隻是第一步,後面Apache Flink的生産落地肯定還會有更多的問題和挑戰。我們會把落地過程中的問題拿到osc分享、來和大家一起交流,歡迎大家關注凱京科技。