天天看點

Flink 項目Scala模闆入門

目前的很多建構工具,可以提供項目模闆來讓開發人員快速初始化項目檔案,這對于降低學習難度,提高項目開發效率來說,起到積極的作用。Flink項目的初始化,可以借助Maven工具來建構。在之前JDK和Maven環境搭建完成,并正确設定環境變量後,即可以用mvn archetype:generate快速生成項目檔案。目前支援Scala和Java兩種項目模闆。下面分别進行介紹。

首先打開指令行CMD窗體,并切換到項目的根目錄中,執行如下指令:

mvn archetype:generate                            ^
      -DarchetypeGroupId=org.apache.flink            ^
      -DarchetypeArtifactId=flink-quickstart-scala      ^
      -DarchetypeVersion=1.12.0                     ^
      -DgroupId=com.myflink                         ^
      -DartifactId=flink-scala                        ^
      -Dpackage=com.example                       ^
      -DinteractiveMode=false       

上述腳本每行的^符号為Windows作業系統指令行的換行符,而Linux作業系統下為\符号。目前這裡換行是為了更加的清晰,即也可以寫成一行。其中:

l -DarchetypeArtifactId=flink-quickstart-scala

表示基于的項目模闆為flink-quickstart-scala,則表示為Scala版本的Flink項目。

l -DarchetypeVersion=1.12.0

表示Flink版本為1.12.0。

l -DgroupId=com.myflink

代表組織和整個項目的唯一标志。

l -DartifactId=flink-scala

具體項目的名稱,也是生成項目檔案夾的名稱。

l -Dpackage=com.example

項目源碼的包名。

l -DinteractiveMode=false

表示不啟用互動模式,這樣提示資訊更少。

在指令行執行tree flink-scala /F則可以顯示flink-scala目錄的項目結構,如下所示。

C:\src\flink-scala
│ ─pom.xml
│
└─src
    └─main
        ├─resources
        │      └─log4j2.properties
        │
        └─scala
            └─com
                └─example
                      └─ BatchJob.scala
                      └─ StreamingJob.scala      

其中BatchJob.scala是批處理示例檔案,而StreamingJob.scala是流處理示例檔案。此處修改BatchJob.scala檔案,給出一個用scala語言編寫統計單詞個數的示例代碼,如代碼所示:

package com.example
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
//object可以直接運作
object BatchJob {
  //啟動函數
  def main(args: Array[String]) {
    //參數處理,如 --參數名 參數值
    val params: ParameterTool = ParameterTool.fromArgs(args)
    //擷取批處理執行環境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //示範資料
    val mytxt = env.fromElements(
      "Hello Word",
      "Hello Flink",
      "Apache Flink")
    //單詞統計
    val wc = mytxt.flatMap(line => line.split("\\s"))
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    //可以寫入一個檔案,便于檢視
    wc.setParallelism(1)
    //--output xxx
    if (params.has("output")) {
      //WriteMode.OVERWRITE覆寫模式
      wc.writeAsCsv(params.get("output"), "\n", ",",WriteMode.OVERWRITE)
      env.execute("Scala WordCount Demo")
    } else {
      //便于IDEA調試,實際部署一般不用
      wc.print()
    }
  }
}      

我們先不需太過關注具體API的意義,這個後面會慢慢分章節進行詳細說明。預設情況下,resources目錄下的log4j2.properties配置檔案給出的日志級别為INFO,則列印的資訊比較多,這裡可以将日志級别修改為ERROR,具體如下所示:

rootLogger.level = ERROR
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n