
Ingesting Scribe data into Flume: problems and solutions

A program in this project packages data into the Scribe format and sends it to Scribe. Since that sender has to stay in use, Flume effectively needs to accept Scribe data.

1. Configuration

The latest flume-ng already supports receiving Scribe data; just set the source type to Scribe Source in the Flume configuration file:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1           
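
For reference, data reaching this source is ordinary Scribe Thrift traffic over a framed transport. Below is a minimal sketch of a sender, assuming the Python Thrift library and the scribe bindings generated from scribe.thrift are installed; the host, port and the category name flume_test are placeholders for this setup:

# scribe_send.py -- minimal sketch of a Scribe sender (assumes the thrift
# and scribe Python bindings generated from scribe.thrift are available)
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from scribe import scribe

socket = TSocket.TSocket(host='10.0.3.83', port=1463)   # ScribeSource host/port (placeholder IP)
transport = TTransport.TFramedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(trans=transport,
                                           strictRead=False, strictWrite=False)
client = scribe.Client(iprot=protocol, oprot=protocol)

transport.open()
# The category ends up in the Flume event header and is what %{category} resolves to.
entry = scribe.LogEntry(category='flume_test', message='hello flume\n')
result = client.Log(messages=[entry])                   # scribe.ResultCode.OK on success
transport.close()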

The category value set on the Scribe side is visible when the sink writes to HDFS, so it can be interpolated into the HDFS path or file name through the configuration:

# Describe the sink k1 to hadoop
collector1.sinks.k1.type = hdfs
collector1.sinks.k1.channel = c1
collector1.sinks.k1.hdfs.path = /quantone/flume/%{category}/10.0.3.83
collector1.sinks.k1.hdfs.fileType = DataStream
collector1.sinks.k1.hdfs.writeFormat = TEXT
collector1.sinks.k1.hdfs.rollInterval = 300
collector1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%{category}
collector1.sinks.k1.hdfs.round = true
collector1.sinks.k1.hdfs.roundValue = 5
collector1.sinks.k1.hdfs.roundUnit = minute
collector1.sinks.k1.hdfs.useLocalTimeStamp = true           
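
For example, an event sent with the (hypothetical) category tohadoop would land under /quantone/flume/tohadoop/10.0.3.83, in files whose names start with the current date followed by -tohadoop, and the sink rolls a new file every 300 seconds.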

However, the Kafka Sink is currently unable to resolve the %{category} value:

# Describe the sink k2 to kafka
collector1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k2.topic = %{category}
collector1.sinks.k2.channel = c2
collector1.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector1.sinks.k2.requiredAcks = 1
collector1.sinks.k2.batchSize = 20

Trying to set the Kafka topic through %{category} here does not work: the topic ends up as the literal string %{category} and the data cannot be written to Kafka. There are two ways to specify the Kafka topic: either configure the real topic name here, or write the topic into the event header. A Static Interceptor can be used to put it into the header, as follows:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = topic
a1.sources.r1.interceptors.i1.value = flume_test           
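
With that header in place, the Kafka Sink sends each event to the topic named in its topic header, which overrides whatever topic the sink is configured with. A minimal sketch of the revised sink is shown below; default-flume-topic is just a placeholder fallback for events that carry no topic header:

# Describe the sink k2 to kafka (topic taken from the event header when present)
collector1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k2.channel = c2
collector1.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector1.sinks.k2.topic = default-flume-topic
collector1.sinks.k2.requiredAcks = 1
collector1.sinks.k2.batchSize = 20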

2. Splitting the data flow

To route different types of data selectively to Hadoop or Kafka, we again rely on interceptors: first write a key-value pair into the event header, then in the next tier's source use a selector to place different data into different channels, thereby separating the flows. The configuration is as follows.

In the agent tier, configure an interceptor that stores the Scribe category in the header as a static value under the key flowtype:

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i2
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = flowtype
a1.sources.r1.interceptors.i2.value = %{category}           
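
The agent then forwards these events to the collector over Avro. A minimal sketch of the matching sink, assuming the collector host is 10.0.3.83 (placeholder IP) and listens on the port used by the collector source below:

# Forward agent events to the collector tier
a1.sinks = s1
a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = 10.0.3.83
a1.sinks.s1.port = 5150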

In the collector tier's source configuration, add a channel selector:

# Describe the source
collector.sources.r1.type = avro
collector.sources.r1.port = 5150
collector.sources.r1.bind = 0.0.0.0
collector.sources.r1.channels = c1 c2
collector.sources.r1.selector.type = multiplexing
collector.sources.r1.selector.header = flowtype
collector.sources.r1.selector.mapping.tohadoop = c1
collector.sources.r1.selector.mapping.tokafka = c2
collector.sources.r1.selector.default = c1           

With this in place, data whose category is "tohadoop" is put into c1, while data whose category is "tokafka" goes into c2. Then define two sinks, k1 and k2: k1 drains c1 into Hadoop and k2 drains c2 into Kafka.
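
A minimal sketch of those sink bindings, abbreviated to the channel assignments and reusing the HDFS and Kafka options from section 1; flume_test is only a placeholder topic name:

# Sinks draining the two channels (full hdfs/kafka options as in section 1)
collector.sinks = k1 k2
collector.sinks.k1.type = hdfs
collector.sinks.k1.channel = c1
collector.sinks.k1.hdfs.path = /quantone/flume/%{category}/10.0.3.83
collector.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector.sinks.k2.channel = c2
collector.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector.sinks.k2.topic = flume_test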