ActiveMQ提供了虛拟通道的特性(Virtual Destination),它允許一個邏輯通道(logical destination)映射成一個或者多個實體通道(physical destination);它可以非常靈活的解決"消息整合"方面的問題,它可以實作:
1) 提供了VirtualTopic特性,可以讓一個訂閱者的消息清單,作為Queue來消費。
2) 提供了Composite特性,可以把一個邏輯通道中的消息,轉發到任意多的實體通道中。
一. VirtualTopic
Topic最大的限制就是同一個ClientId的訂閱者,任何時刻隻能有一個活躍。是以我們在分布式部署時,就會很麻煩,比如一個應用部署成多個執行個體,且它們都有相同的Topic Consumer配置,那麼意味着
一個執行個體部署成功後,其它的執行個體都會因為無法訂閱Topic而導緻故障;同時也意味着,如果這個Topic
Consumer失效後,我們不能自動讓其他Consumer的接管它。但是Queue卻沒有這些限制,因為Queue可以同時有任意多個消費者,它們可以并發的消費消息,進而實作“負載均衡”。如果我們期望Topic也能如此,那麼可以用VirtualTopic。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5ichR3cf52bjl2LcNXZnFWbp9CXt92YuUWelRXauwmc0NWL0xWYtQnZph2cvw1LcpDc0RHaiojIsJye.png)
<broker xmlns="http://activemq.apache.org/schema/core">
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="VConsumers.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>
對于所有的VirtualTopic,它們的namespace一定是"VirtualTopic.",Broker将會根據此namespace來判定邏輯通道是否為VirtualTopic,反過來說,如果你希望一個邏輯通道是一個VirtualTopic,那麼它必須以“VirtualTopic.”作為字首。比如“VirtualTopic.order”,那麼它就是一個VirtualTopic,它的實體通道名稱為"order"。
對于Producer端,需要按照正常的Topic發送消息,通道名稱為邏輯通道全名:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5ichR3cf52bjl2LcNXZnFWbp9CXt92YuUWelRXauwmc0NWL0xWYtQnZph2cvw1LcpDc0RHaiojIsJye.png)
Topic topic = session.createTopic("VirtualTopic.order");
MessageProducer producer = session.createProducer(topic);
//producer.send(message);
VirtualTopic對Consumer而言,是一個邏輯的Queue,Queue的全名有上述配置檔案中的“prefix” + Client辨別 + 邏輯虛拟通道;假如“Client辨別”為"dbcenter"【相當于ClientId】,用來訂閱Order消息,那麼最終邏輯Queue的全名為:"VConsumers.dbcenter.VirtualTopic.order",其中需要注意prefix中的"*"占位符就是用來替換“Client辨別”的;Broker端預設的prefix為"Consumer.*."。當然我們還可以postfix【字尾】,不過通常沒有必要。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5ichR3cf52bjl2LcNXZnFWbp9CXt92YuUWelRXauwmc0NWL0xWYtQnZph2cvw1LcpDc0RHaiojIsJye.png)
Queue queue = session.createQueue("VConsumers.dbcenter.VirtualTopic.order");
MessageConsumer consumer = session.createConsumer(queue);
對于Order通道而言,“dbcenter”相當于是一個訂閱者;Broker将dbcenter訂閱的消息轉發到了“VConsumers.dbcenter.VirtualTopic.order”隊列中;最重要的一點,這個虛拟Queue完全具有隊列的所有特性,它的Consumer可以并行消費。
其中還有一個重要的參數"selectorAware",它表示從Topic中将消息轉發給Queue時,是否關注Consumer的selector情況。如果為false,那麼Topic中的消息全部轉發給Queue,否則隻會轉發比對Queue
Consumer的selector的消息。需要非常注意,當selectorAware為true時,如果消息不比對任何selector或者Queue中沒有任何Consumer活躍,那麼消息将不會轉發給Queue。
VirtualTopic仍然可以被正常的訂閱者消費,即:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5ichR3cf52bjl2LcNXZnFWbp9CXt92YuUWelRXauwmc0NWL0xWYtQnZph2cvw1LcpDc0RHaiojIsJye.png)
TopicSubscriber subscriber = session.createDurableSubscriber(topic,"dbcenter");
同時需要注意,VirtualTopic隻會轉發“Client辨別”注冊之後的消息,且即使Queue消費了消息,VirtualTopic中的消息仍然不會被删除(看起來仍然是Dequeued=0),對于Broker而言,邏輯Queue不被認為是一個“Durable
Subscriber”,隻有真正的Subscriber消費消息後,Topic中的消息才會Dequeue。不過消息Dequeue後,不會影響Queue中的消息,因為這是基于Copy的。
到目前為止,我尚不清楚,如果VirtualTopic中沒有真正的Subscriber,這些消息該如何Dequeue。
當真正的subscriber和Queue都同時存在VirtualTopic中的時候,而且你的broker架構采用了“forward-brige”結構,那麼你需要增加如下配置來避免消息的重複轉發問題。在forward-brige架構中,任何通道中的消息都會forward到其他network
node中(其他broker上),當然這個虛拟的Queue的消息也不例外。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5ichR3cf52bjl2LcNXZnFWbp9CXt92YuUWelRXauwmc0NWL0xWYtQnZph2cvw1LcpDc0RHaiojIsJye.png)
<networkConnectors>
<networkConnector uri="static://(tcp://localhost:61617)">
<excludedDestinations>
<!-- prefix和VirtualTopic保持一緻
<queue physicalName="VConsumer.*.VirtualTopic.>"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
二. Composite Destinations
複合通道,它允許一條消息在多個實體通道間轉發,就像一個通道映射成多個一樣(one-many);比如複合通道A,映射成B和C,那麼發往A的消息會同時轉發給B和C,那麼消費者可以直接通過B或者C擷取消息;
這是一種實作消息複制轉發、通道映射的便捷辦法。複合通道包括CompositeQueue和CompositeTopic兩種。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5ichR3cf52bjl2LcNXZnFWbp9CXt92YuUWelRXauwmc0NWL0xWYtQnZph2cvw1LcpDc0RHaiojIsJye.png)
<broker persistent="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core">
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="order">
<forwardTo>
<queue physicalName="order.dbcenter" />
<topic physicalName="order.statistic" />
</forwardTo>
</compositeQueue>
<!--
<compositeTopic name="order" forwardOnly="false">
</compositeTopic>
-->
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>
上述配置forwardOnly屬性表示發往CompositeQueue中的消息是否“僅僅轉發,而不本地保留”,如果forwardOnly為true,那麼消息将不會在order隊列中保留,即order隊列中不會有任何消息。如果為false,那麼消息将會轉發完成後,添加到order中,消費者仍然可以消費order隊列中的消息。無論是Compsite通道還是轉發的通道,它們和普通的通道沒有任何差別,開發者仍然可以像使用普通的通道一樣使用它們(消費消息和發送消息)。
很多時候,我們希望在轉發消息時,能夠使用selector,此時我們可以使用filteredDestination,這樣我們可以消息轉發時控制消息。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5ichR3cf52bjl2LcNXZnFWbp9CXt92YuUWelRXauwmc0NWL0xWYtQnZph2cvw1LcpDc0RHaiojIsJye.png)
<compositeQueue name="MY.QUEUE">
<forwardTo>
<filteredDestination selector="orderType = 1" queue="food.order"/>
<filteredDestination selector="status = 1" topic="order.statistic"/>
</forwardTo>
</compositeQueue>
原文連結:[http://wely.iteye.com/blog/2328781]