天天看点

Flume学习02 — Source

Flume内置了大量的Sourece,其中Avro Source、Thrift Source、Spooling Directory Source、Kafka Source具有较好的性能和较广泛的使用场景,下面主要介绍这几种Source。

类型 说明
Avro Source 支持Avro协议(实际上是Avro RPC),内置支持。
Thrift Source 支持Thrift协议,内置支持。
Exec Source 基于Unix的command在标准输出上生产数据。
JMS Source 从JMS系统(消息、主题)中读取数据,该Source目前只在ActiveMQ中测试。
Spooling Directory Source 监控指定目录内数据变更。
Twitter 1% firehose Source 通过API持续下载Twitter数据,试验性质。
Netcat Source 监控某个端口,将流经端口的每一个文本行数据作为Event输入。
Sequence Generator Source 序列生成器数据源,生产序列数据。
Syslog Sources 读取syslog数据,产生Event,支持UDP和TCP两种协议。
HTTP Source 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式。
Kafka Source 从Kafka Topic中读取数据。
Stress Source 压力测试用。
Legacy Sources 兼容老的Flume OG中Source(0.9.x版本)
Custom Source 自定义Source
Scribe Source

Avro Source

Avro Source监听Avro端口,接收外部Avro客户端发送过来的Avro Event数据。在多级流中,Avro Source可以和前一个Flume Agent的Avro Sink配对,建立分层收集拓扑。Avro Source的配置如下表所示,表中只列出必填项,更多配置项请参考官方用户手册。

配置项 默认值 说明
channels 与Source连接的通道名称。
type Source的类型名称,值为avro。
port Source监听的端口
bind 服务器主机名或者IP地址

Avro Source配置参考如下,a1为Agent的实例名称。

a1.sources = r1                #a1中的Source
a1.channels = c1               #a1中的Channel
a1.sources.r1.type = avro      #Source的类型
a1.sources.r1.channels = c1    #指定Source r1数据发送到的Channel
a1.sources.r1.bind =    #Source绑定的地址
a1.sources.r1.port =       #Source监听的端口
           

Thrift Source

Thrift Source监听Thrift端口,接收外部Thrift客户端发送过来的Thrift Event数据。在多级流中,Thrift Source可以和前一个Flume Agent的Thrift Sink配对,建立分层收集拓扑。Thrift Source支持基于Kerberos身份验证的安全模式。Thrift Source的配置如下表所示,表中只列出必填项,更多配置项请参考官方用户手册。Thrift Source支持基于Kerberos身份验证的安全模式。

配置项 默认值 说明
channels 与Source连接的通道名称。
type Source的类型名称,type值为thrift。
port Source监听的端口
bind 服务器主机名或者IP地址

Thrift Source配置参考如下,a1为Agent的实例名称。

a1.sources = r1                #a1中的Source
a1.channels = c1               #a1中的Channel
a1.sources.r1.type = thrift    #Source的类型
a1.sources.r1.channels = c1    #指定Source r1数据发送到的Channel
a1.sources.r1.bind =    #Source绑定的地址
a1.sources.r1.port =       #Source监听的端口
           

Exec Source

Exec Source在启动时调用的Unix命令,该命令进程会持续地把标准日志数据输出到Exec Source,如果命令进程关闭,Exec Source也会关闭。Exec Source支持cat [named pipe]或者tail -F [file]命令。Exec Source最大的问题就是数据有可能丢失,因为当Channel接收Exec Source数据出错时或者抛出异常时,Exec Client并不能捕获到该错误。建议使用Spooling Directory Source代替。

配置项 默认值 说明
channels 与Source连接的通道名称。
type Source的类型名称,type值为exec。
command 运行的命令

Thrift Source配置参考如下,a1为Agent实例名称。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
           

JMS Source

JMS Source从队列或者Topic中读取数据,目前只在ActiveMQ中测试。在使用JMS Source时,必须在Flume ClassPath中添加JMS JAR包。

配置项 默认值 说明
channels 与Source连接的通道名称。
type Source的类型名称,type值为jms。
initialContextFactory org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory JNDI名称
providerURL JMS Provider URL
destinationName
destinationType 取值queue或者topic

JMS Source配置参考如下,a1为Agent实例名称。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = 
org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
           

Spooling Directory Source

Spooling Directory Source监听系统上的指定目录,当目录中有新的文件创建时,Spooling Directory Source会把新文件的内容读取并推送到Channel中,并且把已读取的文件重命名成指定格式或者把文件删除。由于数据是以文件的形式存放的系统中,Spooling Directory Source的可靠性非常高,即使是Flume Agent崩溃或者重启,数据也可以恢复。

配置项 默认值 说明
channels 与Source连接的通道名称。
type Source的类型名称,type值为spooldir。
spoolDir Source监听的目录

Spooling Directory Source配置参考如下,a1为Agent实例名称。

a1.sources = s1                 #a1中的Source
a1.channels = c1                #a1中的Channel
a1.sources.s1.type = spooldir   #Source的类型
a1.sources. s1.channels = c1    #指定Source r1数据发送到的Channel
a1.sources. s1.spoolDir = /var/log/apache/flumeSpool
           

Kafka Source

Kafka Source从消息队列Kafka Topic中读取日志消息,Kafka Source相当于消息队列的Consumer。可以把多个Kafka Source配置到同一个分组里面,这样每个Source都可以读取同一个Topic中的数据,从而提高性能。

配置项 默认值 说明
channels 与Source连接的通道名称。
type org.apache.flume.source.kafka,KafkaSource
zooKeeperConnect Kafka集群的ZooKeeper路径
groupId flume Kafka Source分组
topic Kafka Topic

Kafka Source配置参考如下,a1为Agent实例名称。

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.channels = channel1
a1.sources.source1.zookeeperConnect = localhost:
a1.sources.source1.topic = test1
a1.sources.source1.groupId = flume
a1.sources.source1.kafka.consumer.timeout.ms = 
           

NetCat Source

NetCat Source监听指定的端口,把接收到的数据按行划分,每行文本都封装成一个Event数据发送给Channel。

配置项 默认值 说明
channels 与Source连接的通道名称。
type netcat
bind 地址
port 端口

Netcat Source配置参考如下, a1为Agent实例名称。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 
a1.sources.r1.bind = 
a1.sources.r1.channels = c1
           

HTTP Source

HTTP Source接收POST和GET发送的Event数据,其中GET主要用户测试,不建议生产环境使用。HTTP数据通过handler(实现HTTPSourceHandler接口)转换成Event,该handler接收HttpServletRequest并返回Event数组。如果handler出现异常,HTTP Source返回400错误。如果Channel满了或者Channel无法接收Event,HTTP Source返回503错误。

配置项 默认值 说明
channels 与Source连接的通道名称。
type http
port HTTP Source监听的端口
handler org.apache.flume.source.http.JSONHandler 可选值有org.apache.flume.source.http.JSONHandler和org.apache.flume.sink.solr.morphline.BlobHandler

HTTP Source配置参考如下, a1为Agent的实例名称。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props