天天看点

ActiveMQ 发布/订阅持久化(持久订阅)集成 Spring 环境

一:pom.xml

<artifactId>spring_activemq_demo</artifactId>
<properties>
    <spring.version>4.2.4.RELEASE</spring.version>
    <!-- One shared version for both ActiveMQ artifacts below so they cannot drift apart -->
    <activemq.version>5.11.2</activemq.version>
</properties>
<dependencies>
    <!-- ActiveMQ client/broker classes.
         NOTE(review): activemq-all is an aggregate jar; confirm whether the separate
         activemq-pool dependency below is still required alongside it. -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-pool -->
    <!-- Provides org.apache.activemq.pool.PooledConnectionFactory used by the Spring configuration -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>${activemq.version}</version>
    </dependency>

    <!-- Spring modules, all pinned to ${spring.version} -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- NOTE(review): spring-test and junit are normally declared with <scope>test</scope>;
         confirm the test classes live under src/test/java before narrowing the scope. -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webmvc</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aspects</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

二:创建发布者配置文件

<!-- Register the @Component classes found in this package as beans -->
<context:component-scan base-package="com.xucj.activemq.spring"></context:component-scan>

<!-- The vendor-supplied (ActiveMQ) ConnectionFactory that actually creates connections to the broker -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
</bean>
<!-- ActiveMQ's PooledConnectionFactory wraps the ActiveMQConnectionFactory injected below and pools
     Connection, Session and MessageProducer instances to reduce resource usage; it requires the
     activemq-pool artifact. destroy-method="stop" releases the pooled connections when the
     Spring context is closed; without it they are never closed explicitly. -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory" ref="targetConnectionFactory" />
    <property name="maxConnections" value="100" />
</bean>
<!-- Spring's own ConnectionFactory, which manages the real ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <!-- The target is the real factory that can produce JMS connections (here: the pooled one) -->
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>

<!-- Spring's JMS helper used for sending and receiving messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- Refers to the Spring-provided ConnectionFactory bean defined above -->
    <property name="connectionFactory" ref="connectionFactory"/>
    <!-- JmsTemplate only applies deliveryMode / priority / timeToLive when explicit QoS is enabled;
         without this flag the deliveryMode setting below is silently ignored -->
    <property name="explicitQosEnabled" value="true" />
    <!-- Persistent delivery: 2 is javax.jms.DeliveryMode.PERSISTENT -->
    <property name="deliveryMode" value="2" />
    <!-- Publish/subscribe (topic) mode -->
    <property name="pubSubDomain" value="true" />
</bean>
<!-- Topic destination for the persistent text messages -->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic_persistence_text"/>
</bean>

三:订阅者配置文件

<!-- The vendor-supplied (ActiveMQ) ConnectionFactory that actually creates connections to the broker -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
</bean>
<!-- ActiveMQ's PooledConnectionFactory wraps the ActiveMQConnectionFactory injected below and pools
     Connection, Session and MessageProducer instances to reduce resource usage; it requires the
     activemq-pool artifact. destroy-method="stop" releases the pooled connections when the
     Spring context is closed; without it they are never closed explicitly. -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory" ref="targetConnectionFactory" />
    <property name="maxConnections" value="100" />
</bean>
<!-- Spring's own ConnectionFactory, which manages the real ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <!-- Client ID that identifies this consumer to the broker -->
    <property name="clientId" value="clientId_001" />
    <!-- The target is the real factory that can produce JMS connections (here: the pooled one) -->
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>

<!-- Topic destination (publish/subscribe) for the text messages; same topic name as the publisher uses -->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic_persistence_text"/>
</bean>
<!-- The application's message listener -->
<bean id="myMessageListener" class="com.xucj.activemq.spring.MyMessageListener"></bean>
<!-- Message listener container that delivers messages from the topic to the listener -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="topicTextDestination" />
    <property name="messageListener" ref="myMessageListener" />
    <!-- Publish/subscribe (topic) mode -->
    <property name="pubSubDomain" value="true" />
    <!-- Use a durable subscription -->
    <property name="subscriptionDurable" value="true" />
    <!-- Receive timeout -->
    <property name="receiveTimeout" value="10000" />
    <!-- Client ID of the receiver.
         NOTE(review): the same value is also set on the connectionFactory bean above;
         keep the two identical if either one is changed. -->
    <property name="clientId" value="clientId_001" />
    <!-- Name of the durable subscription (here the same string as the client ID) -->
    <property name="durableSubscriptionName" value="clientId_001" />
</bean>

四:订阅者监听类

import org.springframework.stereotype.Component;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消费者监听类
 */
@Component
public class MyMessageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("收到的消息:"+textMessage.getText());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
      

五:发布者发布消息组件

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Component
public class QueueProducerChijiuhua {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Destination topicTextDestination;

    /**
     * 发送文本消息
     * @param text
     */
    public void sendTextMessage(final String text){
        jmsTemplate.send(topicTextDestination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(text);
            }
        });
    }

}      

六:发布者测试类

import com.xucj.activemq.spring.QueueProducerChijiuhua;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Publisher-side test: loads the producer Spring context and sends one text message
 * through {@link QueueProducerChijiuhua}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext-jms-producer_chijiuhua.xml"})
public class TestTopicProducerChijiuhua {

    /** Body of the message published by {@link #sendTextQueue()}. */
    private static final String MESSAGE_BODY = "订阅持久化21";

    @Autowired
    private QueueProducerChijiuhua queueProducerChijiuhua;

    /** Publishes a single text message via the injected producer. */
    @Test
    public void sendTextQueue(){
        queueProducerChijiuhua.sendTextMessage(MESSAGE_BODY);
    }
}
      

七:订阅者测试类

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.io.IOException;

/**
 * Subscriber-side test: loads the consumer Spring context and then waits on standard input
 * so the context stays loaded while the test method is running.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext-jms-consumer_chijiuhua.xml"})
public class TestTopicConsumerChijiuhua {

    /**
     * Keeps the test (and with it the loaded context) running until a byte can be read
     * from standard input.
     */
    @Test
    public void testQueue(){
        blockUntilInput();
    }

    /**
     * Performs one blocking read on {@code System.in}; an I/O failure is printed, not rethrown.
     * NOTE(review): some build runners provide no interactive stdin, in which case the read
     * may return immediately; confirm how this test is launched.
     */
    private static void blockUntilInput() {
        try {
            System.in.read();
        } catch (IOException readFailure) {
            readFailure.printStackTrace();
        }
    }
}

八:测试

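先启动一次订阅者测试类,让 clientId_001 在 broker 上完成持久订阅的注册,然后将其停止;接着运行两次发布者测试类(每次运行发布一条消息),共发布两条消息;随后再启动订阅者测试类,可以看到消息监听类成功监听到了订阅者离线期间发布者发布的两条消息(如果订阅者从未注册过持久订阅,注册之前发布的消息不会被保留):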

activemq发布订阅持久化集成spring环境