天天看点

消息中间件—ActiveMQ

  在分布式系统程序解耦合的过程中经常使用消息中间件技术,常见的消息中间件产品有ActiveMQ、RabbitMQ、ZeroMQ、Kafka等,今天主要介绍一下ActiveMQ。

  ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

    消息中间件依靠高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型来进行分布式环境下的进程通信,消息中间件的角色大致有提供者(producer)和消费者(consumer)。

一、JMS

 jms是java平台上面向消息中间件的技术规范,用于两个应用程序之间或者分布式系统中发送消息,进行异步通信。JMS只定义了一系列的接口规范,是一个与具体厂商无关的API。jms定义了五种不同类型的消息正文格式:

1、TextMessage   一个字符串对象

2、MapMessage    一套 名称—值 对

3、ObjectMessage  一个序列化的java对象

4、BytesMessage   一个字节的数据流

5、StreamMessage  java原始值的数据流

JMS拥有两种消息传递类型:

1、点对点(Queue) 生产者和消费者一一对应

消息中间件—ActiveMQ

2、发布/订阅模式(topic) 一个生产者发布消息,多个消费者接收

消息中间件—ActiveMQ

两种类型的区别:

Queue:

 允许后订阅,Queue模式主要建立在一个队列上,当连接上队列时,发送端可以直接往队列发送消息,不需要考虑是否有接收端在接收,如果不存在接收端监听的话,发送的消息会先存在ActiveMQ的服务器上,直到有接收端进行接收。

 点对点的模式可以有多个发送端和多个接收端,但是一条消息只能被接收一次,先连接上的接收端先接收到,后连接上的接收不到。

topic:

只能先订阅,必须在发消息之前就已经订阅了才会接收到消息,topic中发送端发送的消息会被每一个接收端都执行一次。

二、Active-MQ入门demo

发送/接收消息的步骤 :

1、创建连接工厂

2、获取连接

3、启动连接

4、获取session (参数1:是否启用事务  参数2:消息确认模式)

5、创建队列或topic对象

6、创建消息生产者或对象消费者

7、创建消息    // 接收消息,设置监听

8、使用生产者发送消息    

9、关闭资源

注意细节 :

1、启用事务后必须要commit();

2、消息确认模式的分类:

    AUTO_ACKNOWLEDGE = 1      自动确认

    CLIENT_ACKNOWLEDGE = 2    客户端手动确认   

    DUPS_OK_ACKNOWLEDGE = 3   自动批量确认

    SESSION_TRANSACTED = 0    事务提交并确认

如果开启事务后,消息确认模式会自动切换为SESSION_TRANSACTED模式

Queue模式生产者发送消息

public class QueueProducer {
    public static void main(String[] args) throws JMSException {

        // 创建连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://这里写ip地址:61616");
        // 获取连接
        Connection connection = factory.createConnection();
        // 启动连接
        connection.start();
        // 获取session(参数1:是否启动连接   参数2:消息确认模式)
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 创建队列对象
        Queue queue = session.createQueue("test-queue");
        // 创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        // 创建消息
        TextMessage textMessage = session.createTextMessage("这是一个测试消息");
        //  发送消息
        producer.send(textMessage);
        session.commit();
        // 关闭资源
        producer.close();
        session.close();
        connection.close();
    }

}
           

运行结果如下图:

ActiveMQ后台待消费的消息为1,入队消息为1。

消息中间件—ActiveMQ

Queue模式消费者接收消息

public class QueueConsumer {
    public static void main(String[] args) throws JMSException {

        // 创建连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://这里写ip地址:61616");
        // 获取连接
        Connection connection = factory.createConnection();
        // 启动连接
        connection.start();
        // 获取session(参数1:是否启动连接   参数2:消息确认模式)
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 创建队列对象
        Queue queue = session.createQueue("test-queue");
        // 创建消息消费者
        MessageConsumer consumer = session.createConsumer(queue);
        // 接收消息 设置监听
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        session.commit();
        // 关闭资源   使用死循环让监听挂起,不关闭资源
      while (true){}
  }
           

运行结果如下:

消息中间件—ActiveMQ

原生ActiveMQ了解一下即可,在开发中经常使用的是Spring整合的JMS

三、Spring整合JMS

Queue模式

消息生产者

1、编写applicationContext-activemq-producer.xml配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans   
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context   
		http://www.springframework.org/schema/context/spring-context.xsd
		http://www.springframework.org/schema/jms
		http://www.springframework.org/schema/jms/spring-jms.xsd">
		
		
	<context:component-scan base-package="cn.springjms.demo"></context:component-scan>
	
	   
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
	    <property name="brokerURL" value="tcp://ip地址:61616"/>
	</bean>
	   
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
	<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
	    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
	</bean>  
		   
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
	    <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
	    <property name="connectionFactory" ref="connectionFactory"/>  
	</bean>      
    <!--这个是队列目的地,点对点的  文本信息-->  
	<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
	    <constructor-arg value="queue_text"/>  
	</bean>    
	
	<!--这个是订阅模式  文本信息-->  
	<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">  
	    <constructor-arg value="topic_text"/>  
	</bean>  
	
</beans>
           

2、编写发送消息的类

@Component
public class SpringJmsProduce {
    @Autowired
    private ActiveMQQueue queue;

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage(final String message){
        jmsTemplate.send(queue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}
           

单元测试

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext-activemq-producer.xml")
public class SpringMessageSend {

    @Autowired
    private SpringJmsProduce produce;

    @Test
    public void sendTest(){
        produce.sendMessage("测试一下");
    }
}
           

消息接收者

1、编写applicationContext-activemq-consumer.xml配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans   
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context   
		http://www.springframework.org/schema/context/spring-context.xsd
		http://www.springframework.org/schema/jms
		http://www.springframework.org/schema/jms/spring-jms.xsd">
	
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
	    <property name="brokerURL" value="tcp://ip地址:61616"/>
	</bean>
	   
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
	<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
	    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
	</bean>  
	
    <!--这个是队列目的地,点对点的  文本信息-->  
	<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
	    <constructor-arg value="queue_text"/>  
	</bean>
	<!--这个是订阅模式  文本信息-->
	<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="topic_text"/>
	</bean>

	<!-- 我的监听类 -->
	<bean id="myMessageListener" class="cn.springjms.demo.SpringJmsReceiver"></bean>
	<!-- 消息监听容器 -->
	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />  // 连接信息
		<property name="destination" ref="queueTextDestination" />  // 目标信息
		<property name="messageListener" ref="myMessageListener" /> // 监听类
	</bean>
	
</beans>
           

2、编写消息接收类 (实现MessageListener 接口)

@Component
public class SpringJmsReceiver implements MessageListener{

    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println(textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
           

3、单元测试

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext-activemq-consumer.xml")
public class SpringReceiverTest {

    @Test
    public void testRun(){
        while (true);
    }
}
           

topic模式与Queue模式代码编写类似。

继续阅读