天天看點

Flume故障轉移Failover Sink Processor和demo *

Failover Sink Processor

Failover Sink Processor維護了一個多個sink的有優先級的清單,按照優先級保證,至少有一個sink是可以幹活的(處理event的)

如果根據優先級發現,優先級高的sink故障了,故障的sink會被轉移到一個故障的池中冷卻.

在冷卻時,故障的sink也會不管嘗試發送event,一旦發送成功,此時會将故障的sink再移動到存活的池中.

必需配置:

sinks – Space-separated list of sinks that are participating in the group

processor.type default The component type name, needs to be failover

processor.priority. – Priority value. must be one of the sink instances associated with the current sink group A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority

案例

使用Flume1監控一個端口,其sink組中的sink分别對接Flume2和Flume3,采用FailoverSinkProcessor,實作故障轉移的功能。

Flume故障轉移Failover Sink Processor和demo *

案例是三個agent, 然後有一個channel ,兩個sink ,兩個sink都是從這一個channel裡面拿資料的,如果你有多個sink需要對接一個channel的話,那麼你就需要組成一個sink組.skin組挑哪個sink從channel裡面拿資料?就看sinkProcessor處理器了.

FailoverSinkProcessor是從兩個Sink裡面挑選出一個優先級最高的sink去幹活兒.假如說Sink1優先級最高,那麼就對接Flume2的agent, 而Flume3是不幹活兒的,因為skin2優先級低.

然後模拟一種特殊的情況,就是Flume2的agent挂掉了,那麼flume2的agent綁定的端口就停了,那麼skin1就發不出去了,那麼就相當于Sink1故障了.

這個時候FailoverSinkProcessor就将sink1拿出去,讓剩下的優先級最高的skin2去幹活兒. sink2對接的是flume3,那麼flume3 就可以幹活兒了.

開始編寫配置檔案

配置檔案名字都叫demo4.conf

都放到/root/soft/apache-flume-1.7.0/conf/job/路徑下面

zjj101

#a1是agent的名稱,a1中定義了一個叫r1的source,如果有多個,使用空格間隔
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# 配置sink組
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# 配置sink的優先級
a1.sinkgroups.g1.processor.priority.k1=100
a1.sinkgroups.g1.processor.priority.k2=90
a1.sinkgroups.g1.processor.sinks=k1 k2

#組名名.屬性名=屬性值
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /root/soft/test.log 
#聲明r1的channel選擇器
a1.sources.r1.selector.type = replicating

#定義chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

##定義sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=zjj102
a1.sinks.k1.port=33333

a1.sinks.k2.type=avro
a1.sinks.k2.hostname=zjj103
a1.sinks.k2.port=33333

#連接配接元件 同一個source可以對接多個channel,一個sink隻能從一個channel拿資料!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1      

zjj102

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#組名名.屬性名=屬性值
a1.sources.r1.type=avro
a1.sources.r1.bind=zjj102
a1.sources.r1.port=33333


#定義sink
a1.sinks.k1.type=logger

#定義chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#連接配接元件 同一個source可以對接多個channel,一個sink隻能從一個channel拿資料!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1      

zjj103

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#組名名.屬性名=屬性值
a1.sources.r1.type=avro
a1.sources.r1.bind=zjj103
a1.sources.r1.port=33333


#定義sink
a1.sinks.k1.type=logger

#定義chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#連接配接元件 同一個source可以對接多個channel,一個sink隻能從一個channel拿資料!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1      

啟動項目

啟動順序是 zjj101 必須要在 zjj102 zjj103 之後啟動,因為zjj102 和zjj103開放監聽了端口.

啟動zjj102

flume-ng agent -n a1  -c conf/  -f  "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf"  -Dflume.root.logger=DEBUG,console      

啟動zjj103

flume-ng agent -n a1  -c conf/  -f  "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf"  -Dflume.root.logger=DEBUG,console      

啟動zjj101

flume-ng agent -n a1  -c conf/  -f  "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf"  -Dflume.root.logger=DEBUG,console      

測試

再開一個zjj101終端

往test.log裡面輸出内容

[root@zjj101 soft]# echo 2020121111111111111 >> test.log
[root@zjj101 soft]# pwd
/root/soft      

此時觀察zjj102 和zjj103日志發現,隻有zjj102日志有輸出,說明接收到了資料,

20/10/26 12:09:44 INFO sink.LoggerSink: Event: { headers:{} body: 32 30 32 30 31 32 31 31 31 31 31 31 31 31 31 31 2020121111111111 }      

我現在模拟發生故障,我給zjj102的agent關閉掉… 殺死33333端口的程式.