1、activeMQ介绍:
① activeMQ是什么:
面向消息的中间件
② activeMQ的作用: 解耦、异步、削峰
如上图所示,学生给老师提问,当人数多了老师就忙不过来,所以就想了个办法让学生以统一的格式就问题提交给班长(activeMQ),这样就让学生和老师解耦 了;那可能老师有事就能及时解决班长提交的问题,但是会处理,这样就异步了;大量的学生不用直接找老师了,而是找班长,这样消息中间件就可以抵御洪峰流量了,同时也保护了主业务。
③ 什么情况下使用activeMQ:
1、系统之间耦合比较严重;例如订单系统增加一个调用服务,就需要改动大量代码,可以更少改动,所以解耦非常重要。
2、面对洪峰流量并发时,容易被冲垮,使用缓冲池,能够削峰。
3,等待同步存在性能问题,所以异步
消息发送尽量异步
2、activeMQ启动及监测:
检查activemq 是否启动的三种方法: 也是三种查看后台进程的方法(软件放在/opt目录)
ps -ef|grep activemq|grep -v grep // grep -v grep 可以不让显示grep 本来的信息
netstat -anp|grep 61616 // activemq 的默认后台端口是61616
lsof -i:61616
./activemq start/stop/restart
./activemq status 查看运行状态
记得使用ifconfig查看局域网ip,然后互ping
开放防火墙端口号

activeMQ如何使用:
主要功能:实现高可用、高性能、可伸缩、易用和安全的企业级面向消息服务系统。
JMS编程架构:
3、Queue 模式:
生产者
package com.at.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
// linux 上部署的activemq 的 IP 地址 + activemq 的端口号,如果用自己的需要改动
public static final String ACTIVEMQ_URL = "tcp://192.168.25.25:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException{
// 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码admin/admin
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂连接 connection 和 启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3 创建会话 session,两个参数,第一个事务, 第二个签收
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地 (两种 : 队列/主题 这里用队列)
Queue queue = session.createQueue(QUEUE_NAME);
// 5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//messageProducer .setDeliveryMode(DeliveryMode.PERSISTENT); 默认持久化
// 6 通过messageProducer 生产 3 条 消息发送到消息队列中
try {
for (int i = 1; i < 4 ; i++) {
// 7 创建字消息
TextMessage textMessage = session.createTextMessage("msg--" + i);
// 8 通过messageProducer发布消息
messageProducer.send(textMessage);
}
session.commit(); //开启事务,需提交
}
catch (JMSException e) {
e.printStackTrace();
session.rollback();//失败回滚
}
finally {
// 9 关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 消息发送到MQ完成 ****");
}
}
}
消费者
public class Customer {
public static final String ACTIVEMQ_URL = "tcp://192.168.25.25:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(Product.ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
//消费者必须消费对应的queue,不然不知道消费谁
MessageConsumer customer = session.createConsumer(queue);
while (true) {
/* 同步阻塞方式reveive()空参数的receive方法是阻塞,有参数的为等待时间
订阅者或消费者使用MessageConsumer 的receive() 方法接收消息,receive 在接收之前一直阻塞 */
//TextMessage receive = (TextMessage) customer.receive(4000L); 4秒后不接收了,单位毫秒
TextMessage receive = (TextMessage) customer.receive();
if (null!= receive ) {
receive.acknowledge(); 手动签收
System.out.println("接收到消息" + receive.getText());
} else {
break;
}
}
customer.close();
session.close();
connection.close();
}
}
======================================================================================
customer.setMessageListener(new MessageListener() { //异步非阻塞的方式消费消息,监听器
@Override
public void onMessage(Message message) {
if (null != message & message instanceof TextMessage) {
try {
TextMessage message1 = (TextMessage) message;
// message1.acknowledge(); 手动签收
System.out.println("接收到消息" + message1.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
System.in.read(); //当输入就结束进程,因为消费需要时间
Queue监控控制页面
名称 | 待处理的消息数量 | 消费者数量 | 消息排队数 | 消息已出列数 | 查看 | 操作 |
---|---|---|---|---|---|---|
Name | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued | Views | Operations |
queue01 | 1 | 2 | 2 | Browse Active Consumers Active Producers | Send To Purge Delete |
Queue总结
1、消息发送时异步,消息消费完就没有了
2、当先启动生产者,然后启动一个或多个消费者,第一个启动的消费者就会消费完消息
3、当多个消费者先启动,然后生产者生产消息到队列中,消费者对半获取消息(负载均衡)
4、topic 订阅模式:
1、消费者在生产者之前启动
2、先订阅才能消费,并且不能接受到订阅之前的消息
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Product {
public static final String ACTIVEMQ_URL = "tcp://192.168.25.25:61616";
public static final String TOPIC_NAME = "topic";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(Product.ACTIVEMQ_URL);
Connection connection = factory.createConnection(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 1; i < 4; i++) {
TextMessage textMessage = session.createTextMessage("hello activeMQ"+i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
}
======================================================================================
public static void main(String[] args) throws Exception {
System.out.println("我是一号消费者" );
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(Product.ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message & message instanceof TextMessage) {
try {
TextMessage message1 = (TextMessage) message;
System.out.println("接收到消息" + message1.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
}
5、Queue和topic对比:
比较 | topic | Queue |
---|---|---|
工作模式 | 订阅模式:①如果当前没有订阅者,消息将被丢弃②如果有多个订阅,这些订阅者都将受到消息 | 负载均衡模式:①如果当前没有消费者,消息不会丢弃;②如果有多个消费者,一条消息只能被一个消费者消费,并且要求消费者ack信息 |
有无状态 | 无状态 | Queue数据默认会在mq服务器上以文件形式保存,比如activeMQ保存在$AMQ_HOME\data\kr-store\data下面,也可以配置成DB |
传递完整性 | 如果没有订阅者,消息将丢弃 | 消息不会丢弃 |
处理效率 | 由于消息按照订阅者数量进行赋值,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 | 由于一条消息只发送给一个消费者,所以消费者再多,性能也不会明显下降,当然协议不同也有差异 |
6、消息的可靠性: activeMQ和JMS规范
①Queue持久性: 参考 消息的五中类型 默认持久化
如果生产者将消息设置为非持久化状态,如果MQ宕机,就会丢失消息,消费者接受不到了
如果设置为持久化,如果宕机,重启activeMQ,只要宕机前没有被消费掉,重启后都可以消费,不丢失
②topic持久性:
public class TopicProduct {//生产者
public static final String ACTIVEMQ_URL = "tcp://192.168.25.25:61616";
public static final String TOPIC_NAME = "topic";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//开启事务
MessageProducer producer = null;
try {
Topic topic = session.createTopic(TOPIC_NAME);
producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);//设置持久化模式
connection.start();//先设置持久化,再启动连接
for (int i = 1; i < 4; i++) {
TextMessage textMessage = session.createTextMessage("topic_persistent" + i);
System.out.println("发送" + textMessage.getText());
producer.send(textMessage);
}
session.commit();//提交事务
} catch (JMSException e) {
e.printStackTrace();
session.rollback();//回滚
} finally {
producer.close();
session.close();
connection.close();
}
}
}====================================================================================
public class TopicCustomer {//topic消费者
public static final String ACTIVEMQ_URL = "tcp://192.168.25.25:61616";
public static final String TOPIC_NAME = "topic";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
System.out.println("zhangsan" );
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("zhangsan");//设置客户端id
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//手动签收
Topic topic= session.createTopic(TOPIC_NAME);//创建主题topic
//设置订阅,第二个参数是订阅的名字
TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "你的月亮我的心");//创建订阅对象
connection.start();
Message message = durableSubscriber.receive();//一致阻塞
while (null != message){
TextMessage message1 = (TextMessage) message;
message1.acknowledge(); //手动签收
System.out.println("接收到"+message1.getText() );
message = durableSubscriber.receive();
}
session.close();
connection.close();
}
}
1、订阅模式消息生产者设置了持久化,需要先去订阅,自己不管宕机与否都能接收到消息
2、生产者没有设置持久化,需要消费者保持在线才能接受到消息,不然消息丢失
③事务 和 Acknowledge 签收 (俗称ack):
生产者:如果设置事务为true开启事务,需要session.commit();提交不然就回滚(开启事务比较安全)
消费者:如果true开启事务,不使用commit()提交,导致消息被多次读取,记得commit();
开启事务的签收:
开始事务后,消费者不管是否手签还是自动签收,都将消费信息 ,下次消费不到
非事务的签收:
非事务消费者,如果自动签收,下次消费不到,如果是手签,必须签收,不然下次还将消费
7、JMS点对点 和 发布订阅 总结:
点对点:
1、点对点是基于队列,生产者发送消息到队列,消费者到队列消费,队列的存在使得消息的异步传输成为可能(类似于发短信)
2、如果session关闭时存在部分消息没被签收(acknowledge),那消费者再次启动连接到该队列时,消息还可以接收到。
3、队列默认可以长久保留消息直到别消费,消费者不用担心消息没被消费而丢失,充分体现了异步传输的优势
订阅:
1、生产者设置持久化,先注册id订阅才能接受到发布的消息(不管是否在线,消息不丢失)
2、生产者没有设置持久化,只有消费者和生产者同时在线条件下才能接受消息
8、broker:
broker:是一个嵌入java代码中的一个activeMQ服务器实例,方便启用和节约资源,也保证了可靠性
<!-- broker 的绑定 不然会ObjectMapper异常 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
不需要了linux启动activeMQ服务器,直接java代码启动一个实例
public static void main(String[] args) throws Exception {
BrokerService service = new BrokerService();
service.setUseJmx(true);
service.addConnector("tcp://localhost:61616");
service.start();
}
如何学习消息中间件:
API 接受发送
MQ 的高可用
MQ 的集群容错配置
MQ 的持久化
延时发送
签收机制
Spring/SpringBoot 整合