1. Message Cursors
1.1 概述
ActiveMQ發送持久化消息的典型的厝裡方式是:當消息的消費者準備就緒時,消息發送系統把存儲的消息按批次發送給消費者,在發送完一個批次的消息後,指針的标記位置指向下一個批次的待發消息的位置,進行後續的發送操作。這是一種 比較健壯和靈活的消息發送方式,但是大多數的情況下,消息的消費者不一定一直都處于這種理想的活躍狀态。
是以,從ActiveMQ5.0.0版本開始,消息發送系統采用一種混合型的發送模式,當消息消費者處于活躍狀态時,允許消息發送系統直接把持久化消息發送給消費者,當消費者處于不活躍狀态下時,切換使用Cursors來處理消息的發送。
當消息的消費者處于活躍狀态并且處理能力較強時,被持久化存儲的消息直接發送到與消費者相關聯的發送隊列,如下:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISPrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdsATOfd3bkFGazxCMx8VesATMfhHLlN3XnxCMwEzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5yY2QTMklDNzUDNhdTMwYmY5MGNiZWYyUmNlVGNhdTOi9CX0AzLchDMxIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjL5M3Lc9CX6MHc0RHaiojIsJye.png)
當消息已經出現積壓,消費者再開始活躍,或者是消費者的消費速度比消息的發送速度慢時,消息将從Pending Cursor中提取,并發送與消費者關聯的發送隊列。見下圖:
1.) Store-base
從ActiveMQ5.0 開始,預設使用此種類型的cursor,其能夠滿足大多數場景的使用要求。同時支援非持久消息的處理,Store-based内嵌了File-based的模式,非持久消息直接被Non-persistent pending Cursor所處理。工作模式見下圖:
2). VM
相關消息引用存儲在記憶體中,當滿足條件時,消息直接被發送到消費者與值相關的發送隊列,處理速度非常快,但出現慢消費或者消費者長時間處于不活躍狀态的情況下,無法适應。工作模式見下圖:
3) .File-based
當記憶體設定達到設定的限制,消息被存儲到磁盤中的臨時檔案中。工作模式見下圖:
4). 配置使用
在預設的情況下,ActiveMQ會根據使用的Message Store來決定使用何種類型的Message Cursors,但是可以根據destination來配置Message Cursors,例如:
1. 對Topic subscripbers
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="org.apache" producerFlowControl="false" memoryLimit="1mb">
<dispatchPolicy>
<<strictOrderDispatchPolicy/>
</dispatchPolicy>
<deadLetterStrategy>
<individualDealLetterStrategy topicPrefix="Test.DLQ"/>
</deadLetterStrategy>
<pendingSubscriberPolicy>
<vmCursor/>
</pendingSubscriberPolicy>
<pendingDurableSubscriberPolicy>
<vmDurableCursor/>
</pendingDurableSubscriberPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
配置說明:
有效的Subscriber類型是vmCursor和fileCursor,預設是store based cursor。有效的持久化subscribe的cursor types是storeDurableSubscriberCursor,vmDurableCursor和fileDurableSubscriberCursor,預設是store based cursor.
2. 對于queue的配置
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="org.apache.>" >
<deadLetterStrategy>
<individualDealLetterStrategy topicPrefix="Test.DLQ"/>
</deadLetterStrategy>
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
2. Async Sends
ActiveMQ支援異步和同步發送消息,是可以配置,通常對于快的消費者,是直接把消息同步發送過去,但是對于一個慢消費者,你使用同步發送消息可能出現producer堵塞等現象,慢消費者适合異步發送。
配置使用:
1. ActiveMQ預設設定dispatchAsync=true是最好的性能設定。如果你處理Fast Consumer則使用dispatchAsync=false
2. 在Connection URI級别來配置使用Async Send
factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616?jms.useAsyncSend=true");
3. 在ConnectionFactory級别來配置使用Async Send
factory.setUseAsyncSend(true);(此方法是存在于ActiveMQConnectionFactory中的)
4. Connection級别來配置使用Async Send
connection.setUseAsyncSend(true); (此方法存在于ActiveMQConnection中)
3. Dispatch Policies (消息分發政策)
3.1 嚴格順序分發政策(Strict Order Dispatch Policy)
通常ActiveMQ會保證topic consumer以相同的順序接收來意同一個producer的消息,并且有時候也需要保證不同的topic consumer以相同的資料接收消息,但是,由于多線程和異步處理,不同的topic consumer可能會以不同的順序接收來自不同producer的消息。
Strict order dispatch policy 會保證每個topic consumer會以相同的順序接收消息,代價就是性能上的損失。以下是一個配置例子:
<policyEntry topic="ORDERS.>">
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
</policyEntry>
對于Queue的配置為:
<policyEntry queue=">" stricOrderDispatch="false"/>
3.2 輪詢的分發政策(Round Robin Dispatch Policy)
ActiveMQ的prefetch預設參數,是針對處理大量消息時的高性能和高吞吐量而設定的,是以預設的prefetch參數比較大,而且預設的dispatche policies會嘗試盡可能快的填滿緩沖。
然而有些情況下,例如隻有少量的消息而且單個消息的處理時間比較長,那麼在預設的prefetch和dispatch policies下,這些少量的消息總是傾向于被分發到個邊的consumer上,這樣就會因為負載的不均衡而導緻處理時間的增加。
<policyEntry topic="FOO.>">
<dispatchPolicy>
<roundRobinDispatchPolicy/>
</dispatchPolicy>
</policyEntry>
侵删 聯系[email protected]