天天看點

Flume-NG啟動過程源碼分析(三)(原創)

  分析了flume如何加載配置檔案的,動态加載也隻是重複運作getconfiguration()。

  本篇分析加載配置檔案後各個元件是如何運作的?

  加載完配置檔案訂閱者application類會收到訂閱資訊執行:

  materializedconfiguration

conf就是getconfiguration()方法擷取的配置資訊,是simplematerializedconfiguration的一個執行個體。

  handleconfigurationevent方法在中有過大緻分析,包括:stopallcomponents()和startallcomponents(conf)。application中的materializedconfiguration就是materializedconfiguration

conf,stopallcomponents()方法中的materializedconfiguration是舊的配置資訊,需要先停掉舊的元件,然後startallcomponents(conf)将新的配置資訊賦給materializedconfiguration并依次啟動各個元件。

  1、先看startallcomponents(conf)方法。代碼如下:

  三大元件都是通過supervisor.supervise(entry.getvalue(),new

supervisorpolicy.alwaysrestartpolicy(),

lifecyclestate.start)啟動的,其中,channel啟動之後還要待所有的channel完全啟動完畢之後才可再去啟動sink和source。如果channel沒有啟動完畢就去啟動另外倆元件,會出現錯誤,以為一旦sink或者source建立完畢就會立即與channel通信擷取資料。稍後會分别分析sink和source的啟動。

  supervisor是lifecyclesupervisor的一個對象,該類的構造方法會構造一個有10個線程,上限是20的線程池供各大元件使用。構造方法如下:

  supervise(lifecycleaware lifecycleaware,supervisorpolicy policy,

lifecyclestate desiredstate)方法則是具體執行啟動各個元件的方法。flume的所有元件均實作自

Flume-NG啟動過程源碼分析(三)(原創)

  該方法首先monitorservice是否是正常運作狀态;然後構造supervisoree process = new

supervisoree(),進行指派并構造一個監控程序monitorrunnable,放入線程池去執行。

  monitorrunnable.run()方法:

   上面的 lifecycleaware.stop()和lifecycleaware.start()就是執行的sink、source、channel等的對應方法。

  這裡的start需要注意如果是channel則是直接執行start方法;如果是sink或者pollablesource的實作類,則會在start()方法中啟動一個線程來循環的調用process()方法來從channel拿資料(sink)或者向channel送資料(source);如果是eventdrivensource的實作類,則沒有process()方法,通過執行start()來執行想channel中送資料的操作(可以在此添加線程來實作相應的邏輯)。

  2、stopallcomponents()方法。顧名思義,就是停止所有元件的方法。該方法代碼如下:

  首先,需要注意的是,stopallcomponents()放在startallcomponents(materializedconfiguration

materializedconfiguration)方法之前的原因,由于配置檔案的動态加載這一特性的存在,使得每次加載之前都要先把舊的元件停掉,然後才能去加載最新配置檔案中的配置;

  其次,首次執行stopallcomponents()時,由于配置檔案尚未指派,是以并不會執行停止所有元件的操作以及停止monitorserver。再次加載時會依照順序依次停止對source、sink以及channel的監控,通過supervisor.unsupervise(entry.getvalue())停止對其的監控,然後停止monitorserver。supervisor.unsupervise方法如下:

  該方法首先會檢查正在運作的元件當中是否有此元件supervisedprocesses.containskey(lifecycleaware);如果存在,則對此元件标記為已取消監控supervisoree.status.discard

=

true;将狀态設定為stop,并停止元件lifecycleaware.stop();然後從删除此元件的監控記錄,包括從記錄正在處于監控的元件的結構supervisedprocesses以及記錄元件及其對應的運作線程的結構monitorfutures中删除相應的元件資訊,并且needtopurge

= true會使得兩小時執行一次的線程池清理操作。

  有一個問題就是,sink和source是如何找到對應的channel的呢??其實前面章節就已經講解過,分别在abstractconfigurationprovider.loadsources方法中通過channelselector配置source對應的channel,而在source中通過getchannelprocessor()擷取channels,通過channelprocessor.processeventbatch(eventlist)将events發送到channel中;而在abstractconfigurationprovider.loadsinks方法中sink.setchannel(channelcomponent.channel)來設定此sink對應的channel,然後在sink的實作類中通過getchannel()擷取設定的channel,并使用channel.take()從channel中擷取event進行處理。

  

  以上三節是flume-ng的啟動、配置檔案的加載、配置檔案的動态加載、元件的執行的整個流程。文中的疏漏之處,請各位指教,我依然會後續繼續完善這些内容的。

  後續還有更精彩的章節。。。。