I. Setting Up the Flink Environment in IDEA
Flink development in IDEA is done in a Maven project. Configuring Maven in IDEA is not covered again here; refer to online resources to set up your Maven environment.
1. Open IDEA and create a Maven project:
File -> New -> Project -> Maven -> Next
2. Set the GroupId and ArtifactId:
Note: use Version 1.0.
Continue through the wizard, then click Finish to create the project.
3. Configure the project's Maven repository:
File -> Settings -> Maven. Set your Maven repository and paths, then click Apply, then OK.
4. Add the pom file:
Put the following configuration into your project's pom file:
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>compile</scope>
    </dependency>
    <!-- Flink's shaded Hadoop 2 dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop2</artifactId>
        <version>1.7.2</version>
        <scope>compile</scope>
    </dependency>
    <!-- Flink's Hadoop compatibility -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>1.7.2</version>
        <scope>compile</scope>
    </dependency>
    <!-- Flink Scala API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
        <scope>compile</scope>
    </dependency>
    <!-- Flink streaming Scala API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
        <scope>compile</scope>
    </dependency>
    <!-- Flink Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.7.2</version>
        <scope>compile</scope>
    </dependency>
    <!-- Flink streaming Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.7.2</version>
        <scope>compile</scope>
    </dependency>
    <!-- Flink runtime web UI -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>1.7.2</version>
        <scope>compile</scope>
    </dependency>
    <!-- Flink Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>1.7.2</version>
        <scope>compile</scope>
    </dependency>
    <!-- RocksDB state backend for Flink state -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>1.7.2</version>
        <scope>compile</scope>
    </dependency>
    <!-- Flink HBase connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hbase_2.11</artifactId>
        <version>1.7.2</version>
        <scope>compile</scope>
    </dependency>
    <!-- Flink Elasticsearch connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
        <version>1.5.3</version>
        <scope>compile</scope>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.35</version>
        <scope>compile</scope>
    </dependency>
    <!-- Redis client -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
        <scope>compile</scope>
    </dependency>
    <!-- DOM parsing -->
    <dependency>
        <groupId>org.jsoup</groupId>
        <artifactId>jsoup</artifactId>
        <version>1.9.2</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>cn.wanghaomiao</groupId>
        <artifactId>JsoupXpath</artifactId>
        <version>0.3.2</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
5. Create a scala directory under main.
6. Configure the Scala environment:
1) File -> Project Structure -> Modules. Mark your scala directory as Sources, set the Language level to 8 - Lambdas, type annotations etc., then click Apply.
2) Still under Project Structure, configure Libraries: click +, choose Scala SDK (install Scala first if you have not), click OK, select your Scala version, then click Apply and OK. The development environment is now configured.
3) Under Java Compiler, select your JDK version (mine is 1.8), click Apply, then OK.
Finally click Import Changes. The Flink development environment setup is now complete.
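As a quick sanity check of the setup (this snippet is not part of the original guide and uses only the standard library), you can run a one-file Scala program and confirm that the Scala and JVM versions it reports match the ones configured above (2.11.x and 1.8 in this guide):

```scala
object EnvCheck {
  def main(args: Array[String]): Unit = {
    // Version of the scala-library on the classpath (expect 2.11.x per the pom)
    println(s"Scala: ${scala.util.Properties.versionNumberString}")
    // Version of the JVM running the program (expect 1.8 per the compiler settings)
    println(s"Java:  ${System.getProperty("java.version")}")
  }
}
```

If either version differs from what you configured, recheck the Project Structure and Java Compiler settings before moving on.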
II. Developing WordCount with Flink
1. Create a package under scala:
2. Create a Scala file under the com\hn\operator package:
3. Configure the log4j.properties file
Create a log4j.properties file under resources and put the following content into it. This cuts down the noise printed to the console and configures a log output directory:
log4j.rootLogger=WARN, console, file
log4j.logger.akka=WARN, console
log4j.logger.org.apache.kafka=WARN
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zookeeper=WARN
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Log all messages at the configured level to the given file
log4j.appender.file=org.apache.log4j.FileAppender
# Note: change this to your own log directory
log4j.appender.file.file=D://tmp//flink//flink_log
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
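Depending on the log4j version, the FileAppender may not create missing parent directories on its own, so it does no harm to make sure the directory from `log4j.appender.file.file` exists before the first run. A small standard-library helper (the path below is the example one from the config; adjust it to your own):

```scala
import java.nio.file.{Files, Paths}

object EnsureLogDir {
  def main(args: Array[String]): Unit = {
    // The directory used in the log4j config above; change it to your own.
    val logDir = Paths.get("D:/tmp/flink")
    // createDirectories is a no-op if the directory already exists
    Files.createDirectories(logDir)
    println(s"Log directory ready: ${logDir.toAbsolutePath}")
  }
}
```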
4. Write the wordCount code:
package com.hn.operator

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    // Create the configuration object
    val config = new Configuration()
    // Enable the Flink web UI
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Set the web UI log file; otherwise the logs go to the console
    config.setString("web.log.path", "D:\\tmp\\flink\\flink_log")
    // Set the TaskManager log file; otherwise the logs go to the console
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "D:\\tmp\\flink\\flink_log")
    // Get a local execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    // Define the socket source
    val text: DataStream[String] = env.socketTextStream(hostname = "localhost", port = 6666)
    // Scala development needs this implicit import, otherwise calling operators fails;
    // it provides the TypeInformation for Scala types
    import org.apache.flink.api.scala._
    // Define the operators: parse the data, group it, and compute the word count
    val wordCount: DataStream[(String, Int)] = text.flatMap(_.split(" ")).map((_, 1)).keyBy(_._1).sum(position = 1)
    // Define the sink: print the result to the console
    wordCount.print()
    // Name the job and run it
    // Note: operators are lazy; nothing runs until execute is called
    env.execute(jobName = "SocketWordCount")
  }
}
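The operator chain above can be understood without Flink at all: on a plain Scala collection, the same flatMap / map / group / sum pipeline produces the final counts the streaming job converges to. A standard-library sketch (the sample lines are made up for illustration):

```scala
object LocalWordCount {
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))  // split each line into words, like the job's flatMap
      .map((_, 1))            // pair each word with a count of 1, like map((_, 1))
      .groupBy(_._1)          // group pairs by the word, like keyBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // like sum(position = 1)

  def main(args: Array[String]): Unit = {
    // Sample lines standing in for what you would type into the socket
    println(wordCount(Seq("hello flink", "hello world")))
  }
}
```

The difference in the streaming job is that `sum` emits a running total after every element instead of a single final map.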
5. Run and test wordCount:
1) First start a socket server in cmd: ncat -lp 6666 (note: ncat must be installed locally; nc also works).
2) Then start the SocketWordCount program and type some data into the cmd window; the counts are displayed in the program's console in real time.
3) Open http://localhost:8080/ in a browser to access Flink's web UI.
Note: the web UI is only reachable if you have installed Hadoop and Flink and started them successfully.
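If ncat/nc is not available, the server side of step 1) can also be sketched in a few lines of standard-library Scala (this is an illustrative stand-in for `ncat -lp 6666`, not part of the original guide): it listens on port 6666, accepts one connection from the Flink job, and forwards console input to it line by line.

```scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.StdIn

object LineServer {
  def serve(port: Int): Unit = {
    val server = new ServerSocket(port)
    println(s"Listening on port $port; waiting for the Flink job to connect...")
    val client = server.accept() // blocks until SocketWordCount connects
    val out = new PrintWriter(client.getOutputStream, true) // autoflush each line
    // Forward console lines to the connected job until EOF (Ctrl+Z / Ctrl+D)
    Iterator.continually(StdIn.readLine()).takeWhile(_ != null).foreach(out.println)
    out.close(); client.close(); server.close()
  }

  def main(args: Array[String]): Unit = serve(6666)
}
```

Start LineServer first, then start SocketWordCount, and type your input into the LineServer console instead of the cmd window.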