Failover Sink Processor
Failover Sink Processor維護了一個多個sink的有優先級的清單,按照優先級保證,至少有一個sink是可以幹活的(處理event的)
如果根據優先級發現,優先級高的sink故障了,故障的sink會被轉移到一個故障的池中冷卻.
在冷卻時,故障的sink也會不管嘗試發送event,一旦發送成功,此時會将故障的sink再移動到存活的池中.
必需配置:
sinks – Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be failover
processor.priority. – Priority value. must be one of the sink instances associated with the current sink group A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority
案例
使用Flume1監控一個端口,其sink組中的sink分别對接Flume2和Flume3,采用FailoverSinkProcessor,實作故障轉移的功能。

案例是三個agent, 然後有一個channel ,兩個sink ,兩個sink都是從這一個channel裡面拿資料的,如果你有多個sink需要對接一個channel的話,那麼你就需要組成一個sink組.skin組挑哪個sink從channel裡面拿資料?就看sinkProcessor處理器了.
FailoverSinkProcessor是從兩個Sink裡面挑選出一個優先級最高的sink去幹活兒.假如說Sink1優先級最高,那麼就對接Flume2的agent, 而Flume3是不幹活兒的,因為skin2優先級低.
然後模拟一種特殊的情況,就是Flume2的agent挂掉了,那麼flume2的agent綁定的端口就停了,那麼skin1就發不出去了,那麼就相當于Sink1故障了.
這個時候FailoverSinkProcessor就将sink1拿出去,讓剩下的優先級最高的skin2去幹活兒. sink2對接的是flume3,那麼flume3 就可以幹活兒了.
開始編寫配置檔案
配置檔案名字都叫demo4.conf
都放到/root/soft/apache-flume-1.7.0/conf/job/路徑下面
zjj101
#a1是agent的名稱,a1中定義了一個叫r1的source,如果有多個,使用空格間隔
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# 配置sink組
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# 配置sink的優先級
a1.sinkgroups.g1.processor.priority.k1=100
a1.sinkgroups.g1.processor.priority.k2=90
a1.sinkgroups.g1.processor.sinks=k1 k2
#組名名.屬性名=屬性值
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /root/soft/test.log
#聲明r1的channel選擇器
a1.sources.r1.selector.type = replicating
#定義chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
##定義sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=zjj102
a1.sinks.k1.port=33333
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=zjj103
a1.sinks.k2.port=33333
#連接配接元件 同一個source可以對接多個channel,一個sink隻能從一個channel拿資料!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1
zjj102
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#組名名.屬性名=屬性值
a1.sources.r1.type=avro
a1.sources.r1.bind=zjj102
a1.sources.r1.port=33333
#定義sink
a1.sinks.k1.type=logger
#定義chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
#連接配接元件 同一個source可以對接多個channel,一個sink隻能從一個channel拿資料!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
zjj103
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#組名名.屬性名=屬性值
a1.sources.r1.type=avro
a1.sources.r1.bind=zjj103
a1.sources.r1.port=33333
#定義sink
a1.sinks.k1.type=logger
#定義chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
#連接配接元件 同一個source可以對接多個channel,一個sink隻能從一個channel拿資料!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
啟動項目
啟動順序是 zjj101 必須要在 zjj102 zjj103 之後啟動,因為zjj102 和zjj103開放監聽了端口.
啟動zjj102
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
啟動zjj103
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
啟動zjj101
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
測試
再開一個zjj101終端
往test.log裡面輸出内容
[root@zjj101 soft]# echo 2020121111111111111 >> test.log
[root@zjj101 soft]# pwd
/root/soft
此時觀察zjj102 和zjj103日志發現,隻有zjj102日志有輸出,說明接收到了資料,
20/10/26 12:09:44 INFO sink.LoggerSink: Event: { headers:{} body: 32 30 32 30 31 32 31 31 31 31 31 31 31 31 31 31 2020121111111111 }
我現在模拟發生故障,我給zjj102的agent關閉掉… 殺死33333端口的程式.