天天看点

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

文章目录

  • Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEA
    • 1. 需求分析
    • 2. 启动Flink
    • 3. 在IntelliJ IDEA中开发调试SlidingWindowWordCount程序
      • 3.1 新建Project
      • 3.2 修改pom.xml
      • 3.3 创建package
      • 3.5 执行前的处理
      • 3.6 运行结果

1. 需求分析

任务描述:

手工模拟流式数据产生,通过socket实时产生一些单词,使用Flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来。

Flink程序开发步骤

  1. 获得一个执行环境
  2. 加载/创建 初始化数据
  3. 指定操作数据的transaction算子
  4. 指定把计算好的数据放在哪
  5. 调用execute()触发执行程序

    【注意】Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。(跟Spark中算子的执行一样,都可以称为懒(lazy)执行)。

    延迟计算的好处:在开发一个复杂程序时,Flink可以讲复杂的程序转化成一个执行计划,将执行计划作为一个整体的单元执行!(提高计算效率)

执行环境

  • Ubuntu 18.04
  • Java 1.8
  • Flink 1.9.1
  • Maven 3.3.9

2. 启动Flink

进入Flink的安装目录

cd /usr/local/flink
./bin/start-cluster.sh
           

jps

查看启动的java线程,可以看到TaskManagerRunner和StandaloneSessionClusterEntrypoint已启动。

3. 在IntelliJ IDEA中开发调试SlidingWindowWordCount程序

3.1 新建Project

启动进入IDEA,如下图所示,新建一个项目。

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

执行如下图所示的操作:(注意不要勾选"Create from archetype"前面的小方框)

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

如下图所示,填写GroupId和ArtifactId。这里的GroupId是cn.stu.silver ,ArtifactId是streaming-wordcount。

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

这时生成的项目目录结构如下图所示。

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

3.2 修改pom.xml

该程序依赖Flink Java API,因此,我们需要通过Maven进行编译打包。打开pom.xml,然后,将pom.xml内容修改如下,用来声明该独立应用程序的信息以及与Flink的依赖关系:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.stu.silver</groupId>
    <artifactId>streaming-wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>

    </dependencies>


</project>
           

3.3 创建package

如下图所示,创建Package。

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

如下图所示,输入package的名称为“cn.stu.silver”。

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

如下图所示,新建一个java class文件。

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

如下图所示,输入文件名称SlidingWindowWordCount。

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

###3.4 SlidingWindowWordCount.java

package cn.stu.silver;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
//import org.apache.flink.runtime.state.filesystem.FsStateBackend;
//import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 滑动窗口计算
 *
 * 通过socket模拟产生单词数据
 * flink对数据进行统计计算
 *
 * 需要实现每隔1秒对最近2秒内的数据进行汇总计算
 *
 */
public class SlidingWindowWordCount {

    public static void main(String[] args) throws Exception{
        //获取需要的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        //获取flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "ubuntu";
        String delimiter = "\n";
        //连接socket获取输入的数据
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // a a c

        // a 1
        // a 1
        // c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒
                .sum("count");//在这里使用sum或者reduce都可以
                /*.reduce(new ReduceFunction<WordWithCount>() {
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                        return new WordWithCount(a.word,a.count+b.count);
                                    }
                                })*/
        //把数据打印到控制台并且设置并行度
        windowCounts.print().setParallelism(1);

        //这一行代码一定要实现,否则程序不执行
        env.execute("Socket window count");

    }

    public static class WordWithCount{
        public String word;
        public long count;
        public  WordWithCount(){}
        public WordWithCount(String word,long count){
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

           

注意修改程序中hostname变量为自己的hostname,如果不清楚的话可以在终端中输入

hostname

查看自己的主机名。

3.5 执行前的处理

编写完代码以后会发现代码中会出现大量标红,说明相关的依赖没有导入。

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

如下图所示,在左侧目录树的pom.xml文件上单击鼠标右键,在弹出的菜单中选择Maven,再在弹出的菜单中选择Generate Sources and Update Folders,接着再在弹出的菜单中选择Reimport。

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

如果是第一次下载相关依赖需要等待几分钟:

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

3.6 运行结果

打开SlidingWindowWordCount.java代码文件,在这个代码文件的代码区域,鼠标右键单击,弹出菜单中选中“Run SlidingWindowWordCount.main()”。

同时在终端中输入

nc -l 9000
           

接着可以在终端中输入一下字符,按下回车会发现IntelliJ IDEA控制台会打印出窗口内出现的字符计数,结果如下:

Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEAFlink Streaming流式滑动窗口单词计数_With IntelliJ IDEA

完成!