天天看点

Flume总结Flume总结 --huzhan

Flume总结 --huzhan

1、 你是如何实现Flume数据传输的监控的

使用第三方框架 Ganglia 实时监控 Flume。

2 、Flume的 Source,Sink,Channel的作用?你们 Source是什么类型?

1、作用

(1)Source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,

包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy

(2)Channel 组件对采集到的数据进行缓存,可以存放在 Memory 或 File 中。

(3)Sink 组件是用于把数据发送到目的地的组件,目的地包括 HDFS、Logger、avro、

thrift、ipc、file、Hbase、solr、自定义。

2、我公司采用的 Source 类型为

(1)监控后台日志:exec

(2)监控后台产生日志的端口:netcat

Exec spooldir

3、Flume 的Channel Selectors

Channel Selectors,可以让不同的项目日志通过不同的Channel到不同的Sink中去。

官方文档上Channel Selectors 有两种类型:Replicating Channel Selector (default)和Multiplexing Channel Selector

这两种Selector的区别是:Replicating 会 将source过来的events发往所有channel,而Multiplexing可以选择该发往哪些Channel。

4、 Flume 参数调优

1、Source

增加 Source 个(使用 Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source 的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数,适当调大这个参数可以提高 Source 搬运 Event 到 Channel 时的性能。

2、Channel

type 选择 memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。type 选择 file 时 Channel 的容错性更好,但是性能上会比 memory channel 差。使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。Capacity 参数决定 Channel 可容纳最大的 event 条数。transactionCapacity 参数决定每次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大 event条数。

transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。

3、Sink

增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好够用就行,过多的 Sink 会占用系统资源,造成系统资源不必要的浪费。batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数,适当调大这个参数可以提高 Sink 从 Channel 搬出 event 的性能。

5 、Flume 的事务机制

Flume 的事务机制(类似数据库的事务机制):Flume 使用两个独立的事务分别负责从Soucrce 到 Channel,以及从 Channel 到 Sink 的事件传递。比如 spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到 Channel 且提交成功,那么 Soucrce 就将该文件标记为完成。同理,事务以类似的方式处理从 Channel 到 Sink 的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到 Channel 中,等待重新传递。

6、 Flume采集数据会丢失吗?

根据 Flume 的架构原理,Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的,Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是 Channel 采用 memoryChannel,agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。

Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复。

7、介绍一下flume的channel

channel被设计为event中转临时缓冲区,存储source收集并且没有被sink读取的event,平衡source收集和sink读取的速度,可以将其视为flume内部的消息队列。channel线程安全并且具有事务性,支持source写失败写,sink读失败重复读的操作。常见的类型包括Memory Channel,File Channel,Kafka Channel

8、Memory Channel与File Channel的优缺点

Memory Channel读写速度快,但是存储数据量小。Flume进程挂掉、服务器停机或者重启都会导致数据丢失。在资源充足,不关心数据丢失的场景下可以使用。

File Channel存储容量大,无数据丢失的风险。读写速度慢,但可以通过配置多磁盘文件路径,通过磁盘并行写入提高File Channel性能。Flume将Event顺序写入到File Channel文件的末尾,可以通过配置maxFileSize参数配置数据文件大小,当文件大小达到这个值,创建新的文件,并将该文件设置为只读,直到Flume把该文件读取完成,删除该文件。

9、Kafka Channel的优点有哪些

Memory Channel有很大程度丢失数据的风险,File Channel虽然无数据丢失风险,但如果缓存下来的消息来没来得及写入Sink,Agent就出现故障,File Channel中的消息一样不能被继续使用。Kafka的容错能力解决了这一点。

Flume一旦配置了Kafka为Channel,则不再需要配置Sink组件,减少了Flume启动的进程数,降低了服务器内存、磁盘等资源的使用率。

4.Flume的拦截器是什么

Source在将Event写入到Channel之前可以使用拦截器对Event进行各种形式的处理,Source和Channel之间可以设置多个拦截器,不同的拦截器可以设置不同的规则对Event进行处理

5.Flume的选择器是什么

Source发送的Event通过Channel选择器来选择以哪种方式写入到Channel中,Flume提供了三种类型的选择器,复制选择器、复用选择器以及自定义选择器

1)复制选择器:一个Source以复制的方式将一个Event写入到多个Channel中,不同的Sink可以从不同的Channel中获取到相同的Event。

如果Source没有指定Channel选择器,则该SOurce使用复制Channel选择器,复制选择器有一个配置参数optional,该参数指定的所有channel是可选的,当时间写入到这些channel时有失败发生,则忽略这些失败,否则抛出异常,要求Source重试。

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
           

2)复用选择器:需要和拦截器配合使用,根据Event的头信息的不同写入到不同的Channel中。

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.optional.US = c4
a1.sources.r1.selector.default = c4
           

3)自定义选择器:自定义选择器需要实现ChannelSelector接口,或者继承AbstractChannelSelector类。

10、了解Flume的负载均衡和故障转移吗

设置sink组,同一个sink组内有多个sink,不同sink之间可以配置成负载均衡或故障转移

11、Flume的事务机制

flume基于事务传输event(批量传输),使用两个独立的事务分别处理source到channel和channel到sink,失败时会将所有数据回滚进行重试。该事务遵循“最少一次”语义,因此数据不会丢失,但有可能重复。

source-channel之间的重复可以靠TailDir Source自带的断点续传功能解决

put事务:

1)doPut:将批数据先写入到临时缓冲区putLIst(putList就是一个临时的缓冲区)

2)doCommit:检查channel内存队列是否足够合并

3)doRollback:channel内存队列空间不足,回滚,等待内存通道的容量满足合并

channel-sink之间的重复,可以延长等待时间,或者设置UUID拦截器,然后再redis里维护一个布隆表来使下游实时应用去重。

take事务:

1)doTake:将数据取到临时缓冲区takeList

2)将数据发送到下一个节点

3)doCommit:如果数据全部发送成功,则清除临时缓冲区takeList

4)doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列

TairDir Source配置

# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1

# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1

######## source相关配置 ########
# source类型
agent.sources.s1.type = TAILDIR
# 元数据位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 监控的目录
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true

######## channel相关配置 ########
# channel类型
agent.channels.c1.type = file
# 数据存放路径
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 检查点路径
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多缓存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐给sink多少
agent.channels.c1.transactionCapacity = 100

######## sink相关配置 ########
# sink类型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 压缩
agent.sinks.r1.kafka.producer.compression.type = snappy
           

12、Flume参数调优(二)

Source:

1)增加Source个数,可以增大Source读取数据的能力。

2)batchSize参数决定Source一次批量运输到Channel的event条数,适当调大这个参数可以提高Source搬运Event到Channel时的性能。

Channel:

1)使用File Channel时dataDirs配置多个不同盘下的目录可以提高性能

2)Capacity参数决定Channel可容纳最大的Event条数。transactionCapacty参数决定每次Source往Channel里面写的最大event条数和每次sink从channel里面读的最大event条数,transactionCapacty需要大于Source和Sink的batchSize参数

Sink:

1)适当增加Sink的个数可以增加Sink消费event的能力,但过多的sink会占用系统资源,造成不必要的浪费

2)batchSize参数决定Sink批量从Channel读取的event条数,适当调大这个参数可以提高Sink从Channel搬运Event的性能。