Integrating Flume and Kafka for Real-Time Data Collection
- Architecture
- Configuration files
- Startup
- Starting agent2
- Starting agent1
- Starting a consumer to verify the data
Architecture
Flume version: 1.7
agent1: exec source + memory channel + avro sink
agent2: avro source + memory channel + kafka sink
exec source: tails a file in real time, watching for newly appended content
avro source: listens on an avro port and receives events from external avro clients
avro sink: typically used for cross-node transfer; it binds to the IP and port of the destination the data is moved to
Configuration files
agent1
cd $FLUME_HOME/conf
vim exec-memory-avro.conf
# Name the components on this agent
a1.sources = exec-source
a1.sinks = avro-sink
a1.channels = memory-channel
# Describe/configure the source
a1.sources.exec-source.type = exec
a1.sources.exec-source.command = tail -F /opt/software/data/data.log
a1.sources.exec-source.shell = /bin/sh -c
# Describe the sink
a1.sinks.avro-sink.type = avro
a1.sinks.avro-sink.hostname = shouhou155
a1.sinks.avro-sink.port = 44444
# Use a channel which buffers events in memory
a1.channels.memory-channel.type = memory
a1.channels.memory-channel.capacity = 1000
a1.channels.memory-channel.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.exec-source.channels = memory-channel
a1.sinks.avro-sink.channel = memory-channel
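The exec source above runs `tail -F` on /opt/software/data/data.log, so it helps to make sure the file exists before agent1 starts. A minimal sketch; a /tmp path is used here only so it runs without root, substitute the path from the config:

```shell
# Create the file the exec source will tail. The config above uses
# /opt/software/data/data.log; /tmp is used here so the sketch runs
# without root privileges.
DATA_FILE=/tmp/flume-demo/data.log
mkdir -p "$(dirname "$DATA_FILE")"
: > "$DATA_FILE"                     # create (or truncate) the file
echo "hello flume" >> "$DATA_FILE"   # a line `tail -F` would pick up
tail -n 1 "$DATA_FILE"
```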
agent2
cd $FLUME_HOME/conf
vim avro-memory-kafka.conf
# Name the components on this agent
b1.sources = avro-source
b1.sinks = kafka-sink
b1.channels = memory-channel
# Describe/configure the source
b1.sources.avro-source.type = avro
b1.sources.avro-source.bind = shouhou155
b1.sources.avro-source.port = 44444
# Describe the sink
b1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
b1.sinks.kafka-sink.kafka.bootstrap.servers = 192.168.3.153:9092
b1.sinks.kafka-sink.kafka.topic = mytest2
b1.sinks.kafka-sink.serializer.class = kafka.serializer.StringEncoder
b1.sinks.kafka-sink.kafka.producer.acks = 1
b1.sinks.kafka-sink.custom.encoding = UTF-8
# Use a channel which buffers events in memory
b1.channels.memory-channel.type = memory
b1.channels.memory-channel.capacity = 1000
b1.channels.memory-channel.transactionCapacity = 100
# Bind the source and sink to the channel
b1.sources.avro-source.channels = memory-channel
b1.sinks.kafka-sink.channel = memory-channel
agent1's avro sink and agent2's avro source are bound to the same hostname and port, so agent2's source receives what agent1's sink sends; agent2's kafka sink then writes the events to Kafka.
The full set of Kafka sink options is described in the official Flume documentation: link
Startup
Note: start agent2 first, so that its avro source is listening before agent1 comes up; otherwise agent1's avro sink will get its connection refused on startup.
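One way to enforce this order is to probe agent2's avro port before launching agent1. A sketch using bash's /dev/tcp pseudo-device (host and port taken from the configs above; nothing here is Flume-specific):

```shell
# Return 0 if host:port accepts a TCP connection, non-zero otherwise.
# Relies on bash's /dev/tcp pseudo-device.
port_open() {
  bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

if port_open shouhou155 44444; then
  echo "agent2 avro source is up - safe to start agent1"
else
  echo "agent2 not reachable yet"
fi
```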
Starting agent2
# start agent2 on shouhou155
flume-ng agent \
--name b1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
The log line "Avro source avro-source started." indicates a successful start.
Starting agent1
# start agent1 on shouhou153
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
Startup parameter notes
1. flume-ng agent - runs a Flume agent.
2. --conf - the configuration directory; --conf-file - the agent configuration file, which must sit under the directory given by --conf.
3. -z - ZooKeeper connection string: a comma-separated list of hostname:port pairs.
4. -p - base path in ZooKeeper used to store agent configurations.
5. --name - the agent name: a1 for agent1, b1 for agent2.
6. -Dflume.root.logger=INFO,console - sends Flume's log output to the console. To write it to a log file instead (by default under $FLUME_HOME/logs), change console to LOGFILE; the details can be adjusted in $FLUME_HOME/conf/log4j.properties.
-Dflume.log.file=./wchatAgent.logs writes the log directly to the given file.
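For reference, the corresponding defaults in $FLUME_HOME/conf/log4j.properties look roughly like this (a sketch of the stock Flume 1.x settings; verify against your own copy):

```
flume.root.logger=INFO,LOGFILE
flume.log.dir=./logs
flume.log.file=flume.log
```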
Starting a consumer to verify the data
Start a console consumer:
kafka-console-consumer.sh --bootstrap-server shouhou153:9092,shouhou155:9092,shouhou156:9092 --topic mytest2
hello spark
hello spark1
hello spark2
hello spark3
hello spark4
hello spark5
Append new lines to data.log and check that the consumer receives them.
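For example, test lines like the consumer output above can be generated with a simple loop (the real path from agent1's config is /opt/software/data/data.log; a /tmp path is used here so the sketch runs anywhere):

```shell
# Append numbered test lines to the monitored file; with both agents
# running, each appended line should show up in the console consumer.
DATA_FILE=/tmp/flume-demo/data.log
mkdir -p "$(dirname "$DATA_FILE")"
echo "hello spark" >> "$DATA_FILE"
for i in 1 2 3 4 5; do
  echo "hello spark$i" >> "$DATA_FILE"
done
tail -n 6 "$DATA_FILE"
```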
On the Kafka sink you can also set the flumeBatchSize parameter: how many messages to process in one batch. Larger batches improve throughput at the cost of added latency.
To set it, add a line to agent2's configuration file: b1.sinks.kafka-sink.flumeBatchSize = 5