公司運維同僚針對ActiveMQ提出了兩個問題,其中一個是“隊列長時間無人監聽時,自動删除該隊列”。
調研提出了三種方案。這裡是相關記錄和說明。
運維同僚對生産環境使用的ActiveMQ做了相關監控。這個監控在某個隊列出現消息積壓時(實際規則更複雜一些,并且正在調整)發送短信報警。運維接到短信後會通知開發負責人。開發負責人再檢查系統是否在正常監聽相關隊列。
但是,從過往經驗來看,隻有一次消息積壓是業務系統故障導緻的;其它情況(沒有統計到具體資料,大約五六次)都是業務系統已經不再監聽該隊列導緻的。這使得我們的運維、開發同僚半夜三更火急火燎檢查問題,結果發現隻需要删除那個隊列就可以了。
尤其惹發起床氣的是,由于線上ActiveMQ配置了消息持久化,這種消息積壓其實并不會對ActiveMQ産生多大的影響,完全可以在第二天上班後再處理。
考慮到大家的睡眠品質和夫妻感情,在JIRA中,我們調研、讨論了三個方案。
在ActiveMQ官方提供的功能清單中,有這樣一項功能:Delete Inactive Destination。它可以删除“沒有未處理消息、并且沒有消費者的Destination”。
這個配置比較簡單,在ActiveMQ的配置檔案activemq.xml中,做如下改動即可。這裡示例的是對queue的配置;topic配置是類似的。
上述示例配置的含義是:這個Broker會每隔10000ms(由schedulePeriodForDestinationPurge配置指定)掃描一次标記有“gcInactiveDestinations="true"”的Queue(由于這裡配置的是queue=">",因而實際是掃描所有Queue),将其中“沒有未處理消息、并且沒有消費者、并且此狀态已超過30000ms(由inactiveTimoutBeforeGC配置指定)”的隊列删除掉。有點暈。各配置項的具體說明如下。
以下三個配置項中,schedulePeriodForDestinationPurge和gcInactiveDestinations是必填配置;inactiveTimoutBeforeGC是選填配置。
這是針對Broker的配置,用于聲明“掃描閑置隊列的周期”,機關為毫秒。預設值為0,意為“不掃描”。
需要說明的是,這裡隻能配置掃描任務的啟動周期、不能配置啟動延遲。也就是說,配置好了之後,ActiveMQ服務啟動時會立即掃描一次;然後再按照指定時間周期性掃描。
這是針對Destination的配置,用于聲明當Broker掃描閑置隊列時,是否掃描這個Destination(由queue="xxxx"來指定)。預設值是false。
這也是針對Destination的配置,用于聲明這個Destination閑置多長時間後可以被删除。機關毫秒,預設時間60s。
這個配置必須在gcInactiveDestinations被設定為true的情況下才會生效。
雖然上面介紹了這麼多,但實際上,從第一句話中就可以看出這個方案無法解決我們的問題。因為我們的問題是要處理“有消息積壓、但沒有消費者的Destination”,而這個方案隻能删除“沒有未處理消息、并且沒有消費者的Destination”。
除此之外,這應該算是最簡單可靠的一種方案了。實際上,對大多數原生Queue來說,業務系統會同時下線其生産者與消費者。這個方案可以很好的應對這種情況。
ActiveMQ插件(plugin),也有文檔中稱為攔截器(Interceptor)。二者其實是相輔相成的:配置時,我們需要一個插件;執行時,我們需要一個攔截器。
ActiveMQ官方提供了幾個插件(日志、統計、時間戳等),可以參見官方說明和開發文檔。我們可以參考官方示例來自定義一個插件。
ActiveMQ通過解析activemq.xml中的配置,來加載一個插件的。是以我們從配置入手,逐漸搞清楚插件和攔截器是如何工作的。
activemq.xml中的配置其實很簡單,如下所示:
上述配置聲明了一個插件,插件類名是net.loyintean.blog.jms.manage.PlugIn,id是linjunPlugin。
這個類必須包含在ActiveMQ的classpath路徑下。我們可以自己打一個jar包,并把jar包放到ActiveMQ的lib路徑下;也可以修改相關類路徑。總之要保證ActiveMQ能夠加載到這個類(及其依賴類)。
其實按照上面的配置,并不需要為插件配置一個id。不過,插件聲明還有其它方式,有些是需要使用id的。這裡不多說,可以參考開發文檔。
如配置中的注釋所說,聲明插件所使用的<bean />标簽及文法來自spring。也就是說,spring中的<property />等其它标簽,這裡也是支援的。不過目前還沒有找到對@Autowired等注解的支援方式。
由于我使用的是spring boot,隻需要加上一個spring-boot-starter-activemq就可以引入所需依賴jar包了。不使用spring boot的話,需要引入activemq-broker-x.y.z.jar。
根據ActiveMQ規範,插件必須實作BrokerPlugin接口。這個接口隻有一個方法:Broker installPlugin(Broker broker) throws Exception,用于在服務啟動、加載插件時,擷取目前啟動的borker執行個體,并傳回一個Broker執行個體。
例如,上文中聲明的linjunPlugin代碼如下:
似乎有些莫名其妙,但從“裝飾者”的角度來了解就輕松愉快了:入參broker是原生執行個體(當然也可能是其它插件“裝飾”過的);出參則是被我們自己的插件“裝飾”過的、增強版的執行個體。
一般來說,啟動過程不會做太多處理;處理邏輯在我們的“裝飾者”中——如上面代碼裡的RemoveDestination。
如上文所說,我們需要提供的是一個“裝飾”過的Broker。但是Broker是一個接口,其中有超過50個方法,用于處理Broker在服務期間的各種事件(如服務啟動、建立連結、消息收發、事務送出與復原等等)。直接實作接口未免太醜陋了。ActiveMQ也考慮到了這一點,是以給我們提供了一個擴充卡(其實同時也是一個裝飾者):BrokerFilter。它的代碼如下:
借助這個擴充卡,我們可以專注的處理我們關注的事件。如我們的RemoveDestination,它隻需要在服務啟動時注冊一個定時器,按需删除無人監聽的隊列即可。代碼如下:
除了BrokerFilter這個針對Broker事件做攔截、裝飾的類之外,也有針對Destination的DestinationFilter,不贅述。
無論是BrokerFilter還是DestinationFilter,在重寫父類的某個方法時,要注意調用super中的對應方法。如RemoveDestination類在覆寫start()方法時,調用了super.start()方法。
這兩個類中的每一個方法,都對應Broker或Destination的一個事件的“處理棧”。如果不調用父類方法,很可能會導緻一些基礎的、或關鍵的代碼沒有執行到,進而出現異常。是以,如果不是非常确定“執行到這裡時必須中斷目前事件”,否則一定要調用super相應方法。
上面的代碼是示例用,還可以進一步完善。但是這個方案是可以滿足需求的。
不過,這個方案存在一項風險:當我們删除一個Destination時,其中所有未消費的消息也會随之被删除,即使這些消息已經做了持久化。如果有某個業務系統長時間出現故障、無法連上ActiveMQ,而ActiveMQ在此期間删除了它監聽的Destination及其中消息……這個風險機率雖然小,但是影響太大。慎重起見,放棄方案二。
方案三屬于運維的範疇。如JIRA中所讨論的,這個問題真正的“痛點”,并不是廢棄隊列,而是非緊急情況卻在半夜報警。是以,由運維同僚修改一下腳本,調整“沒有消費者”這種問題的監控報警時間就可以了。
最後標明的是方案三。方案一不能滿足需求;方案二的風險較大。方案三直擊痛點,幹脆利落。
這件事也啟示我們:做事情之前先想清楚目标,謀定而後動。
本文轉自 斯然在天邊 51CTO部落格,原文連結:http://blog.51cto.com/winters1224/2049432,如需轉載請自行聯系原作者