
Learning Kafka, Part 4: Integrating Flume and Kafka for Real-Time Data Collection

Integrating Flume and Kafka for Real-Time Data Collection

  • Architecture
  • Configuration files
  • Startup
    • Starting agent2
    • Starting agent1
    • Starting a consumer to consume the data

Architecture

The Flume version used here is 1.7.

agent1: exec source + memory channel + avro sink

agent2: avro source + memory channel + kafka sink

exec source: runs a shell command (here, tail -F) to watch a file for newly appended content in real time

avro source: listens on an Avro port and receives events sent from an external Avro client

avro sink: generally used for cross-node transfer; it is bound to the IP/hostname and port of the node the data is forwarded to
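
Putting the pieces together, the data flow is:

data.log --(tail -F)--> agent1 [exec source -> memory channel -> avro sink]
    --(avro, shouhou155:44444)--> agent2 [avro source -> memory channel -> kafka sink]
    --> Kafka topic mytest2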

Configuration files

agent1

cd $FLUME_HOME/conf
vim exec-memory-avro.conf
           
# Name the components on this agent
a1.sources = exec-source
a1.sinks = avro-sink
a1.channels = memory-channel

# Describe/configure the source
a1.sources.exec-source.type = exec
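# tail -F follows the file by name, so it keeps working across rotation or recreation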
a1.sources.exec-source.command = tail -F /opt/software/data/data.log
a1.sources.exec-source.shell = /bin/sh -c

#Describe the sink
a1.sinks.avro-sink.type = avro
a1.sinks.avro-sink.hostname = shouhou155
a1.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
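# capacity: max number of events buffered in the channel
# transactionCapacity: max number of events per channel transaction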
a1.channels.memory-channel.type = memory
a1.channels.memory-channel.capacity = 1000
a1.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.exec-source.channels = memory-channel
a1.sinks.avro-sink.channel = memory-channel
           

agent2

cd $FLUME_HOME/conf
vim avro-memory-kafka.conf
           
b1.sources = avro-source
b1.sinks = kafka-sink
b1.channels = memory-channel

b1.sources.avro-source.type = avro
b1.sources.avro-source.bind = shouhou155
b1.sources.avro-source.port = 44444

# Describe the sink
b1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
b1.sinks.kafka-sink.kafka.bootstrap.servers=192.168.3.153:9092
b1.sinks.kafka-sink.kafka.topic=mytest2
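# Note (assumption): serializer.class and custom.encoding below look like
# legacy Kafka 0.8-era settings copied from older examples; Flume 1.7's Kafka
# sink passes producer options through via the kafka.producer.* prefix, so
# these two lines are most likely ignored.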
b1.sinks.kafka-sink.serializer.class=kafka.serializer.StringEncoder
b1.sinks.kafka-sink.kafka.producer.acks=1
b1.sinks.kafka-sink.custom.encoding=UTF-8

b1.channels.memory-channel.type = memory
b1.channels.memory-channel.capacity = 1000
b1.channels.memory-channel.transactionCapacity = 100

b1.sources.avro-source.channels = memory-channel
b1.sinks.kafka-sink.channel = memory-channel
           
agent2's avro source receives what agent1's avro sink sends; agent2 then writes to Kafka through its kafka sink.

The full set of Kafka sink options is documented in the Kafka Sink section of the official Flume user guide.
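
Before starting the agents, the target topic should exist unless the brokers auto-create topics. A minimal sketch of creating it, assuming a Kafka version of the Flume 1.7 era and that ZooKeeper is reachable at shouhou153:2181 (the ZooKeeper address, partition count, and replication factor are assumptions, not from this setup):

# Create the mytest2 topic (adjust ZooKeeper address and sizing to your cluster)
kafka-topics.sh --create --zookeeper shouhou153:2181 \
  --topic mytest2 --partitions 1 --replication-factor 1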

Startup

Note: start agent2 first so that its avro source is listening before agent1 comes up; otherwise agent1's avro sink will have its connection refused at startup.

Starting agent2

# Start agent2 on shouhou155
flume-ng agent \
--name b1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
           
A log line reading "Avro source avro-source started." indicates a successful start.

Starting agent1

# Start agent1 on shouhou153
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
           

Notes on the startup parameters

1. flume-ng agent runs a Flume agent.

2. --conf specifies the configuration directory (where flume-env.sh and log4j.properties are read from); --conf-file specifies the agent configuration file itself, conventionally placed under that directory.

3. -z: the ZooKeeper connection string, a comma-separated list of hostname:port pairs (see the example after this list).

4. -p: the base path in ZooKeeper under which agent configurations are stored.

5. --name gives the agent's name: agent1 is named a1 and agent2 is named b1, matching the property prefixes in the two configuration files.

6. -Dflume.root.logger=INFO,console

This sends Flume's log output to the console. To write it to a log file instead (by default under $FLUME_HOME/logs), change console to LOGFILE; finer-grained logging is configured in $FLUME_HOME/conf/log4j.properties.

-Dflume.log.file=./wchatAgent.logs writes the log directly to the named file.
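
For reference, -z and -p apply when Flume loads the agent configuration from ZooKeeper instead of a local file. A hypothetical invocation (the ZooKeeper address and base path are placeholders, not part of this setup):

# Load agent a1's configuration from ZooKeeper under /flume
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
-z zkhost1:2181,zkhost2:2181 \
-p /flume \
-Dflume.root.logger=INFO,console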

Starting a consumer to consume the data

Start a console consumer:

kafka-console-consumer.sh --bootstrap-server shouhou153:9092,shouhou155:9092,shouhou156:9092 --topic mytest2
hello spark
hello spark1
hello spark2
hello spark3
hello spark4
hello spark5
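
If the consumer is started after the data has already been produced, earlier messages will not show up by default; the standard --from-beginning flag replays the topic from the start:

# Re-read the topic from its earliest available offset
kafka-console-consumer.sh --bootstrap-server shouhou153:9092,shouhou155:9092,shouhou156:9092 --topic mytest2 --from-beginning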
           

Append new lines to data.log and verify that the consumer receives them.
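
For example, on shouhou153 (where agent1 tails the file):

# Append test lines to the file monitored by agent1's exec source
echo "hello spark6" >> /opt/software/data/data.log
echo "hello spark7" >> /opt/software/data/data.log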


On the sink side you can also tune the flumeBatchSize parameter, which controls how many messages are processed in one batch: larger batches improve throughput at the cost of higher latency.

To set it, add the following line to agent2's configuration file:

b1.sinks.kafka-sink.flumeBatchSize = 5