天天看点

activeMQ(一) Queue和topic

1、activeMQ介绍:     

① activeMQ是什么:

    面向消息的中间件

② activeMQ的作用: 解耦、异步、削峰

如上图所示,学生给老师提问,当人数多了老师就忙不过来,所以就想了个办法让学生以统一的格式就问题提交给班长(activeMQ),这样就让学生和老师解耦 了;那可能老师有事就能及时解决班长提交的问题,但是会处理,这样就异步了;大量的学生不用直接找老师了,而是找班长,这样消息中间件就可以抵御洪峰流量了,同时也保护了主业务。

③ 什么情况下使用activeMQ:

1、系统之间耦合比较严重;例如订单系统增加一个调用服务,就需要改动大量代码,可以更少改动,所以解耦非常重要。

2、面对洪峰流量并发时,容易被冲垮,使用缓冲池,能够削峰。

3,等待同步存在性能问题,所以异步

消息发送尽量异步

2、activeMQ启动及监测:

检查activemq 是否启动的三种方法: 也是三种查看后台进程的方法(软件放在/opt目录)

ps -ef|grep activemq|grep -v grep      // grep -v  grep 可以不让显示grep 本来的信息
netstat -anp|grep 61616    // activemq 的默认后台端口是61616
lsof -i:61616 

./activemq start/stop/restart
./activemq status  		查看运行状态
记得使用ifconfig查看局域网ip,然后互ping
开放防火墙端口号
           
activeMQ(一) Queue和topic

activeMQ如何使用:

主要功能:实现高可用、高性能、可伸缩、易用和安全的企业级面向消息服务系统。

JMS编程架构:

3、Queue 模式:

生产者

package com.at.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce {
     //  linux 上部署的activemq 的 IP 地址 + activemq 的端口号,如果用自己的需要改动
    public static final String ACTIVEMQ_URL = "tcp://192.168.25.25:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws  JMSException{
        // 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码admin/admin
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂连接 connection 和 启动
        Connection  connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3 创建会话  session,两个参数,第一个事务, 第二个签收
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地 (两种 : 队列/主题   这里用队列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        //messageProducer .setDeliveryMode(DeliveryMode.PERSISTENT); 默认持久化
        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
        try {
		        for (int i = 1; i < 4 ; i++) {
		            // 7  创建字消息
		            TextMessage textMessage = session.createTextMessage("msg--" + i);
		            // 8  通过messageProducer发布消息
		            messageProducer.send(textMessage);
		        	}
		         session.commit(); //开启事务,需提交
		  	} 
		  catch (JMSException e) {
            e.printStackTrace();
            session.rollback();//失败回滚
       		}      
         finally {
	        // 9 关闭资源
	        messageProducer.close();
	        session.close();
	        connection.close();
	        System.out.println("  **** 消息发送到MQ完成 ****");
        	}
    }
}
           

消费者

public class Customer {

    public static final String ACTIVEMQ_URL = "tcp://192.168.25.25:61616";
	 public static final String QUEUE_NAME = "queue01";
	 
    public static void main(String[] args) throws JMSException {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(Product.ACTIVEMQ_URL);
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
		//消费者必须消费对应的queue,不然不知道消费谁
        MessageConsumer customer = session.createConsumer(queue);
        while (true) {
        /* 同步阻塞方式reveive()空参数的receive方法是阻塞,有参数的为等待时间
订阅者或消费者使用MessageConsumer 的receive() 方法接收消息,receive 在接收之前一直阻塞 */       
       		//TextMessage receive = (TextMessage) customer.receive(4000L); 4秒后不接收了,单位毫秒
            TextMessage receive = (TextMessage) customer.receive();
            if (null!= receive ) {
				receive.acknowledge();  手动签收
                System.out.println("接收到消息" + receive.getText());
            } else {

                break;
            }
        }
            customer.close();
            session.close();
            connection.close();
    }
}
======================================================================================  
customer.setMessageListener(new MessageListener() {  //异步非阻塞的方式消费消息,监听器
            @Override
            public void onMessage(Message message) {
                if (null != message & message instanceof TextMessage) {
                    try {
                        TextMessage message1 = (TextMessage) message;
                       // message1.acknowledge();  手动签收
                        System.out.println("接收到消息" + message1.getText());

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();  //当输入就结束进程,因为消费需要时间
           

Queue监控控制页面

名称 待处理的消息数量 消费者数量 消息排队数 消息已出列数 查看 操作
Name Number Of Pending Messages Number Of Consumers Messages Enqueued Messages Dequeued Views Operations
queue01 1 2 2 Browse Active Consumers Active Producers Send To Purge Delete
activeMQ(一) Queue和topic

Queue总结

1、消息发送时异步,消息消费完就没有了

2、当先启动生产者,然后启动一个或多个消费者,第一个启动的消费者就会消费完消息

3、当多个消费者先启动,然后生产者生产消息到队列中,消费者对半获取消息(负载均衡)

4、topic 订阅模式:

1、消费者在生产者之前启动

2、先订阅才能消费,并且不能接受到订阅之前的消息

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Product {
    public static final String ACTIVEMQ_URL = "tcp://192.168.25.25:61616";
    public static final String TOPIC_NAME = "topic";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(Product.ACTIVEMQ_URL);
        Connection connection = factory.createConnection(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD);
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer producer = session.createProducer(topic);

        for (int i = 1; i < 4; i++) {
            TextMessage textMessage = session.createTextMessage("hello activeMQ"+i);
            producer.send(textMessage);
        }

        producer.close();
        session.close();
        connection.close();
    }
  ======================================================================================  
  public static void main(String[] args) throws Exception {
        System.out.println("我是一号消费者"  );
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(Product.ACTIVEMQ_URL);
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic(TOPIC_NAME);
        MessageConsumer consumer = session.createConsumer(topic);


        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (null != message & message instanceof TextMessage) {
                    try {
                        TextMessage message1 = (TextMessage) message;
                        System.out.println("接收到消息" + message1.getText());

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();
        consumer.close();
        session.close();
        connection.close();

    }
           

5、Queue和topic对比:

比较 topic Queue
工作模式 订阅模式:①如果当前没有订阅者,消息将被丢弃②如果有多个订阅,这些订阅者都将受到消息 负载均衡模式:①如果当前没有消费者,消息不会丢弃;②如果有多个消费者,一条消息只能被一个消费者消费,并且要求消费者ack信息
有无状态 无状态 Queue数据默认会在mq服务器上以文件形式保存,比如activeMQ保存在$AMQ_HOME\data\kr-store\data下面,也可以配置成DB
传递完整性 如果没有订阅者,消息将丢弃 消息不会丢弃
处理效率 由于消息按照订阅者数量进行赋值,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 由于一条消息只发送给一个消费者,所以消费者再多,性能也不会明显下降,当然协议不同也有差异

6、消息的可靠性: activeMQ和JMS规范

①Queue持久性: 参考 消息的五中类型   默认持久化

如果生产者将消息设置为非持久化状态,如果MQ宕机,就会丢失消息,消费者接受不到了

如果设置为持久化,如果宕机,重启activeMQ,只要宕机前没有被消费掉,重启后都可以消费,不丢失

activeMQ(一) Queue和topic

②topic持久性:

public class TopicProduct {//生产者

    public static final String ACTIVEMQ_URL = "tcp://192.168.25.25:61616";
    public static final String TOPIC_NAME = "topic";

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//开启事务
        MessageProducer producer = null;
        try {
            Topic topic = session.createTopic(TOPIC_NAME);
            producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);//设置持久化模式

            connection.start();//先设置持久化,再启动连接
            for (int i = 1; i < 4; i++) {
                TextMessage textMessage = session.createTextMessage("topic_persistent" + i);
                System.out.println("发送" + textMessage.getText());
                producer.send(textMessage);
            }
            session.commit();//提交事务
        } catch (JMSException e) {
            e.printStackTrace();
            session.rollback();//回滚
        } finally {
	        producer.close();
	        session.close();
	        connection.close();
        }

    }


}====================================================================================  
public class TopicCustomer {//topic消费者
    public static final String ACTIVEMQ_URL = "tcp://192.168.25.25:61616";
    public static final String TOPIC_NAME = "topic";

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        System.out.println("zhangsan" );
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("zhangsan");//设置客户端id
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//手动签收
        Topic topic= session.createTopic(TOPIC_NAME);//创建主题topic
        //设置订阅,第二个参数是订阅的名字
        TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "你的月亮我的心");//创建订阅对象
        connection.start();

        Message message = durableSubscriber.receive();//一致阻塞
        while (null != message){
            TextMessage message1 = (TextMessage) message;
            message1.acknowledge();  //手动签收
            System.out.println("接收到"+message1.getText() );
            message = durableSubscriber.receive();
        }
        session.close();
        connection.close();
    }
}
1、订阅模式消息生产者设置了持久化,需要先去订阅,自己不管宕机与否都能接收到消息
2、生产者没有设置持久化,需要消费者保持在线才能接受到消息,不然消息丢失
           
activeMQ(一) Queue和topic

③事务 和 Acknowledge 签收 (俗称ack):

生产者:如果设置事务为true开启事务,需要session.commit();提交不然就回滚(开启事务比较安全)

消费者:如果true开启事务,不使用commit()提交,导致消息被多次读取,记得commit();

开启事务的签收:

开始事务后,消费者不管是否手签还是自动签收,都将消费信息 ,下次消费不到 
           

非事务的签收:

非事务消费者,如果自动签收,下次消费不到,如果是手签,必须签收,不然下次还将消费
           

7、JMS点对点 和 发布订阅 总结:

点对点:

1、点对点是基于队列,生产者发送消息到队列,消费者到队列消费,队列的存在使得消息的异步传输成为可能(类似于发短信)

2、如果session关闭时存在部分消息没被签收(acknowledge),那消费者再次启动连接到该队列时,消息还可以接收到。

3、队列默认可以长久保留消息直到别消费,消费者不用担心消息没被消费而丢失,充分体现了异步传输的优势

订阅:

1、生产者设置持久化,先注册id订阅才能接受到发布的消息(不管是否在线,消息不丢失)

2、生产者没有设置持久化,只有消费者和生产者同时在线条件下才能接受消息

8、broker:

broker:是一个嵌入java代码中的一个activeMQ服务器实例,方便启用和节约资源,也保证了可靠性

<!--  broker 的绑定 不然会ObjectMapper异常 -->
 <dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-databind</artifactId>
     <version>2.9.5</version>
 </dependency>
         <!--  activemq  所需要的jar 包-->
 <dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-all</artifactId>
     <version>5.15.9</version>
 </dependency>
           

不需要了linux启动activeMQ服务器,直接java代码启动一个实例

public static void main(String[] args) throws Exception {
        BrokerService service = new BrokerService();
        service.setUseJmx(true);
        service.addConnector("tcp://localhost:61616");
        service.start();
    }
           

如何学习消息中间件:

API 接受发送

MQ 的高可用

MQ 的集群容错配置

MQ 的持久化

延时发送

签收机制

Spring/SpringBoot 整合