天天看點

(二)Flink環境搭建與Flink開發WordCount案例

一、Flink在IDEA上的環境配置

Flink在IDEA上的開發是建立的maven項目,IDEA上maven的配置方法不在介紹,可自行參考網上資料進行配置maven環境。

1、打開IDEA,建立一個maven工程:

file->new->project->Maven->Next

(二)Flink環境搭建與Flink開發WordCount案例

2、設定GroupId和ArtifacId:

注意Version使用1.0
(二)Flink環境搭建與Flink開發WordCount案例

繼續建立項目,然後點選Finsh完成建立

(二)Flink環境搭建與Flink開發WordCount案例

3、配置項目的maven倉庫:

file->Settings->Maven,設定好你的maven倉庫和路徑,之後點選Apply,再點ok

(二)Flink環境搭建與Flink開發WordCount案例

4、添加pom檔案:

把以下配置放到你項目的pom檔案中

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>compile</scope>
        </dependency>
        <!-- flink的hadoop相容 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop2</artifactId>
            <version>1.7.2</version>
            <scope>compile</scope>
        </dependency>
        <!-- flink的hadoop相容 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.7.2</version>
            <scope>compile</scope>
        </dependency>
        <!-- flink的scala的api -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
            <scope>compile</scope>
        </dependency>
        <!-- flink streaming的scala的api -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
            <scope>compile</scope>
        </dependency>
        <!-- flink的java的api -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.2</version>
            <scope>compile</scope>
        </dependency>
        <!-- flink streaming的java的api -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.2</version>
            <scope>compile</scope>
        </dependency>
        <!-- flink運作時的webUI -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>1.7.2</version>
            <scope>compile</scope>
        </dependency>        
        <!-- flink 的kafkaconnector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.7.2</version>
            <scope>compile</scope>
        </dependency>
        <!-- 使用rocksdb儲存flink的state -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>1.7.2</version>
            <scope>compile</scope>
        </dependency>
        <!-- flink操作hbase -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.11</artifactId>
            <version>1.7.2</version>
            <scope>compile</scope>
        </dependency>
        <!-- flink操作es -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
            <version>1.5.3</version>
            <scope>compile</scope>
        </dependency>
        <!-- mysql連接配接驅動 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.35</version>
            <scope>compile</scope>
        </dependency>
        <!-- redis連接配接 -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
            <scope>compile</scope>
        </dependency>
        <!-- 解析dom -->
        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.9.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>cn.wanghaomiao</groupId>
            <artifactId>JsoupXpath</artifactId>
            <version>0.3.2</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
           

5、在main中建立scala目錄

(二)Flink環境搭建與Flink開發WordCount案例
(二)Flink環境搭建與Flink開發WordCount案例

6、配置scala的環境:

1) file->Project Structure->Modules,把你的scala目錄Mark as成Sources,再把Language level 設定成 8-Lambdas,type annotations etc. ,再點Apply

(二)Flink環境搭建與Flink開發WordCount案例

2) Project Structure下配置Libiraies,選擇 + ,選中Scala SDK (未安裝Scala的自行搜尋安裝),點選ok選擇scala,再點選Apply,再點OK,這樣就配置好開發環境

(二)Flink環境搭建與Flink開發WordCount案例
(二)Flink環境搭建與Flink開發WordCount案例

3)設定java Compiler,選擇你的jdk版本(我的1.8)點選Apply,再點選OK,

最後點選Import Changes即可,至此flink開發環境配置完成。

(二)Flink環境搭建與Flink開發WordCount案例

二、Flink開發wordCount

1、在scala下建立包:

(二)Flink環境搭建與Flink開發WordCount案例

2、在com\hn\operator包下建立scala檔案:

(二)Flink環境搭建與Flink開發WordCount案例

3、 log4j.properties 檔案配置

在resources下建立一個log4j.properties檔案,把以下内容放到log4j.properties檔案裡,是為了在控制台上少列印東西,配置了一個日志輸出目錄

log4j.rootLogger=WARN, console, file
log4j.logger.akka=WARN, console
log4j.logger.org.apache.kafka=WARN
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zookeeper=WARN

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
# 注意這裡要修改成自己的日志目錄
log4j.appender.file.file=D://tmp//flink//flink_log
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

           

4、寫wordCount代碼:

package com.hn.operator

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    //生成了配置對象
    val config = new Configuration()
    //打開flink-webui
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志檔案,否則列印日志到控制台,這樣你的控制台就清淨了
    config.setString("web.log.path", "D:\\tmp\\flink\\flink_log")
    //配置taskManager的日志檔案,否則列印日志到控制台,這樣你的控制台就清淨了
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "D:\\tmp\\flink\\flink_log")

    //獲得local運作環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    //定義socket的source源
    val text: DataStream[String] = env.socketTextStream( hostname="localhost", port = 6666)

    //scala開發需要加一行隐式轉換,否則在調用operator的時候會報錯,作用是找到scala類型的TypeInformation
    import org.apache.flink.api.scala._

    //定義operators,作用是解析資料,分組,并且求wordCount
    val wordCount: DataStream[(String, Int)] = text.flatMap(_.split(" ")).map((_,1)).keyBy(_._1).sum( position = 1)

    //定義sink,列印資料到控制台
    wordCount.print()

    //定義任務的名稱并運作
    //注意:operator是惰性的,隻有遇到execute才執行
    env.execute(jobName = "SocketWordCount")

  }
}
           

5、運作測試wordCount:

1)先在cmd下啟動一個socketServer :ncat -lp 6666(注意:要在本地安裝ncat,使用nc也可以)

(二)Flink環境搭建與Flink開發WordCount案例

2)再啟動SocketWordCount 程式,然後在cmd上輸入統計的資料,會把統計結果實時的顯示在程式的控制台上。

(二)Flink環境搭建與Flink開發WordCount案例

3)在浏覽器上輸入 http://localhost:8080/ 能通路flink的web頁面

注意:能通路web頁面的前提是你已經安裝好Hadoop和Flink,并且已經啟動成功
           
(二)Flink環境搭建與Flink開發WordCount案例

繼續閱讀