I. First, prepare a virtual machine
Install nc (netcat):
yum -y install nc
Then start a listener on port 9999:
nc -lk 9999
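Optionally, you can confirm from a second shell on the VM that the port is actually listening (a quick sanity check, not part of the original steps):
ss -lnt | grep 9999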
II. Open IDEA
(1) Create a Maven project and change its pom file to the following:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.kgc.demo.scala</groupId>
    <artifactId>scala-demo-project</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>scala-demo-project</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.kgc.kafak.demo.ThreadProducer</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
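Note that the Scala library version (2.11.8) has to match the _2.11 suffix of the kafka and spark-streaming artifacts. As an optional refinement (my suggestion, not in the original pom), the version can be centralized in a property so the two cannot drift apart:

<properties>
    <scala.version>2.11.8</scala.version>
</properties>
...
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>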
(2) Wait for the dependencies to finish downloading, then configure the project SDK (JDK 1.8 plus a Scala 2.11 SDK, matching the pom) and grant the necessary file permissions.
III. Four demo operations
Before running anything, stop the firewall on the virtual machine:
systemctl stop firewalld
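If you also want the firewall to stay off across reboots (optional, not in the original steps):
systemctl disable firewalld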
1. WordCount
Run the nc command above on the virtual machine, then run the code below in IDEA and type words into the nc session to see them counted.
Note: change the hostname here to your own VM's hosts mapping (or use its IP address directly), otherwise the connection will fail.
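For example, if the VM's address is 192.168.202.143 (the IP used in demo 4 below; substitute your own), the hosts file on the development machine would map it like this:
192.168.202.143  nodefive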
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object WordCountForSocket {
  /**
   * Note which part of this code runs where: the driver or the executors.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    // Run one word count every 5 seconds
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    ssc.sparkContext.setLogLevel("Error")
    // Executes on the driver (exactly once)
    println("------------------------------ start ------------------------------")
    // Start nc -lk 9999 first
    val lines = ssc.socketTextStream("nodefive", 9999)
    // The transformations below execute on the executors
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    result.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
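After the job starts, type a line such as hello world hello into the nc session. Each 5-second batch then prints output in the standard DStream.print() format, along these lines (timestamp illustrative):

-------------------------------------------
Time: 1589184300000 ms
-------------------------------------------
(hello,2)
(world,1)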
2. Stateful counting
updateStateByKey keeps state across batches, so a checkpoint directory must be set. Everything else is the same as above, including changing the host mapping.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    // Compute the running total of each word across batches,
    // e.g. "hello world" counts, or the cumulative sales of each product.
    // This requires the result of the previous batch to be saved first.
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    ssc.sparkContext.setLogLevel("Error")
    ssc.checkpoint("./updatestatebykey")
    val lines = ssc.socketTextStream("nodefive", 9999)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    // In Scala, descriptive parameter names carry extra information for the reader
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {
      Some(preValue.getOrElse(0) + currentValues.sum)
    })
    result.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
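As an aside, Spark 1.6+ also offers mapWithState, which only visits keys that appear in the current batch and usually performs better than updateStateByKey. A minimal sketch (my addition, not part of the original demo), reusing wordAndOne and the checkpoint setting from above:

import org.apache.spark.streaming.{State, StateSpec}

val spec = StateSpec.function((word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum) // persist the new running total for this key
  (word, sum)       // emit (word, running total)
})
wordAndOne.mapWithState(spec).print()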
3. DriverHA
The file layout (the kafak.demo directory is not needed): an input directory copyFileWord and an output directory streamingCopyFile, both under src/test/java/data, plus the util and sparkstreaming.demo packages for the code.
(1) First create the directories and put a word.txt file into copyFileWord:
word.txt
hello world
hello java
hello scala
In short: one input directory and one output directory.
(2) First run CopyFileToDirectory in the util package to start copying the file.
Note: replace the two paths with your own. In IDEA, select the file/folder and right-click > Copy Path.
package util

import java.io.{File, FileInputStream, FileOutputStream}
import java.util.UUID

object CopyFileToDirectory {
  def main(args: Array[String]): Unit = {
    // Every 5 seconds, copy word.txt into the monitored directory under a fresh name
    while (true) {
      Thread.sleep(5000)
      val uuid = UUID.randomUUID().toString
      println(uuid)
      copyFile(
        new File("D:\\IDEA_project\\aa2_17\\src\\test\\java\\data\\copyFileWord\\word.txt"),
        new File("D:\\IDEA_project\\aa2_17\\src\\test\\java\\data\\streamingCopyFile\\" + uuid + "----words.txt"))
    }
  }

  /**
   * Copy a file into the target directory.
   */
  def copyFile(fromFile: File, toFile: File): Unit = {
    val ins = new FileInputStream(fromFile)
    val out = new FileOutputStream(toFile)
    val buffer = new Array[Byte](1024 * 1024)
    // Read first, then write only the bytes actually read
    var size = ins.read(buffer)
    while (size != -1) {
      out.write(buffer, 0, size)
      size = ins.read(buffer)
    }
    ins.close()
    out.close()
  }
}
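The manual stream loop can also be replaced with the JDK's NIO helper; a minimal alternative sketch:

import java.io.File
import java.nio.file.{Files, StandardCopyOption}

def copyFileNio(fromFile: File, toFile: File): Unit = {
  Files.copy(fromFile.toPath, toFile.toPath, StandardCopyOption.REPLACE_EXISTING)
}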
(3) With the copier running, start SparkStreamingDriverHA.
package sparkstreaming.demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object SparkStreamingDriverHA {
  val checkpointDir = "data/streamingCheckpoint"

  def main(args: Array[String]): Unit = {
    // Recover the context from the checkpoint if one exists, otherwise create it
    val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  def createStreamingContext() = {
    println("-----------------Create new StreamingContext ----------------")
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("DriverHA")
    val ssc: StreamingContext = new StreamingContext(conf, Durations.seconds(5))
    ssc.sparkContext.setLogLevel("Error")

    /**
     * By default the checkpoint stores:
     * 1. the configuration
     * 2. the DStream operation graph
     * 3. the job execution progress
     * 4. offsets (for Kafka sources)
     */
    ssc.checkpoint(checkpointDir)
    // Watch the directory and process every new file that appears in it
    val lines: DStream[String] = ssc.textFileStream("D:\\IDEA_project\\aa2_17\\src\\test\\java\\data\\streamingCopyFile")
    val words: DStream[String] = lines.flatMap(line => {
      line.trim.split(" ")
    })
    val pairWords: DStream[(String, Int)] = words.map(word => {
      (word, 1)
    })
    val result: DStream[(String, Int)] = pairWords.reduceByKey((v1: Int, v2: Int) => {
      v1 + v2
    })
    result.print()

    /**
     * Changed logic: uncomment this block and restart the app to see that a context
     * recovered from the checkpoint still runs the old logic stored there.
     */
    // result.foreachRDD(pairRDD => {
    //   pairRDD.filter(one => {
    //     println("*********** filter *********")
    //     true
    //   })
    //   pairRDD.foreach(println)
    // })
    ssc
  }
}
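To observe the HA behavior: the first run prints "Create new StreamingContext"; kill the app and run it again, and that line no longer appears, because the context is rebuilt from data/streamingCheckpoint instead. To start from scratch, delete the checkpoint directory (shown for a Unix-like shell; remove the folder manually on Windows):

rm -rf data/streamingCheckpoint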
4. WindowOperatorDemo
package sparkstreaming.demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object WindowOperatorDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    ssc.sparkContext.setLogLevel("Error")
    val lines = ssc.socketTextStream("192.168.202.143", 9999)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    // Window length: the range of data each computation covers
    // Slide interval: how often the computation is triggered
    // Both must be integer multiples of the batch interval
    // e.g. "the last 24 hours, recomputed once every hour"
    // Non-incremental version (recomputes the whole window on every slide):
    // val result = wordAndOne.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2,
    //   Durations.seconds(15),
    //   Durations.seconds(5))

    // The incremental version below requires a checkpoint directory
    ssc.checkpoint("./windowoprator")
    val result = wordAndOne.reduceByKeyAndWindow(
      (total: Int, enterValue: Int) => total + enterValue, // add values entering the window
      (total: Int, leaveValue: Int) => total - leaveValue, // subtract values leaving the window
      Durations.seconds(15),
      Durations.seconds(5))
    result.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
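To make the window arithmetic concrete: with a 5-second batch interval, a 15-second window, and a 5-second slide, every output covers the last 3 batches, and the incremental version derives each window from the previous one:

batches:          [b1] [b2] [b3] [b4] [b5] ...
window at t=15s:  b1 + b2 + b3
window at t=20s:  b2 + b3 + b4 = previous window + b4 (entering) - b1 (leaving)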