天天看点

ActiveMQ队列消息积压问题调研摘要问题方案一:ActiveMQ自带配置方案二:自定义ActiveMQ插件方案三:调整报警脚本小结

公司运维同事针对ActiveMQ提出了两个问题,其中一个是“队列长时间无人监听时,自动删除该队列”。

调研提出了三种方案。这里是相关记录和说明。

运维同事对生产环境使用的ActiveMQ做了相关监控。这个监控在某个队列出现消息积压时(实际规则更复杂一些,并且正在调整)发送短信报警。运维接到短信后会通知开发负责人。开发负责人再检查系统是否在正常监听相关队列。 

但是,从过往经验来看,只有一次消息积压是业务系统故障导致的;其它情况(没有统计到具体数据,大约五六次)都是业务系统已经不再监听该队列导致的。这使得我们的运维、开发同事半夜三更火急火燎检查问题,结果发现只需要删除那个队列就可以了。 

尤其惹发起床气的是,由于线上ActiveMQ配置了消息持久化,这种消息积压其实并不会对ActiveMQ产生多大的影响,完全可以在第二天上班后再处理。

考虑到大家的睡眠质量和夫妻感情,在JIRA中,我们调研、讨论了三个方案。

在ActiveMQ官方提供的功能列表中,有这样一项功能:Delete Inactive Destination。它可以删除“没有未处理消息、并且没有消费者的Destination”。

这个配置比较简单,在ActiveMQ的配置文件activemq.xml中,做如下改动即可。这里示例的是对queue的配置;topic配置是类似的。

上述示例配置的含义是:这个Broker会每隔10000ms(由schedulePeriodForDestinationPurge配置指定)扫描一次标记有“gcInactiveDestinations="true"”的Queue(由于这里配置的是queue=">",因而实际是扫描所有Queue),将其中“没有未处理消息、并且没有消费者、并且此状态已超过30000ms(由inactiveTimoutBeforeGC配置指定)”的队列删除掉。有点晕。各配置项的具体说明如下。

以下三个配置项中,schedulePeriodForDestinationPurge和gcInactiveDestinations是必填配置;inactiveTimoutBeforeGC是选填配置。

这是针对Broker的配置,用于声明“扫描闲置队列的周期”,单位为毫秒。默认值为0,意为“不扫描”。 

需要说明的是,这里只能配置扫描任务的启动周期、不能配置启动延迟。也就是说,配置好了之后,ActiveMQ服务启动时会立即扫描一次;然后再按照指定时间周期性扫描。

这是针对Destination的配置,用于声明当Broker扫描闲置队列时,是否扫描这个Destination(由queue="xxxx"来指定)。默认值是false。

这也是针对Destination的配置,用于声明这个Destination闲置多长时间后可以被删除。单位毫秒,默认时间60s。 

这个配置必须在gcInactiveDestinations被设置为true的情况下才会生效。

虽然上面介绍了这么多,但实际上,从第一句话中就可以看出这个方案无法解决我们的问题。因为我们的问题是要处理“有消息积压、但没有消费者的Destination”,而这个方案只能删除“没有未处理消息、并且没有消费者的Destination”。 

除此之外,这应该算是最简单可靠的一种方案了。实际上,对大多数原生Queue来说,业务系统会同时下线其生产者与消费者。这个方案可以很好的应对这种情况。

ActiveMQ插件(plugin),也有文档中称为拦截器(Interceptor)。二者其实是相辅相成的:配置时,我们需要一个插件;执行时,我们需要一个拦截器。 

ActiveMQ官方提供了几个插件(日志、统计、时间戳等),可以参见官方说明和开发文档。我们可以参考官方示例来自定义一个插件。

ActiveMQ通过解析activemq.xml中的配置,来加载一个插件的。因此我们从配置入手,逐步搞清楚插件和拦截器是如何工作的。 

activemq.xml中的配置其实很简单,如下所示:

上述配置声明了一个插件,插件类名是net.loyintean.blog.jms.manage.PlugIn,id是linjunPlugin。 

这个类必须包含在ActiveMQ的classpath路径下。我们可以自己打一个jar包,并把jar包放到ActiveMQ的lib路径下;也可以修改相关类路径。总之要保证ActiveMQ能够加载到这个类(及其依赖类)。 

其实按照上面的配置,并不需要为插件配置一个id。不过,插件声明还有其它方式,有些是需要使用id的。这里不多说,可以参考开发文档。 

如配置中的注释所说,声明插件所使用的<bean />标签及语法来自spring。也就是说,spring中的<property />等其它标签,这里也是支持的。不过目前还没有找到对@Autowired等注解的支持方式。

由于我使用的是spring boot,只需要加上一个spring-boot-starter-activemq就可以引入所需依赖jar包了。不使用spring boot的话,需要引入activemq-broker-x.y.z.jar。 

根据ActiveMQ规范,插件必须实现BrokerPlugin接口。这个接口只有一个方法:Broker installPlugin(Broker broker) throws Exception,用于在服务启动、加载插件时,获取当前启动的borker实例,并返回一个Broker实例。 

例如,上文中声明的linjunPlugin代码如下:

似乎有些莫名其妙,但从“装饰者”的角度来理解就轻松愉快了:入参broker是原生实例(当然也可能是其它插件“装饰”过的);出参则是被我们自己的插件“装饰”过的、增强版的实例。 

一般来说,启动过程不会做太多处理;处理逻辑在我们的“装饰者”中——如上面代码里的RemoveDestination。

如上文所说,我们需要提供的是一个“装饰”过的Broker。但是Broker是一个接口,其中有超过50个方法,用于处理Broker在服务期间的各种事件(如服务启动、创建链接、消息收发、事务提交与回滚等等)。直接实现接口未免太丑陋了。ActiveMQ也考虑到了这一点,因此给我们提供了一个适配器(其实同时也是一个装饰者):BrokerFilter。它的代码如下:

借助这个适配器,我们可以专注的处理我们关注的事件。如我们的RemoveDestination,它只需要在服务启动时注册一个定时器,按需删除无人监听的队列即可。代码如下:

除了BrokerFilter这个针对Broker事件做拦截、装饰的类之外,也有针对Destination的DestinationFilter,不赘述。

无论是BrokerFilter还是DestinationFilter,在重写父类的某个方法时,要注意调用super中的对应方法。如RemoveDestination类在覆盖start()方法时,调用了super.start()方法。 

这两个类中的每一个方法,都对应Broker或Destination的一个事件的“处理栈”。如果不调用父类方法,很可能会导致一些基础的、或关键的代码没有执行到,进而出现异常。因此,如果不是非常确定“执行到这里时必须中断当前事件”,否则一定要调用super相应方法。

上面的代码是示例用,还可以进一步完善。但是这个方案是可以满足需求的。

不过,这个方案存在一项风险:当我们删除一个Destination时,其中所有未消费的消息也会随之被删除,即使这些消息已经做了持久化。如果有某个业务系统长时间出现故障、无法连上ActiveMQ,而ActiveMQ在此期间删除了它监听的Destination及其中消息……这个风险概率虽然小,但是影响太大。慎重起见,放弃方案二。

方案三属于运维的范畴。如JIRA中所讨论的,这个问题真正的“痛点”,并不是废弃队列,而是非紧急情况却在半夜报警。因此,由运维同事修改一下脚本,调整“没有消费者”这种问题的监控报警时间就可以了。

最后选定的是方案三。方案一不能满足需求;方案二的风险较大。方案三直击痛点,干脆利落。

这件事也启示我们:做事情之前先想清楚目标,谋定而后动。

本文转自 斯然在天边 51CTO博客,原文链接:http://blog.51cto.com/winters1224/2049432,如需转载请自行联系原作者