flume作为日志实时采集的框架,可以与SparkStreaming实时处理框架进行对接,flume实时产生数据,sparkStreaming做实时处理。
Spark Streaming对接FlumeNG有两种方式,一种是FlumeNG将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume 中Poll拉取数据。
6.1 Poll方式
(1)安装flume1.6以上
(2)下载依赖包
spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目录下
(3)修改flume/lib下的scala依赖包版本
从spark安装目录的jars文件夹下找到scala-library-2.11.8.jar 包,替换掉flume的lib目录下自带的scala-library-2.10.1.jar。
(4)写flume的agent,注意既然是拉取的方式,那么flume向自己所在的机器上产数据就行
(5)编写flume-poll.conf配置文件
a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/data a1.sources.r1.fileHeader = true #channel a1.channels.c1.type =memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity=5000 #sinks a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.k1.hostname=hdp-node-01 a1.sinks.k1.port = 8888 a1.sinks.k1.batchSize= 2000 |
flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-poll.conf -Dflume.root.logger=INFO,console |
服务器上的 /root/data目录下准备数据文件data.txt
(5)启动spark-streaming应用程序,去flume所在机器拉取数据
(6)代码实现
需要添加pom依赖
[AppleScript] 纯文本查看 复制代码
?
1 2 3 4 5 | < dependency > < groupId > org.apache.spark < / groupId > < artifactId > spark - streaming - flume_ 2.1 1 < / artifactId > < version > 2.0 . 2 < / version > < / dependency > |
具体代码如下:
[AppleScript] 纯文本查看 复制代码 ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | package cn.itcast.Flume import org.apache.spark. { SparkConf , SparkContext } import org.apache.spark.streaming. { Seconds , StreamingContext } import org.apache.spark.streaming.dstream. { DStream , ReceiverInputDStream } import org.apache.spark.streaming.flume. { FlumeUtils , SparkFlumeEvent } / / todo : sparkStreaming整合flume ----采用的是拉模式 object SparkStreamingPollFlume { def main ( args : Array[String] ) : Unit = { / / 1 、创建sparkConf val sparkConf : SparkConf = new SparkConf ( ) .setAppName ( "SparkStreamingPollFlume" ) .setMaster ( "local[2]" ) / / 2 、创建sparkContext val sc = new SparkContext ( sparkConf ) sc.setLogLevel ( "WARN" ) / / 3 、创建streamingContext val ssc = new StreamingContext ( sc , Seconds ( 5 ) ) ssc.checkpoint ( "./flume" ) / / 4 、通过FlumeUtils调用createPollingStream方法获取flume中的数据 val pollingStream : ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream ( ssc , "192.168.200.100" , 8888 ) / / 5 、获取flume中 event 的body { "headers" : xxxxxx , "body" : xxxxx } val data : DStream[String] = pollingStream.map ( x = > new String ( x. event .getBody.array ( ) ) ) / / 6 、切分每一行 , 每个单词计为 1 val wordAndOne : DStream[ ( String , Int ) ] = data .flatMap ( _.split ( " " ) ) .map ( ( _ , 1 ) ) / / 7 、相同单词出现的次数累加 val result : DStream[ ( String , Int ) ] = wordAndOne.updateStateByKey ( updateFunc ) / / 8 、打印输出 result . print ( ) / / 9 、开启流式计算 ssc.start ( ) ssc.awaitTermination ( ) } / / currentValues : 他表示在当前批次每个单词出现的所有的 1 ( hadoop , 1 ) ( hadoop , 1 ) ( hadoop , 1 ) / / historyValues : 他表示在之前所有批次中每个单词出现的总次数 ( hadoop , 100 ) def updateFunc ( currentValues : Seq[Int] , historyValues : Option[Int] ) : Option[Int] = { val newValue : Int = currentValues.sum + historyValues.getOrElse ( ) Some ( newValue ) } } | |
(7)观察IDEA控制台输出
6.2 Push方式
(1)编写flume-push.conf配置文件
#push mode a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/data a1.sources.r1.fileHeader = true #channel a1.channels.c1.type =memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity=5000 #sinks a1.sinks.k1.channel = c1 a1.sinks.k1.type = avro a1.sinks.k1.hostname=172.16.43.63 a1.sinks.k1.port = 8888 a1.sinks.k1.batchSize= 2000 |
注意配置文件中指明的hostname和port是spark应用程序所在服务器的ip地址和端口。
flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-push.conf -Dflume.root.logger=INFO,console |
(2)代码实现如下:
[AppleScript] 纯文本查看 复制代码 ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | package cn.test.spark import java.net.InetSocketAddress import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream. { DStream , ReceiverInputDStream } import org.apache.spark.streaming.flume. { FlumeUtils , SparkFlumeEvent } import org.apache.spark.streaming. { Seconds , StreamingContext } import org.apache.spark. { SparkConf , SparkContext } / * * * sparkStreaming整合flume 推模式Push * / object SparkStreaming_Flume_Push { / / newValues 表示当前批次汇总成的 ( word , 1 ) 中相同单词的所有的 1 / / runningCount 历史的所有相同 key 的 value 总和 def updateFunction ( newValues : Seq[Int] , runningCount : Option[Int] ) : Option[Int] = { val newCount = runningCount.getOrElse ( ) + newValues.sum Some ( newCount ) } def main ( args : Array[String] ) : Unit = { / / 配置sparkConf参数 val sparkConf : SparkConf = new SparkConf ( ) .setAppName ( "SparkStreaming_Flume_Push" ) .setMaster ( "local[2]" ) / / 构建sparkContext对象 val sc : SparkContext = new SparkContext ( sparkConf ) / / 构建StreamingContext对象,每个批处理的时间间隔 val scc : StreamingContext = new StreamingContext ( sc , Seconds ( 5 ) ) / / 设置日志输出级别 sc.setLogLevel ( "WARN" ) / / 设置检查点目录 scc.checkpoint ( "./" ) / / flume推数据过来 / / 当前应用程序部署的服务器ip地址,跟flume配置文件保持一致 val flumeStream : ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream ( scc , "172.16.43.63" , 8888 , StorageLevel.MEMORY_AND_DISK ) / / 获取flume中数据,数据存在 event 的body中,转化为String val lineStream : DStream[String] = flumeStream.map ( x = > new String ( x. event .getBody.array ( ) ) ) / / 实现单词汇总 val result : DStream[ ( String , Int ) ] = lineStream.flatMap ( _.split ( " " ) ) .map ( ( _ , 1 ) ) .updateStateByKey ( updateFunction ) result . print ( ) scc.start ( ) scc.awaitTermination ( ) } } } | |
(3) 启动执行
a. 先执行spark代码,
b. 然后在执行flume配置文件。
先把/root/data/ata.txt.COMPLETED 重命名为data.txt
(4) 观察IDEA控制台输出