天天看点

Spring集成ActiveMQ

准备工作

从下面官网下载ActiveMQ,本文使用5.7版本,写博时Maven库能找到的最高版本为5.7

<a href="http://activemq.apache.org/download-archives.html" target="_blank">http://activemq.apache.org/download-archives.html</a>

解压缩下载的文件到你希望安装的目录

进入bin目录运行activemq.bat以启动ActiveMQ服务,启动后默认

broker URL : tcp://localhost:61616

Admin URL:http://localhost:8161/admin

打开Admin URL可以查看Queue、Topic等信息

关于ActiveMQ相关依赖包就交给Gradle工具,请参看demo配置

JMS框架基本角色和编程模块

JMS Message Producers : 消息生产者,向消息队列提供消息

JMS Provider(Broker) : JMS提供商,如ActiveMQ,提供JMS Queues/Topics服务

JMS Message listener/Consumer :接收并使用消息

JMS Provider有如数据库,Producers/Consumers发送/接收消息前需要先连接到Provider,与建立数据库连接相似,JMS ConnectionFactory负责创建JMS Connection,JMS Connection创建JMS Session

JMS ConnectionFactory : Producers/Consumers用于创建一个到Provider的连接

JMS Connection :封装一个到Provider的连接

JMS Session : 消息发送接收上下文

<a href="http://s3.51cto.com/wyfs02/M00/11/A6/wKiom1LXvQrxVnOnAAD304pI3Qk941.jpg" target="_blank"></a>

在JMS Provider上可以定义多个Queue和Topic,Producers发送消息到哪个Queue/Topic,称具体的那个Queue/Topic为Destination.

JMS Destination : 一对一的Queue或者一对多的Topic

关于JMS编程模型可以参考以下文章:

<a href="http://docs.oracle.com/javaee/6/tutorial/doc/bnceh.html" target="_blank">http://docs.oracle.com/javaee/6/tutorial/doc/bnceh.html</a>

Spring集成配置

Queue消息发送者(JMS Message Producers) :

ConnectionFactory : 用于连接Provider,支持连接池配置

JmsTemplate : Spring封装类,可用于创造消息、发送消息、接收消息等

1

2

3

4

5

6

7

<code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"connectionFactory"</code> <code>class</code><code>=</code><code>"org.apache.activemq.ActiveMQConnectionFactory"</code>

<code>    </code><code>p:brokerURL</code><code>=</code><code>"tcp://localhost:61616"</code> <code>/&gt;</code>

<code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"jmsTemplate"</code> <code>class</code><code>=</code><code>"org.springframework.jms.core.JmsTemplate"</code><code>&gt;</code>

<code>    </code><code>&lt;</code><code>constructor-arg</code> <code>name</code><code>=</code><code>"connectionFactory"</code> <code>ref</code><code>=</code><code>"connectionFactory"</code> <code>/&gt;</code>

<code>    </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"defaultDestinationName"</code> <code>value</code><code>=</code><code>"TestQueue"</code> <code>/&gt;</code>

<code>&lt;/</code><code>bean</code><code>&gt;</code>

<code>&lt;</code><code>context:component-scan</code> <code>base-package</code><code>=</code><code>"com.stevex.demo"</code> <code>/&gt;</code>

连接池配置示例:

8

9

10

11

12

13

14

15

16

17

<code>&lt;!-- a pooling based JMS provider --&gt;</code>

<code>  </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"jmsFactory"</code> <code>class</code><code>=</code><code>"org.apache.activemq.pool.PooledConnectionFactory"</code> <code>destroy-method</code><code>=</code><code>"stop"</code><code>&gt;</code>

<code>    </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"connectionFactory"</code><code>&gt;</code>

<code>      </code><code>&lt;</code><code>bean</code> <code>class</code><code>=</code><code>"org.apache.activemq.ActiveMQConnectionFactory"</code><code>&gt;</code>

<code>        </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"brokerURL"</code><code>&gt;</code>

<code>          </code><code>&lt;</code><code>value</code><code>&gt;tcp://localhost:61616&lt;/</code><code>value</code><code>&gt;</code>

<code>        </code><code>&lt;/</code><code>property</code><code>&gt;</code>

<code>      </code><code>&lt;/</code><code>bean</code><code>&gt;</code>

<code>    </code><code>&lt;/</code><code>property</code><code>&gt;</code>

<code>  </code><code>&lt;/</code><code>bean</code><code>&gt;</code>

<code>                                                                                                 </code> 

<code>  </code><code>&lt;!-- Spring JMS Template --&gt;</code>

<code>  </code><code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"jmsTemplate"</code> <code>class</code><code>=</code><code>"org.springframework.jms.core.JmsTemplate"</code><code>&gt;</code>

<code>      </code><code>&lt;</code><code>ref</code> <code>local</code><code>=</code><code>"jmsFactory"</code><code>/&gt;</code>

Queue消息接收者(JMS Message listener) :

Listener : 接收消息

jms:listener-container 简化了配置工作,destination只需要给queue/topic名称,不需要额外定义

<code>&lt;</code><code>jms:listener-container</code> <code>container-type</code><code>=</code><code>"default"</code>

<code>    </code><code>connection-factory</code><code>=</code><code>"connectionFactory"</code> <code>acknowledge</code><code>=</code><code>"auto"</code><code>&gt;</code>

<code>    </code><code>&lt;</code><code>jms:listener</code> <code>destination</code><code>=</code><code>"TestQueue"</code> <code>ref</code><code>=</code><code>"messageListener"</code>

<code>        </code><code>method</code><code>=</code><code>"onMessage"</code> <code>/&gt;</code>

<code>&lt;/</code><code>jms:listener-container</code><code>&gt;</code>

<code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"messageListener"</code> <code>class</code><code>=</code><code>"com.stevex.demo.SimpleMessageListener"</code><code>/&gt;</code>

Topic消息发送者(JMS Message Producers) :

JmsTemplate 的pubSubDomain属性值为true表示destination为Topic类型,默认false表示destination为Queue类型。

<code>    </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"defaultDestinationName"</code> <code>value</code><code>=</code><code>"TestTopic"</code> <code>/&gt;</code>

<code>    </code><code>&lt;</code><code>property</code> <code>name</code><code>=</code><code>"pubSubDomain"</code> <code>value</code><code>=</code><code>"true"</code> <code>/&gt;</code>

Topic消息接收者(JMS Message listener) :

Topic类型的消息,所有订阅者都可以接收到,本文Demo定义了两个接收者。

jms:listener-container 的destination-type属性默认值为queue,如果是Topic需要显示指定。

<code>&lt;</code><code>bean</code> <code>id</code><code>=</code><code>"subscriber1"</code> <code>class</code><code>=</code><code>"com.stevex.demo.SimpleMessageListener"</code> <code>/&gt;</code>

<code>    </code><code>destination-type</code><code>=</code><code>"topic"</code> <code>connection-factory</code><code>=</code><code>"connectionFactory"</code>

<code>    </code><code>acknowledge</code><code>=</code><code>"auto"</code><code>&gt;</code>

<code>    </code><code>&lt;</code><code>jms:listener</code> <code>destination</code><code>=</code><code>"TestTopic"</code> <code>ref</code><code>=</code><code>"subscriber1"</code>

Producer类实现代码

<code>@Component</code><code>(</code><code>"messageSender"</code><code>)</code>

<code>public</code> <code>class</code> <code>SimpleMessageSender </code><code>implements</code> <code>MessageSender {</code>

<code>    </code><code>@Autowired</code>

<code>    </code><code>private</code> <code>JmsTemplate jmsTemplate;</code>

<code>    </code><code>@Override</code>

<code>    </code><code>public</code> <code>void</code> <code>sendMessage(</code><code>final</code> <code>String message) {</code>

<code>        </code><code>jmsTemplate.send(</code><code>new</code> <code>MessageCreator() {</code>

<code>                                                                                                  </code> 

<code>            </code><code>public</code> <code>Message createMessage(Session session) </code><code>throws</code> <code>JMSException {</code>

<code>                </code><code>return</code> <code>session.createTextMessage(message);</code>

<code>            </code><code>}</code>

<code>        </code><code>});</code>

<code>    </code><code>}</code>

<code>}</code>

Listener类实现代码

<code>@Component</code><code>(</code><code>"messageListener"</code><code>)</code>

<code>public</code> <code>class</code> <code>SimpleMessageListener </code><code>implements</code> <code>MessageListener {</code>

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>Logger logger = LoggerFactory.getLogger(SimpleMessageListener.</code><code>class</code><code>);</code>

<code>    </code><code>public</code> <code>void</code> <code>onMessage(Message message) {</code>

<code>        </code><code>TextMessage textMessage = (TextMessage) message;</code>

<code>                                                                                            </code> 

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>logger.info(</code><code>"Message received: "</code> <code>+ textMessage.getText());</code>

<code>        </code><code>} </code><code>catch</code> <code>(JMSException ex) {</code>

<code>            </code><code>logger.error(</code><code>"JMS error"</code><code>, ex);</code>

<code>        </code><code>}</code>

应用跑起来后,ActiveMQ接收到连接后,如果请求的Queue/Topic不存在它会自动创建,我们也可以通过ActiveMQ Admin界面给Listener发消息。

<a href="http://down.51cto.com/data/2364002" target="_blank">附件:http://down.51cto.com/data/2364002</a>

     本文转自sarchitect 51CTO博客,原文链接:http://blog.51cto.com/stevex/1352334,如需转载请自行联系原作者