天天看点

Flume-NG启动过程源码分析(三)(原创)

  分析了flume如何加载配置文件的,动态加载也只是重复运行getconfiguration()。

  本篇分析加载配置文件后各个组件是如何运行的?

  加载完配置文件订阅者application类会收到订阅信息执行:

  materializedconfiguration

conf就是getconfiguration()方法获取的配置信息,是simplematerializedconfiguration的一个实例。

  handleconfigurationevent方法在中有过大致分析,包括:stopallcomponents()和startallcomponents(conf)。application中的materializedconfiguration就是materializedconfiguration

conf,stopallcomponents()方法中的materializedconfiguration是旧的配置信息,需要先停掉旧的组件,然后startallcomponents(conf)将新的配置信息赋给materializedconfiguration并依次启动各个组件。

  1、先看startallcomponents(conf)方法。代码如下:

  三大组件都是通过supervisor.supervise(entry.getvalue(),new

supervisorpolicy.alwaysrestartpolicy(),

lifecyclestate.start)启动的,其中,channel启动之后还要待所有的channel完全启动完毕之后才可再去启动sink和source。如果channel没有启动完毕就去启动另外俩组件,会出现错误,以为一旦sink或者source建立完毕就会立即与channel通信获取数据。稍后会分别分析sink和source的启动。

  supervisor是lifecyclesupervisor的一个对象,该类的构造方法会构造一个有10个线程,上限是20的线程池供各大组件使用。构造方法如下:

  supervise(lifecycleaware lifecycleaware,supervisorpolicy policy,

lifecyclestate desiredstate)方法则是具体执行启动各个组件的方法。flume的所有组件均实现自

Flume-NG启动过程源码分析(三)(原创)

  该方法首先monitorservice是否是正常运行状态;然后构造supervisoree process = new

supervisoree(),进行赋值并构造一个监控进程monitorrunnable,放入线程池去执行。

  monitorrunnable.run()方法:

   上面的 lifecycleaware.stop()和lifecycleaware.start()就是执行的sink、source、channel等的对应方法。

  这里的start需要注意如果是channel则是直接执行start方法;如果是sink或者pollablesource的实现类,则会在start()方法中启动一个线程来循环的调用process()方法来从channel拿数据(sink)或者向channel送数据(source);如果是eventdrivensource的实现类,则没有process()方法,通过执行start()来执行想channel中送数据的操作(可以在此添加线程来实现相应的逻辑)。

  2、stopallcomponents()方法。顾名思义,就是停止所有组件的方法。该方法代码如下:

  首先,需要注意的是,stopallcomponents()放在startallcomponents(materializedconfiguration

materializedconfiguration)方法之前的原因,由于配置文件的动态加载这一特性的存在,使得每次加载之前都要先把旧的组件停掉,然后才能去加载最新配置文件中的配置;

  其次,首次执行stopallcomponents()时,由于配置文件尚未赋值,所以并不会执行停止所有组件的操作以及停止monitorserver。再次加载时会依照顺序依次停止对source、sink以及channel的监控,通过supervisor.unsupervise(entry.getvalue())停止对其的监控,然后停止monitorserver。supervisor.unsupervise方法如下:

  该方法首先会检查正在运行的组件当中是否有此组件supervisedprocesses.containskey(lifecycleaware);如果存在,则对此组件标记为已取消监控supervisoree.status.discard

=

true;将状态设置为stop,并停止组件lifecycleaware.stop();然后从删除此组件的监控记录,包括从记录正在处于监控的组件的结构supervisedprocesses以及记录组件及其对应的运行线程的结构monitorfutures中删除相应的组件信息,并且needtopurge

= true会使得两小时执行一次的线程池清理操作。

  有一个问题就是,sink和source是如何找到对应的channel的呢??其实前面章节就已经讲解过,分别在abstractconfigurationprovider.loadsources方法中通过channelselector配置source对应的channel,而在source中通过getchannelprocessor()获取channels,通过channelprocessor.processeventbatch(eventlist)将events发送到channel中;而在abstractconfigurationprovider.loadsinks方法中sink.setchannel(channelcomponent.channel)来设置此sink对应的channel,然后在sink的实现类中通过getchannel()获取设置的channel,并使用channel.take()从channel中获取event进行处理。

  

  以上三节是flume-ng的启动、配置文件的加载、配置文件的动态加载、组件的执行的整个流程。文中的疏漏之处,请各位指教,我依然会后续继续完善这些内容的。

  后续还有更精彩的章节。。。。