
Spark Notes: Spark Streaming and Flume Integration in Practice

Flume is a framework for real-time log collection and can be hooked up to the Spark Streaming real-time processing framework: Flume produces data in real time, and Spark Streaming processes it in real time.

There are two ways for Spark Streaming to connect to Flume NG: in one, Flume NG pushes messages to Spark Streaming; in the other, Spark Streaming polls (pulls) data from Flume.

6.1 Poll mode

(1) Install Flume 1.6 or later.
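If you are not sure which version is on the machine, the flume-ng launcher can report it (a quick check, assuming flume-ng is on the PATH):

flume-ng version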

(2) Download the dependency jar

Put spark-streaming-flume-sink_2.11-2.0.2.jar into Flume's lib directory.
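For example, assuming Flume is installed under /opt/bigdata/flume (the path used by the flume-ng commands later in this section) and the jar has been downloaded to the current directory, the copy might look like this:

# copy the Spark sink jar into Flume's lib directory
cp spark-streaming-flume-sink_2.11-2.0.2.jar /opt/bigdata/flume/lib/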

(3) Update the Scala dependency version under flume/lib

Take scala-library-2.11.8.jar from the jars folder of the Spark installation directory and use it to replace the bundled scala-library-2.10.1.jar in Flume's lib directory.
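A minimal sketch of the swap, assuming Spark is installed under /opt/bigdata/spark and Flume under /opt/bigdata/flume (adjust both paths to your own installation):

# remove the Scala 2.10 library that ships with Flume
rm /opt/bigdata/flume/lib/scala-library-2.10.1.jar
# copy the Scala 2.11 library from the Spark installation
cp /opt/bigdata/spark/jars/scala-library-2.11.8.jar /opt/bigdata/flume/lib/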

(4) Write the Flume agent. Note that since this is the pull mode, Flume only needs to produce data on the machine where it runs.

(5) Write the flume-poll.conf configuration file

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = hdp-node-01
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000

Start the Flume agent:

flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-poll.conf -Dflume.root.logger=INFO,console

Prepare the data file data.txt in the /root/data directory on the server.
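For example (the words are just sample content; creating the file outside the spool directory and then moving it in avoids the spooldir source seeing a file that is still being written):

# write a few sample lines, then move the finished file into the spool directory
echo "hadoop spark hive hadoop spark" > /tmp/data.txt
mkdir -p /root/data
mv /tmp/data.txt /root/data/data.txt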


(6) Start the Spark Streaming application to pull data from the machine where Flume runs.

(7) Code implementation

Add the following dependency to the pom:


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.0.2</version>
</dependency>

The full code is as follows:


package cn.itcast.Flume

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

// todo: Spark Streaming integrated with Flume -- pull (poll) mode
object SparkStreamingPollFlume {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingPollFlume").setMaster("local[2]")
    // 2. Create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    // 3. Create the StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("./flume")
    // 4. Pull data from Flume via FlumeUtils.createPollingStream
    val pollingStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, "192.168.200.100", 8888)
    // 5. Get the body of each Flume event: {"headers": xxxxxx, "body": xxxxx}
    val data: DStream[String] = pollingStream.map(x => new String(x.event.getBody.array()))
    // 6. Split each line and map every word to (word, 1)
    val wordAndOne: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1))
    // 7. Accumulate the counts of identical words across batches
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    // 8. Print the output
    result.print()
    // 9. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }

  // currentValues: all the 1s for a word in the current batch, e.g. (hadoop,1) (hadoop,1) (hadoop,1)
  // historyValues: the total count of the word across all previous batches, e.g. (hadoop,100)
  def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {
    val newValue: Int = currentValues.sum + historyValues.getOrElse(0)
    Some(newValue)
  }
}

(8) Observe the output in the IDEA console.


6.2 Push mode

(1) Write the flume-push.conf configuration file

#push mode
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 172.16.43.63
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000

Note that the hostname and port specified in the configuration file are the IP address and port of the server where the Spark application runs.

Start the Flume agent:

flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-push.conf -Dflume.root.logger=INFO,console

(2) The code is as follows:


package cn.test.spark

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark Streaming integrated with Flume -- push mode
 */
object SparkStreaming_Flume_Push {
  // newValues: all the 1s for the same word among the (word, 1) pairs of the current batch
  // runningCount: the accumulated total for the same key across all previous batches
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    // Configure the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Push").setMaster("local[2]")
    // Build the SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    // Build the StreamingContext with the batch interval
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Set the log level
    sc.setLogLevel("WARN")
    // Set the checkpoint directory
    scc.checkpoint("./")
    // Flume pushes data to this address: the IP of the server where this application runs,
    // which must match the Flume configuration file
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(scc, "172.16.43.63", 8888, StorageLevel.MEMORY_AND_DISK)
    // The data sits in the event body; convert it to a String
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))
    // Word count with state accumulated across batches
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)
    result.print()
    scc.start()
    scc.awaitTermination()
  }
}

(3) Launch and run

a. Run the Spark code first.


b. Then run the Flume configuration file.

First rename /root/data/data.txt.COMPLETED back to data.txt, as shown below.
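A minimal sketch (the spooldir source marks ingested files with a .COMPLETED suffix; renaming the file back makes it look like a new file, so it gets ingested again):

# make the already-ingested file visible to the spooldir source again
mv /root/data/data.txt.COMPLETED /root/data/data.txt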


(4) Observe the output in the IDEA console.

