天天看点

Flink 项目Scala模板入门

当前的很多构建工具,可以提供项目模板来让开发人员快速初始化项目文件,这对于降低学习难度,提高项目开发效率来说,起到积极的作用。Flink项目的初始化,可以借助Maven工具来构建。在之前JDK和Maven环境搭建完成,并正确设置环境变量后,即可以用mvn archetype:generate快速生成项目文件。当前支持Scala和Java两种项目模板。下面分别进行介绍。

首先打开命令行CMD窗体,并切换到项目的根目录中,执行如下命令:

mvn archetype:generate                            ^
      -DarchetypeGroupId=org.apache.flink            ^
      -DarchetypeArtifactId=flink-quickstart-scala      ^
      -DarchetypeVersion=1.12.0                     ^
      -DgroupId=com.myflink                         ^
      -DartifactId=flink-scala                        ^
      -Dpackage=com.example                       ^
      -DinteractiveMode=false       

上述脚本每行的^符号为Windows操作系统命令行的换行符,而Linux操作系统下为\符号。当前这里换行是为了更加的清晰,即也可以写成一行。其中:

l -DarchetypeArtifactId=flink-quickstart-scala

表示基于的项目模板为flink-quickstart-scala,则表示为Scala版本的Flink项目。

l -DarchetypeVersion=1.12.0

表示Flink版本为1.12.0。

l -DgroupId=com.myflink

代表组织和整个项目的唯一标志。

l -DartifactId=flink-scala

具体项目的名称,也是生成项目文件夹的名称。

l -Dpackage=com.example

项目源码的包名。

l -DinteractiveMode=false

表示不启用交互模式,这样提示信息更少。

在命令行执行tree flink-scala /F则可以显示flink-scala目录的项目结构,如下所示。

C:\src\flink-scala
│ ─pom.xml
│
└─src
    └─main
        ├─resources
        │      └─log4j2.properties
        │
        └─scala
            └─com
                └─example
                      └─ BatchJob.scala
                      └─ StreamingJob.scala      

其中BatchJob.scala是批处理示例文件,而StreamingJob.scala是流处理示例文件。此处修改BatchJob.scala文件,给出一个用scala语言编写统计单词个数的示例代码,如代码所示:

package com.example
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
//object可以直接运行
object BatchJob {
  //启动函数
  def main(args: Array[String]) {
    //参数处理,如 --参数名 参数值
    val params: ParameterTool = ParameterTool.fromArgs(args)
    //获取批处理执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //演示数据
    val mytxt = env.fromElements(
      "Hello Word",
      "Hello Flink",
      "Apache Flink")
    //单词统计
    val wc = mytxt.flatMap(line => line.split("\\s"))
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    //可以写入一个文件,便于查看
    wc.setParallelism(1)
    //--output xxx
    if (params.has("output")) {
      //WriteMode.OVERWRITE覆盖模式
      wc.writeAsCsv(params.get("output"), "\n", ",",WriteMode.OVERWRITE)
      env.execute("Scala WordCount Demo")
    } else {
      //便于IDEA调试,实际部署一般不用
      wc.print()
    }
  }
}      

我们先不需太过关注具体API的意义,这个后面会慢慢分章节进行详细说明。默认情况下,resources目录下的log4j2.properties配置文件给出的日志级别为INFO,则打印的信息比较多,这里可以将日志级别修改为ERROR,具体如下所示:

rootLogger.level = ERROR
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n