个人博客网:https://wushaopei.github.io/ (你想要这里多有)
一、部署操作
1. 部署在linux 上的acvtiveMQ 要可以通过前台windows 的页面访问,必须把linux 的IP和 windows的 IP 地址配置到同一个网关下 。这种情况一般都是修改 linux 的IP 地址,修改网卡文件对应的IP 地址
修改linux 的ip 地址:
cd /etc/sysconfig/network-scripts
vi ifcfg-eth0

这是修改之后的网卡文件配置,IP 地址为:192.168.17.3
配置成功后 ,可以用 windows ping linux , linux ping windows ,当全部ping 通后,可以使用图形化界面访问activeMQ
// ActiveMQ 的前台端口为 8161 , 提供控制台服务 后台端口为61616 ,提供 JMS 服务
// 192.168.17.3 为 linux 的IP 地址, 使用 IP+端口 访问了ActiveMQ , 登陆之后的样子如上。(能访问成功首先得在linux 上启动activeMQ 的服务),首次登录的默认账户密码为 账号:admin 密码:admin ,默认端口号:8161
2、JMS
Java 消息中间件的服务接口规范,activemq 之上是 mq , 而 mq 之上是JMS 定义的消息规范 。 activemq 是mq 技术的一种理论实现(与之相类似的实现还有 Kafka RabbitMQ RockitMQ ),而 JMS 是更上一级的规范。
JMS 的两种模式:
在点对点的消息传递时,目的地称为 队列 queue
在发布订阅消息传递中,目的地称为 主题 topic
类比JDBC编码套路:
第一步:注册驱动(仅仅只做一次)
Class.forName("com.mysql.jdbc.com");
第二步:建立连接(Connection)
DriverManager.getConnection(url,user,password);
第三步:创建运行SQL语句(Statement)
connection.createStatement();
第四步:运行语句
rs.executeQuery(sql);
第五步:处理结果集(ResultSet)
第六步:释放资源
3、工程创建与配置
- IDEA创建Maven工程
- 配置POM.xml文件
pom.xml 依赖:
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.15</version>
</dependency>
4、队列模式与案例讲解
在点对点的消息传递域中,目的地被称为队列(queue)
点对点消息传递域的特点如下:
- 每个消息只能有一个消费者,类似于1对1的关系。好比个人快递自己领自己的。
- 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。
- 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。
(1)demo 队列的消费生产者
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
for (int i = 0; i < 3; i++) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("msg---hello" + i);//理解为一个字符串
//8.通过messageProducer发送给MQ队列
messageProducer.send(textMessage);
}
//9.关闭资源
messageProducer.close();
session.close();
以及在页面上的显示:
控制说明:
Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
Number Of Consumers=消费者数量,消费者端的消费者数量。
Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。
总结:
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
(2)与之相对应的消息消费者(处理消息的系统)代码及运行
① 阻塞式消费者
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 简单消息消费者
*/
public class JmsConsumer {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的消费者,指定消费哪一个队列里面的消息
MessageConsumer messageConsumer = session.createConsumer(queue);
//循环获取
while (true) {
//6.通过消费者调用方法获取队列里面的消息(发送的消息是什么类型,接收的时候就强转成什么类型)
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (textMessage != null) {
System.out.println("****消费者接收到的消息: " + textMessage.getText());
}else {
break;
}
}
//7.关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}
②异步监听式消费者
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* 监听模式下的消费者
*/
public class JmsConsumer2 {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的消费者,指定消费哪一个队列里面的消息
MessageConsumer messageConsumer = session.createConsumer(queue);
//6.通过监听的方式消费消息
/*
异步非阻塞式方式监听器(onMessage)
订阅者或消费者通过创建的消费者对象,给消费者注册消息监听器setMessageListener,
当消息有消息的时候,系统会自动调用MessageListener类的onMessage方法
我们只需要在onMessage方法内判断消息类型即可获取消息
*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
//7.把message转换成消息发送前的类型并获取消息内容
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("****消费者接收到的消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.out.println("执行了39行");
//保证控制台不关闭,阻止程序关闭
System.in.read();
//关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}
控制台显示结果:
(3)JMS开发的基本步骤:
(4)两种消费方式的比较:
- 同步阻塞方式(receive):订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
- 异步非阻塞方式(监听器onMessage()):订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。
5、主题模式与案例讲解
在发布订阅消息传递域中,目的地被称为主题(topic)
发布/订阅消息传递域的特点如下:
- 生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
- 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
- 生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅
(1)发布主题生产者
package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProducer_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(topic);
//6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
for (int i = 0; i < 3; i++) {
//7.通过session创建消息
TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
//8.使用指定好目的地的消息生产者发送消息
messageProducer.send(textMessage);
}
//9.关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("****TOPIC_NAME消息发布到MQ完成");
}
}
控制台展示结果:
(2)订阅主题消费者
package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("我是1号消费者");
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.创建消息的消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
//5.创建消息的消费者,指定消费哪一个队列里面的消息
messageConsumer.setMessageListener(message -> {
if (message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
}
}
注意:先启动订阅者再启动生产者,不然发送的消息是废消息
控制台消费结果:
6、小总结
重点注意:activemq 好像自带负载均衡,当先启动两个队列(Queue)的消费者时,在启动生产者发出消息,此时的消息平均的被两个消费者消费。 并且消费者不会消费已经被消费的消息(即为已经出队的消息)
但是当有多个主题(Topic)订阅者时,发布者发布的消息,每个订阅者都会接收所有的消息。topic 更像是被广播的消息,但是缺点是不能接受已经发送过的消息。
先要有订阅者,生产者才有意义。