Spark Streaming + Flume
Using word count as an example.
Data flow:
41414 ---> Flume ----> 4444 ---> Spark Streaming
Flume receives data on port 41414 and, after processing it, sends it on to port 4444, where Spark Streaming picks it up. In this push-based setup, Spark Streaming hosts an Avro receiver on port 4444 and Flume's avro sink pushes events to it.
Spark Streaming project
Dependencies
<dependency>
<groupId>org.apache.spark</groupId>
<!-- 2.11 is the Scala version -->
<!-- 2.4.4 is the Spark version -->
<artifactId>spark-core_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.4</version>
<!-- enable when running on the cluster; comment out for local runs -->
<!--<scope>provided</scope>-->
</dependency>
<!-- spark-streaming-flume integration jar -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.4.4</version>
</dependency>
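If the project is built with sbt instead of Maven, the equivalent dependencies would look roughly like this (a sketch; the %% operator appends the Scala version suffix, resolving to the same _2.11 artifacts):

// build.sbt (sketch): same artifacts as the Maven dependencies above
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "2.4.4",
  "org.apache.spark" %% "spark-streaming"       % "2.4.4", // add % "provided" for cluster runs
  "org.apache.spark" %% "spark-streaming-flume" % "2.4.4"
)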
Code
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("wordCount").setMaster("spark://Spark:7077")
val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval
ssc.sparkContext.setLogLevel("ERROR") // raise the log level to cut console noise
// create the DStream
val flumeStream = FlumeUtils.createStream(
ssc,
"Spark", //hostname
4444 //port 监听4444端口传来的数据
)
flumeStream
.map(x => new String(x.event.getBody.array()).trim)
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
// start the streaming computation
ssc.start()
// block until the computation terminates
ssc.awaitTermination()
}
}
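spark-streaming-flume also offers a pull-based mode via FlumeUtils.createPollingStream, in which Spark polls events out of the Flume agent instead of hosting a receiver that Flume pushes to; it requires the Flume sink to be org.apache.spark.streaming.flume.sink.SparkSink, with the spark-streaming-flume-sink jar on Flume's classpath. A minimal sketch, assuming the same host and port as above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCountPull {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordCountPull").setMaster("spark://Spark:7077")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    // Spark pulls events out of the SparkSink's buffer transactionally,
    // which gives stronger delivery guarantees than the push approach
    val flumeStream = FlumeUtils.createPollingStream(ssc, "Spark", 4444)
    flumeStream
      .map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}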
Building the jar
Use a third-party packaging plugin to bundle the third-party dependencies into the jar:
<plugin>
<!-- Maven assembly plugin: builds the original jar and bundles third-party dependencies into it -->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!-- replace with the class that contains the jar's main method -->
<mainClass>per.spark.test.WordCount</mainClass>
</manifest>
<manifestEntries>
<Class-Path>.</Class-Path>
</manifestEntries>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- run the jar-merging step during the package phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
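With the plugin bound to the package phase, running mvn package produces both the plain jar and a *-jar-with-dependencies.jar under target/. The assembled jar already bundles the spark-streaming-flume classes, so nothing extra needs to be added to the cluster's classpath; the exact file name depends on the project's artifactId and version.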
Deploy the jar to the Spark cluster
Start Spark
[root@Spark spark-2.4.4]# sbin/start-all.sh
Submit the Spark job
[root@Spark spark-2.4.4]# bin/spark-submit --master spark://Spark:7077 --class per.spark.test.WordCount --total-executor-cores 2 /root/spark-1.0-SNAPSHOT.jar
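Note that --total-executor-cores must be at least 2: FlumeUtils.createStream is receiver-based, so one core is permanently occupied by the Flume receiver, and at least one more core is needed to actually process the batches. With a single core the job would receive data but never produce output.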
Flume
Configuration file
Create a file named streaming-flume.conf in Flume's conf directory with the following contents:
# declare the names of the source, sink, and channel
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# configure the source
a1.sources.s1.type = netcat
a1.sources.s1.bind = Spark
# receive data on port 41414
a1.sources.s1.port = 41414
a1.sources.s1.channels = c1
# configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = Spark
# push data to port 4444
a1.sinks.k1.port = 4444
a1.sinks.k1.channel = c1
# configure the channel
a1.channels.c1.type = memory
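Optionally, the memory channel's buffering can be tuned; capacity and transactionCapacity are standard memory-channel properties, and the values below are only illustrative:

a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100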
Start Flume
[root@Spark flume-1.9.0]# ./bin/flume-ng agent --conf conf/ --conf-file conf/streaming-flume.conf --name a1 -Dflume.root.logger=INFO,console
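Start the Spark Streaming job before starting Flume: in this push-based setup the avro sink connects to the Avro receiver listening on port 4444, so the receiver must already be up for the sink to deliver events.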
Test
# connect to port 41414
[root@Spark ~]# nc Spark 41414
hello hello
ok
hadoop
ok
Result
-------------------------------------------
Time: 1545459835000 ms
-------------------------------------------
(hello,2)
(hadoop,1)