Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEA
文章目录
- Flink Streaming流式滑动窗口单词计数_With IntelliJ IDEA
-
- 1. 需求分析
- 2. 启动Flink
- 3. 在IntelliJ IDEA中开发调试SlidingWindowWordCount程序
-
- 3.1 新建Project
- 3.2 修改pom.xml
- 3.3 创建package
- 3.5 执行前的处理
- 3.6 运行结果
1. 需求分析
任务描述:
手工模拟流式数据产生,通过socket实时产生一些单词,使用Flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来。
Flink程序开发步骤
- 获得一个执行环境
- 加载/创建 初始化数据
- 指定操作数据的transaction算子
- 指定把计算好的数据放在哪
-
调用execute()触发执行程序
【注意】Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。(跟Spark中算子的执行一样,都可以称为懒(lazy)执行)。
延迟计算的好处:在开发一个复杂程序时,Flink可以讲复杂的程序转化成一个执行计划,将执行计划作为一个整体的单元执行!(提高计算效率)
执行环境
- Ubuntu 18.04
- Java 1.8
- Flink 1.9.1
- Maven 3.3.9
2. 启动Flink
进入Flink的安装目录
cd /usr/local/flink
./bin/start-cluster.sh
用
jps
查看启动的java线程,可以看到TaskManagerRunner和StandaloneSessionClusterEntrypoint已启动。
3. 在IntelliJ IDEA中开发调试SlidingWindowWordCount程序
3.1 新建Project
启动进入IDEA,如下图所示,新建一个项目。
执行如下图所示的操作:(注意不要勾选"Create from archetype"前面的小方框)
如下图所示,填写GroupId和ArtifactId。这里的GroupId是cn.stu.silver ,ArtifactId是streaming-wordcount。
这时生成的项目目录结构如下图所示。
3.2 修改pom.xml
该程序依赖Flink Java API,因此,我们需要通过Maven进行编译打包。打开pom.xml,然后,将pom.xml内容修改如下,用来声明该独立应用程序的信息以及与Flink的依赖关系:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.stu.silver</groupId>
<artifactId>streaming-wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
</dependencies>
</project>
3.3 创建package
如下图所示,创建Package。
如下图所示,输入package的名称为“cn.stu.silver”。
如下图所示,新建一个java class文件。
如下图所示,输入文件名称SlidingWindowWordCount。
###3.4 SlidingWindowWordCount.java
package cn.stu.silver;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
//import org.apache.flink.runtime.state.filesystem.FsStateBackend;
//import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* 滑动窗口计算
*
* 通过socket模拟产生单词数据
* flink对数据进行统计计算
*
* 需要实现每隔1秒对最近2秒内的数据进行汇总计算
*
*/
public class SlidingWindowWordCount {
public static void main(String[] args) throws Exception{
//获取需要的端口号
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("No port set. use default port 9000--java");
port = 9000;
}
//获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "ubuntu";
String delimiter = "\n";
//连接socket获取输入的数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
// a a c
// a 1
// a 1
// c 1
DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
String[] splits = value.split("\\s");
for (String word : splits) {
out.collect(new WordWithCount(word, 1L));
}
}
}).keyBy("word")
.timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒
.sum("count");//在这里使用sum或者reduce都可以
/*.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
return new WordWithCount(a.word,a.count+b.count);
}
})*/
//把数据打印到控制台并且设置并行度
windowCounts.print().setParallelism(1);
//这一行代码一定要实现,否则程序不执行
env.execute("Socket window count");
}
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word,long count){
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
注意修改程序中hostname变量为自己的hostname,如果不清楚的话可以在终端中输入
hostname
查看自己的主机名。
3.5 执行前的处理
编写完代码以后会发现代码中会出现大量标红,说明相关的依赖没有导入。
如下图所示,在左侧目录树的pom.xml文件上单击鼠标右键,在弹出的菜单中选择Maven,再在弹出的菜单中选择Generate Sources and Update Folders,接着再在弹出的菜单中选择Reimport。
如果是第一次下载相关依赖需要等待几分钟:
3.6 运行结果
打开SlidingWindowWordCount.java代码文件,在这个代码文件的代码区域,鼠标右键单击,弹出菜单中选中“Run SlidingWindowWordCount.main()”。
同时在终端中输入
nc -l 9000
接着可以在终端中输入一下字符,按下回车会发现IntelliJ IDEA控制台会打印出窗口内出现的字符计数,结果如下:
完成!