
Flume: Configuring Multiple Sinks (One Source Feeding Multiple Channels and Sinks)

Please credit the original source when reposting: https://blog.csdn.net/l1028386804/article/details/98055100

The configuration model: the source replicates each event to two channels, c1 and c2; c1 feeds the Kafka sink k1 and c2 feeds the HDFS sink k2. (Figure omitted.)

The Flume configuration is as follows:

myagent.sources = r1
myagent.sinks = k1 k2
myagent.channels = c1 c2
myagent.sources.r1.selector.type = replicating
# Define and configure the source
myagent.sources.r1.type = http
myagent.sources.r1.port = 5140
myagent.sources.r1.handler = org.apache.flume.source.http.JSONHandler

myagent.sources.r1.channels = c1 c2

# Configure the memory channels
myagent.sinks.k1.channel = c1
myagent.channels.c1.type = memory
myagent.channels.c1.capacity = 1000
myagent.channels.c1.transactionCapacity = 100

myagent.sinks.k2.channel = c2
myagent.channels.c2.type = memory
myagent.channels.c2.capacity = 1000
myagent.channels.c2.transactionCapacity = 100

##### Send to Kafka #####
# Configure sink k1
myagent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.k1.topic = test
myagent.sinks.k1.brokerList = binghe100:9092
myagent.sinks.k1.requiredAcks = 1
myagent.sinks.k1.batchSize = 20

##### Send to HDFS #####
# Configure sink k2
myagent.sinks.k2.type = hdfs
myagent.sinks.k2.hdfs.path = hdfs://binghe100:9000/flume/%Y%m%d
myagent.sinks.k2.hdfs.filePrefix = log_%H_%M
myagent.sinks.k2.hdfs.fileSuffix = .log
myagent.sinks.k2.hdfs.useLocalTimeStamp = true
myagent.sinks.k2.hdfs.writeFormat = Text
myagent.sinks.k2.hdfs.fileType = DataStream
#### Round event timestamps down to the hour
myagent.sinks.k2.hdfs.round = true
myagent.sinks.k2.hdfs.roundValue = 1
myagent.sinks.k2.hdfs.roundUnit = hour
#### Roll to a new file once the current file reaches 1 MB
myagent.sinks.k2.hdfs.rollInterval = 0
myagent.sinks.k2.hdfs.rollSize = 1048576
myagent.sinks.k2.hdfs.rollCount = 0

myagent.sinks.k2.hdfs.batchSize = 100
myagent.sinks.k2.hdfs.threadsPoolSize = 10
myagent.sinks.k2.hdfs.idleTimeout = 0
myagent.sinks.k2.hdfs.minBlockReplicas = 1
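The HTTP source above uses JSONHandler, which expects the POST body to be a JSON array of events, each with a `headers` map and a string `body`. The following sketch builds such a payload and shows (commented out) how it could be posted to the running agent; the `localhost` host is an assumption, while port 5140 comes from the config above:

```python
import json
import urllib.request

def build_flume_events(bodies, headers=None):
    """Build the JSON payload JSONHandler expects: a list of
    events, each with a "headers" map and a string "body"."""
    return json.dumps([{"headers": headers or {}, "body": b} for b in bodies])

payload = build_flume_events(["hello flume"], headers={"origin": "test"})

# Uncomment to POST to the agent's HTTP source (requires the agent to be running):
# req = urllib.request.Request(
#     "http://localhost:5140",
#     data=payload.encode("utf-8"),
#     headers={"Content-Type": "application/json"},
# )
# urllib.request.urlopen(req)
```

Each event in the array becomes one Flume event, replicated to both channels by the replicating selector.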

The meaning of each configuration item is explained in the original post's annotated figure (omitted here).

The configuration for tailing the Nginx access log directly is as follows:

myagent.sources = s1
myagent.sinks = k1 k2
myagent.channels = c1 c2
myagent.sources.s1.selector.type = replicating
# Define and configure the source (note: Flume property names are case-sensitive)
myagent.sources.s1.batchSize = 10
myagent.sources.s1.type = exec
myagent.sources.s1.command = tail -F /usr/local/nginx-1.17.2/logs/access.log
 
myagent.sources.s1.channels = c1 c2
 
# Configure the memory channels
myagent.sinks.k1.channel = c1
myagent.channels.c1.type = memory
myagent.channels.c1.capacity = 1000
myagent.channels.c1.transactionCapacity = 100
 
myagent.sinks.k2.channel = c2
myagent.channels.c2.type = memory
myagent.channels.c2.capacity = 1000
myagent.channels.c2.transactionCapacity = 100
 
##### Send to Kafka #####
# Configure sink k1
myagent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.k1.topic = test
myagent.sinks.k1.brokerList = binghe100:9092
myagent.sinks.k1.requiredAcks = 1
myagent.sinks.k1.batchSize = 20
 
##### Send to HDFS #####
# Configure sink k2
myagent.sinks.k2.type = hdfs
myagent.sinks.k2.hdfs.path = hdfs://binghe100:9000/flume/%Y%m%d
myagent.sinks.k2.hdfs.filePrefix = log_%H_%M
myagent.sinks.k2.hdfs.fileSuffix = .log
myagent.sinks.k2.hdfs.useLocalTimeStamp = true
myagent.sinks.k2.hdfs.writeFormat = Text
myagent.sinks.k2.hdfs.fileType = DataStream
#### Round event timestamps down to the hour
myagent.sinks.k2.hdfs.round = true
myagent.sinks.k2.hdfs.roundValue = 1
myagent.sinks.k2.hdfs.roundUnit = hour
#### Roll to a new file once the current file reaches 1 MB
myagent.sinks.k2.hdfs.rollInterval = 0
myagent.sinks.k2.hdfs.rollSize = 1048576
myagent.sinks.k2.hdfs.rollCount = 0
 
myagent.sinks.k2.hdfs.batchSize = 100
myagent.sinks.k2.hdfs.threadsPoolSize = 10
myagent.sinks.k2.hdfs.idleTimeout = 0
myagent.sinks.k2.hdfs.minBlockReplicas = 1
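With `useLocalTimeStamp = true`, the strftime-style escapes in `hdfs.path` and `hdfs.filePrefix` are expanded from each event's timestamp, and `round`/`roundValue`/`roundUnit` truncate that timestamp to the hour before expansion. The sketch below mimics that expansion for the specific escapes used above; it is an illustration, not Flume's actual implementation:

```python
from datetime import datetime

def resolve_hdfs_path(ts, path="hdfs://binghe100:9000/flume/%Y%m%d",
                      prefix="log_%H_%M", round_to_hour=True):
    """Expand the strftime escapes from the config above.
    With round=true/roundUnit=hour, the timestamp is truncated first."""
    if round_to_hour:
        ts = ts.replace(minute=0, second=0, microsecond=0)
    return ts.strftime(path), ts.strftime(prefix)

path, prefix = resolve_hdfs_path(datetime(2019, 8, 1, 14, 37))
# path   -> hdfs://binghe100:9000/flume/20190801
# prefix -> log_14_00
```

So every event written at 14:37 lands in the day's directory under a `log_14_00` file prefix, which is why rounding keeps the number of distinct files per hour small.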

Note: when Flume sinks to HDFS, if the sink keeps producing many small files and the file-rolling settings seem to have no effect, set minBlockReplicas to 1, i.e.:

myagent.sinks.k2.hdfs.minBlockReplicas = 1

Reference articles:

1. HDFS Sink configuration parameters in Flume: http://lxw1234.com/archives/2015/10/527.htm
2. Flume (NG) architecture design essentials and configuration practice: http://shiyanjun.cn/archives/915.html
3. Introduction to Flume NG and hands-on configuration: https://yq.aliyun.com/articles/50487
4. Official Flume user guide: http://flume.apache.org/FlumeUserGuide.html#hdfs-sink