天天看點

ActiveMQ 釋出/訂閱(Topic)消息持久化:整合 Spring 環境

一:pom.xml

<!-- Fragment of pom.xml: artifact id, shared version property and dependencies of the demo module -->
<artifactId>spring_activemq_demo</artifactId>
<properties>
    <!-- Single version referenced by every org.springframework dependency below -->
    <spring.version>4.2.4.RELEASE</spring.version>
</properties>
<dependencies>
    <!-- ActiveMQ classes used by the configs (ActiveMQConnectionFactory, ActiveMQTopic) -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.11.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-pool -->
    <!-- Needed for org.apache.activemq.pool.PooledConnectionFactory used in the Spring configs -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.11.2</version>
    </dependency>

    <!-- Spring JMS support: JmsTemplate, SingleConnectionFactory, DefaultMessageListenerContainer -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- SpringJUnit4ClassRunner / @ContextConfiguration used by the test classes -->
    <!-- NOTE(review): no <scope>test</scope> is declared here or on junit; confirm whether that is intended -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webmvc</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aspects</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- JUnit 4 runner and @Test annotation used by the test classes -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

二:建立釋出者配置檔案

<!-- Register the @Component classes found in this package as beans -->
<context:component-scan base-package="com.xucj.activemq.spring"></context:component-scan>

<!-- The vendor-specific ConnectionFactory that actually creates connections to the broker -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
</bean>
<!-- ActiveMQ's PooledConnectionFactory wraps the ActiveMQConnectionFactory and pools
Connection, Session and MessageProducer objects to reduce resource usage; requires the activemq-pool artifact -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory" ref="targetConnectionFactory" />
    <property name="maxConnections" value="100" />
</bean>
<!-- Spring-side ConnectionFactory that delegates to the real one above -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <!-- The target factory that really produces JMS connections -->
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>

<!-- Spring's JMS helper used to send (and receive) messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- Refers to the Spring-managed ConnectionFactory defined above -->
    <property name="connectionFactory" ref="connectionFactory"/>
    <!-- JmsTemplate only applies deliveryMode / priority / timeToLive when explicit QoS
         is enabled; without this flag the deliveryMode below is silently ignored -->
    <property name="explicitQosEnabled" value="true" />
    <!-- Persistent delivery (javax.jms.DeliveryMode.PERSISTENT == 2) -->
    <property name="deliveryMode" value="2" />
    <!-- Publish/subscribe (topic) domain instead of point-to-point (queue) -->
    <property name="pubSubDomain" value="true" />
</bean>
<!-- Topic destination for persistent text messages -->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic_persistence_text"/>
</bean>

三:訂閱者配置檔案

<!-- The vendor-specific ConnectionFactory that actually creates connections to the broker -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
</bean>
<!-- ActiveMQ's PooledConnectionFactory wraps the ActiveMQConnectionFactory and pools
Connection, Session and MessageProducer objects to reduce resource usage; requires the activemq-pool artifact -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory" ref="targetConnectionFactory" />
    <property name="maxConnections" value="100" />
</bean>
<!-- Spring-side ConnectionFactory that delegates to the real one above -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <!-- JMS client id identifying this consumer -->
    <property name="clientId" value="clientId_001" />
    <!-- The target factory that really produces JMS connections -->
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>

<!-- Topic (publish/subscribe) destination for text messages; same topic name as the publisher config -->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic_persistence_text"/>
</bean>
<!-- Application listener that handles each received message -->
<bean id="myMessageListener" class="com.xucj.activemq.spring.MyMessageListener"></bean>
<!-- Message listener container: receives from the topic and dispatches to the listener -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="topicTextDestination" />
    <property name="messageListener" ref="myMessageListener" />
    <!-- Publish/subscribe (topic) domain -->
    <property name="pubSubDomain" value="true" />
    <!-- Use a durable subscription -->
    <property name="subscriptionDurable" value="true" />
    <!-- Receive timeout (value is 10000; presumably milliseconds, confirm against the Spring docs) -->
    <property name="receiveTimeout" value="10000" />
    <!-- Client id of the receiver -->
    <!-- NOTE(review): the same clientId is also set on the connectionFactory bean above; confirm both are needed -->
    <property name="clientId" value="clientId_001" />
    <!-- Name of the durable subscription (here the same string as the client id) -->
    <property name="durableSubscriptionName" value="clientId_001" />
</bean>

四:訂閱者監聽類

import org.springframework.stereotype.Component;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Consumer-side JMS listener that prints the body of every text message it receives.
 *
 * <p>Messages that are not {@link TextMessage} instances (including {@code null}) are
 * reported on {@code System.err} and skipped instead of failing with a
 * {@link ClassCastException}.
 */
@Component
public class MyMessageListener implements MessageListener {

    /**
     * Handles one incoming message.
     *
     * @param message the message delivered by the listener container
     */
    @Override
    public void onMessage(Message message) {
        // Guard before casting: the topic may carry other message types.
        if (!(message instanceof TextMessage)) {
            System.err.println("Ignoring unsupported JMS message: " + message);
            return;
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("收到的消息:" + textMessage.getText());
        } catch (Exception e) {
            // Listener boundary: report the failure but never propagate it to the container.
            e.printStackTrace();
        }
    }
}
      

五:釋出者釋出消息元件

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * Publisher component: sends text messages to the injected topic destination
 * through the injected {@link JmsTemplate}.
 */
@Component
public class QueueProducerChijiuhua {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Destination topicTextDestination;

    /**
     * Sends a text message to {@code topicTextDestination}.
     *
     * @param text the message body
     */
    public void sendTextMessage(final String text) {
        // Build the creator first, then hand it to the template.
        final MessageCreator textCreator = new MessageCreator() {
            public Message createMessage(Session jmsSession) throws JMSException {
                return jmsSession.createTextMessage(text);
            }
        };
        jmsTemplate.send(topicTextDestination, textCreator);
    }

}

六:釋出者測試類

import com.xucj.activemq.spring.QueueProducerChijiuhua;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Loads the publisher Spring context and publishes one text message to the topic.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext-jms-producer_chijiuhua.xml")
public class TestTopicProducerChijiuhua {

    @Autowired
    private QueueProducerChijiuhua queueProducerChijiuhua;

    /** Publishes a single text message through the producer bean. */
    @Test
    public void sendTextQueue() {
        final String payload = "訂閱持久化21";
        queueProducerChijiuhua.sendTextMessage(payload);
    }
}
      

七:訂閱者測試類

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.io.IOException;

/**
 * Loads the subscriber Spring context and then blocks on standard input.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext-jms-consumer_chijiuhua.xml")
public class TestTopicConsumerChijiuhua {

    /** Waits for a byte on standard input; an I/O failure is printed, not rethrown. */
    @Test
    public void testQueue() {
        try {
            // Block here so the test method does not return immediately.
            System.in.read();
        } catch (IOException readFailure) {
            readFailure.printStackTrace();
        }
    }
}

八:測試

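先啟動一次訂閱者測試類,讓 clientId 為 clientId_001 的持久訂閱在 broker 上完成註冊,然後将其關閉(持久訂閱必須先註冊,broker 才會為離線的訂閱者保留消息)。接著啟動釋出者測試類釋出兩條消息,随後再啟動訂閱者測試類,可以看到消息監聽類成功收到了離線期間釋出者釋出的兩條消息: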

activemq釋出訂閱持久化內建spring環境