
Spark Streaming + Flume Integration

Using word count as an example.

Workflow:

41414 ---> Flume ---> 4444 ---> Spark Streaming

Flume receives data on port 41414; after processing it, Flume forwards the data to port 4444, and Spark Streaming consumes the data arriving on port 4444.

The Spark Streaming project

Dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <!-- 2.11 is the Scala version -->
    <!-- 2.4.4 is the Spark version -->
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.4</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.4</version>
    <!-- uncomment when running on the cluster; leave commented out for local runs -->
    <!--<scope>provided</scope>-->
</dependency>
<!-- spark-streaming-flume integration jar -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.4.4</version>
</dependency>
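
These dependencies only pull in Spark; the POM also needs to be able to compile Scala source. Below is a minimal sketch, assuming Scala 2.11.12 and the net.alchim31.maven scala-maven-plugin (the coordinates and versions here are assumptions, adjust to your project):

<!-- goes in <dependencies>: the Scala runtime matching the _2.11 artifacts above -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.12</version>
</dependency>

<!-- goes in <build><plugins>: compiles src/main/scala during the Maven build -->
<plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>4.3.0</version>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
            </goals>
        </execution>
    </executions>
</plugin>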
           

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordCount").setMaster("spark://Spark:7077")
    val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval
    ssc.sparkContext.setLogLevel("ERROR") // raise the log level to cut down console output
    // create the DStream: receive the data Flume pushes to port 4444
    val flumeStream = FlumeUtils.createStream(
      ssc,
      "Spark", // hostname
      4444     // port
    )
    flumeStream
      .map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    // start the computation
    ssc.start()
    // wait for the computation to terminate
    ssc.awaitTermination()
  }
}
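
A note for local testing: FlumeUtils.createStream is receiver-based and the receiver itself occupies one core, so a local master needs at least two threads. A minimal sketch of the lines that would change (the rest of the job stays the same as above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[2]: one thread for the Flume receiver, at least one left for processing
val conf = new SparkConf().setAppName("wordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))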
           

Building the jar

Use a third-party packaging plugin to bundle the third-party dependencies into the jar.

<plugin>
     <!-- Maven assembly plugin: builds the jar and packs third-party dependencies into it -->
     <artifactId>maven-assembly-plugin</artifactId>
     <configuration>
         <archive>
             <manifest>
                 <!-- replace this with the class that contains the jar's main method -->
                 <mainClass>per.spark.test.WordCount</mainClass>
             </manifest>
             <manifestEntries>
                 <Class-Path>.</Class-Path>
             </manifestEntries>
         </archive>
         <descriptorRefs>
             <descriptorRef>jar-with-dependencies</descriptorRef>
         </descriptorRefs>
     </configuration>
     <executions>
         <execution>
             <id>make-assembly</id> <!-- this is used for inheritance merges -->
             <phase>package</phase> <!-- run the assembly merge during the package phase -->
             <goals>
                 <goal>single</goal>
             </goals>
         </execution>
     </executions>
 </plugin>
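
With the plugin in place, packaging is just the standard Maven build. Assuming the artifact coordinates used in the spark-submit command below, the fat jar lands under target/; the assembly plugin normally appends a -jar-with-dependencies suffix, so use whichever jar name your build actually produces:

mvn clean package
# e.g. target/spark-1.0-SNAPSHOT-jar-with-dependencies.jar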
           

Deploy the built jar to the Spark cluster

Start Spark

[root@Spark spark-2.4.4]# sbin/start-all.sh

Submit the Spark job

[root@Spark spark-2.4.4]# bin/spark-submit --master spark://Spark:7077 --class per.spark.test.WordCount --total-executor-cores 2 /root/spark-1.0-SNAPSHOT.jar

Flume

Configuration file

Create a file named streaming-flume.conf under Flume's conf directory and add the following configuration.

# name the source, sink, and channel
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# configure the source
a1.sources.s1.type = netcat
a1.sources.s1.bind = Spark
# receive data on port 41414
a1.sources.s1.port = 41414
a1.sources.s1.channels = c1

# configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = Spark
# push the data to port 4444
a1.sinks.k1.port = 4444
a1.sinks.k1.channel = c1

# configure the channel
a1.channels.c1.type = memory
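
The memory channel above runs with Flume's defaults. If you want to size it explicitly, the standard memory-channel properties look like this (the values are only illustrative):

# maximum number of events held in the channel
a1.channels.c1.capacity = 1000
# maximum number of events taken per transaction from a source or given to a sink
a1.channels.c1.transactionCapacity = 100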
           

Start Flume

[root@Spark flume-1.9.0]# ./bin/flume-ng agent --conf conf/ --conf-file conf/streaming-flume.conf --name a1 -Dflume.root.logger=INFO,console

Test

# connect to port 41414
[root@Spark ~]# nc Spark 41414
hello hello
ok
hadoop
ok 
           

Result

-------------------------------------------
Time: 1545459835000 ms
-------------------------------------------
(hello,2)
(hadoop,1)
           
