分析了flume如何加載配置檔案的,動态加載也隻是重複運作getconfiguration()。
本篇分析加載配置檔案後各個元件是如何運作的?
加載完配置檔案訂閱者application類會收到訂閱資訊執行:
materializedconfiguration
conf就是getconfiguration()方法擷取的配置資訊,是simplematerializedconfiguration的一個執行個體。
handleconfigurationevent方法在中有過大緻分析,包括:stopallcomponents()和startallcomponents(conf)。application中的materializedconfiguration就是materializedconfiguration
conf,stopallcomponents()方法中的materializedconfiguration是舊的配置資訊,需要先停掉舊的元件,然後startallcomponents(conf)将新的配置資訊賦給materializedconfiguration并依次啟動各個元件。
1、先看startallcomponents(conf)方法。代碼如下:
三大元件都是通過supervisor.supervise(entry.getvalue(),new
supervisorpolicy.alwaysrestartpolicy(),
lifecyclestate.start)啟動的,其中,channel啟動之後還要待所有的channel完全啟動完畢之後才可再去啟動sink和source。如果channel沒有啟動完畢就去啟動另外倆元件,會出現錯誤,以為一旦sink或者source建立完畢就會立即與channel通信擷取資料。稍後會分别分析sink和source的啟動。
supervisor是lifecyclesupervisor的一個對象,該類的構造方法會構造一個有10個線程,上限是20的線程池供各大元件使用。構造方法如下:
supervise(lifecycleaware lifecycleaware,supervisorpolicy policy,
lifecyclestate desiredstate)方法則是具體執行啟動各個元件的方法。flume的所有元件均實作自

該方法首先monitorservice是否是正常運作狀态;然後構造supervisoree process = new
supervisoree(),進行指派并構造一個監控程序monitorrunnable,放入線程池去執行。
monitorrunnable.run()方法:
上面的 lifecycleaware.stop()和lifecycleaware.start()就是執行的sink、source、channel等的對應方法。
這裡的start需要注意如果是channel則是直接執行start方法;如果是sink或者pollablesource的實作類,則會在start()方法中啟動一個線程來循環的調用process()方法來從channel拿資料(sink)或者向channel送資料(source);如果是eventdrivensource的實作類,則沒有process()方法,通過執行start()來執行想channel中送資料的操作(可以在此添加線程來實作相應的邏輯)。
2、stopallcomponents()方法。顧名思義,就是停止所有元件的方法。該方法代碼如下:
首先,需要注意的是,stopallcomponents()放在startallcomponents(materializedconfiguration
materializedconfiguration)方法之前的原因,由于配置檔案的動态加載這一特性的存在,使得每次加載之前都要先把舊的元件停掉,然後才能去加載最新配置檔案中的配置;
其次,首次執行stopallcomponents()時,由于配置檔案尚未指派,是以并不會執行停止所有元件的操作以及停止monitorserver。再次加載時會依照順序依次停止對source、sink以及channel的監控,通過supervisor.unsupervise(entry.getvalue())停止對其的監控,然後停止monitorserver。supervisor.unsupervise方法如下:
該方法首先會檢查正在運作的元件當中是否有此元件supervisedprocesses.containskey(lifecycleaware);如果存在,則對此元件标記為已取消監控supervisoree.status.discard
=
true;将狀态設定為stop,并停止元件lifecycleaware.stop();然後從删除此元件的監控記錄,包括從記錄正在處于監控的元件的結構supervisedprocesses以及記錄元件及其對應的運作線程的結構monitorfutures中删除相應的元件資訊,并且needtopurge
= true會使得兩小時執行一次的線程池清理操作。
有一個問題就是,sink和source是如何找到對應的channel的呢??其實前面章節就已經講解過,分别在abstractconfigurationprovider.loadsources方法中通過channelselector配置source對應的channel,而在source中通過getchannelprocessor()擷取channels,通過channelprocessor.processeventbatch(eventlist)将events發送到channel中;而在abstractconfigurationprovider.loadsinks方法中sink.setchannel(channelcomponent.channel)來設定此sink對應的channel,然後在sink的實作類中通過getchannel()擷取設定的channel,并使用channel.take()從channel中擷取event進行處理。
以上三節是flume-ng的啟動、配置檔案的加載、配置檔案的動态加載、元件的執行的整個流程。文中的疏漏之處,請各位指教,我依然會後續繼續完善這些内容的。
後續還有更精彩的章節。。。。