個人部落格網:https://wushaopei.github.io/ (你想要這裡多有)
一、部署操作
1. 部署在linux 上的acvtiveMQ 要可以通過前台windows 的頁面通路,必須把linux 的IP和 windows的 IP 位址配置到同一個網關下 。這種情況一般都是修改 linux 的IP 位址,修改網卡檔案對應的IP 位址
修改linux 的ip 位址:
cd /etc/sysconfig/network-scripts
vi ifcfg-eth0

這是修改之後的網卡檔案配置,IP 位址為:192.168.17.3
配置成功後 ,可以用 windows ping linux , linux ping windows ,當全部ping 通後,可以使用圖形化界面通路activeMQ
// ActiveMQ 的前台端口為 8161 , 提供控制台服務 背景端口為61616 ,提供 JMS 服務
// 192.168.17.3 為 linux 的IP 位址, 使用 IP+端口 通路了ActiveMQ , 登陸之後的樣子如上。(能通路成功首先得在linux 上啟動activeMQ 的服務),首次登入的預設賬戶密碼為 賬号:admin 密碼:admin ,預設端口号:8161
2、JMS
Java 消息中間件的服務接口規範,activemq 之上是 mq , 而 mq 之上是JMS 定義的消息規範 。 activemq 是mq 技術的一種理論實作(與之相類似的實作還有 Kafka RabbitMQ RockitMQ ),而 JMS 是更上一級的規範。
JMS 的兩種模式:
在點對點的消息傳遞時,目的地稱為 隊列 queue
在釋出訂閱消息傳遞中,目的地稱為 主題 topic
類比JDBC編碼套路:
第一步:注冊驅動(僅僅隻做一次)
Class.forName("com.mysql.jdbc.com");
第二步:建立連接配接(Connection)
DriverManager.getConnection(url,user,password);
第三步:建立運作SQL語句(Statement)
connection.createStatement();
第四步:運作語句
rs.executeQuery(sql);
第五步:處理結果集(ResultSet)
第六步:釋放資源
3、工程建立與配置
- IDEA建立Maven工程
- 配置POM.xml檔案
pom.xml 依賴:
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.15</version>
</dependency>
4、隊列模式與案例講解
在點對點的消息傳遞域中,目的地被稱為隊列(queue)
點對點消息傳遞域的特點如下:
- 每個消息隻能有一個消費者,類似于1對1的關系。好比個人快遞自己領自己的。
- 消息的生産者和消費者之間沒有時間上的相關性。無論消費者在生産者發送消息的時候是否處于運作狀态,消費者都可以提取消息。好比我們的發送短信,發送者發送後不見得接收者會即收即看。
- 消息被消費後隊列中不會再存儲,是以消費者不會消費到已經被消費掉的消息。
(1)demo 隊列的消費生産者
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.建立連接配接工廠,按照給定的URL,采用預設的使用者名密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連接配接工廠,獲得connection并啟動通路
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.建立會話session
//兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是隊列queue還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.建立消息的生産者
MessageProducer messageProducer = session.createProducer(queue);
//6.通過使用消息生産者,生産三條消息,發送到MQ的隊列裡面
for (int i = 0; i < 3; i++) {
//7.建立消息
TextMessage textMessage = session.createTextMessage("msg---hello" + i);//了解為一個字元串
//8.通過messageProducer發送給MQ隊列
messageProducer.send(textMessage);
}
//9.關閉資源
messageProducer.close();
session.close();
以及在頁面上的顯示:
控制說明:
Number Of Pending Messages=等待消費的消息,這個是未出隊列的數量,公式=總接收數-總出隊列數。
Number Of Consumers=消費者數量,消費者端的消費者數量。
Messages Enqueued=進隊消息數,進隊列的總消息量,包括出隊列的。這個數隻增不減。
Messages Dequeued=出隊消息數,可以了解為是消費者消費掉的數量。
總結:
當有一個消息進入這個隊列時,等待消費的消息是1,進入隊列的消息是1。
當消息消費後,等待消費的消息是0,進入隊列的消息是1,出隊列的消息是1。
(2)與之相對應的消息消費者(處理消息的系統)代碼及運作
① 阻塞式消費者
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 簡單消息消費者
*/
public class JmsConsumer {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.建立連接配接工廠,按照給定的URL,采用預設的使用者名密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連接配接工廠,獲得connection并啟動通路
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.建立會話session
//兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是隊列queue還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.建立消息的消費者,指定消費哪一個隊列裡面的消息
MessageConsumer messageConsumer = session.createConsumer(queue);
//循環擷取
while (true) {
//6.通過消費者調用方法擷取隊列裡面的消息(發送的消息是什麼類型,接收的時候就強轉成什麼類型)
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (textMessage != null) {
System.out.println("****消費者接收到的消息: " + textMessage.getText());
}else {
break;
}
}
//7.關閉資源
messageConsumer.close();
session.close();
connection.close();
}
}
②異步監聽式消費者
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* 監聽模式下的消費者
*/
public class JmsConsumer2 {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1.建立連接配接工廠,按照給定的URL,采用預設的使用者名密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連接配接工廠,獲得connection并啟動通路
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.建立會話session
//兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是隊列queue還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.建立消息的消費者,指定消費哪一個隊列裡面的消息
MessageConsumer messageConsumer = session.createConsumer(queue);
//6.通過監聽的方式消費消息
/*
異步非阻塞式方式監聽器(onMessage)
訂閱者或消費者通過建立的消費者對象,給消費者注冊消息監聽器setMessageListener,
當消息有消息的時候,系統會自動調用MessageListener類的onMessage方法
我們隻需要在onMessage方法内判斷消息類型即可擷取消息
*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
//7.把message轉換成消息發送前的類型并擷取消息内容
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("****消費者接收到的消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.out.println("執行了39行");
//保證控制台不關閉,阻止程式關閉
System.in.read();
//關閉資源
messageConsumer.close();
session.close();
connection.close();
}
}
控制台顯示結果:
(3)JMS開發的基本步驟:
(4)兩種消費方式的比較:
- 同步阻塞方式(receive):訂閱者或接收者抵用MessageConsumer的receive()方法來接收消息,receive方法在能接收到消息之前(或逾時之前)将一直阻塞。
- 異步非阻塞方式(監聽器onMessage()):訂閱者或接收者通過MessageConsumer的setMessageListener(MessageListener listener)注冊一個消息監聽器,當消息到達之後,系統會自動調用監聽器MessageListener的onMessage(Message message)方法。
5、主題模式與案例講解
在釋出訂閱消息傳遞域中,目的地被稱為主題(topic)
釋出/訂閱消息傳遞域的特點如下:
- 生産者将消息釋出到topic中,每個消息可以有多個消費者,屬于1:N的關系;
- 生産者和消費者之間有時間上的相關性。訂閱某一個主題的消費者隻能消費自它訂閱之後釋出的消息。
- 生産者生産時,topic不儲存消息它是無狀态的不落地,假如無人訂閱就去生産,那就是一條廢消息,是以,一般先啟動消費者再啟動生産者。
JMS規範允許客戶建立持久訂閱,這在一定程度上放松了時間上的相關性要求。持久訂閱允許消費者消費它在未處于激活狀态時發送的消息。一句話,好比我們的微信公衆号訂閱
(1)釋出主題生産者
package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProducer_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
//1.建立連接配接工廠,按照給定的URL,采用預設的使用者名密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連接配接工廠,獲得connection并啟動通路
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.建立會話session
//兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是隊列queue還是主題topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.建立消息的生産者
MessageProducer messageProducer = session.createProducer(topic);
//6.通過使用消息生産者,生産三條消息,發送到MQ的隊列裡面
for (int i = 0; i < 3; i++) {
//7.通過session建立消息
TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
//8.使用指定好目的地的消息生産者發送消息
messageProducer.send(textMessage);
}
//9.關閉資源
messageProducer.close();
session.close();
connection.close();
System.out.println("****TOPIC_NAME消息釋出到MQ完成");
}
}
控制台展示結果:
(2)訂閱主題消費者
package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("我是1号消費者");
//1.建立連接配接工廠,按照給定的URL,采用預設的使用者名密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連接配接工廠,獲得connection并啟動通路
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.建立會話session
//兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是隊列queue還是主題topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.建立消息的消費者
MessageConsumer messageConsumer = session.createConsumer(topic);
//5.建立消息的消費者,指定消費哪一個隊列裡面的消息
messageConsumer.setMessageListener(message -> {
if (message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
}
}
注意:先啟動訂閱者再啟動生産者,不然發送的消息是廢消息
控制台消費結果:
6、小總結
重點注意:activemq 好像自帶負載均衡,當先啟動兩個隊列(Queue)的消費者時,在啟動生産者發出消息,此時的消息平均的被兩個消費者消費。 并且消費者不會消費已經被消費的消息(即為已經出隊的消息)
但是當有多個主題(Topic)訂閱者時,釋出者釋出的消息,每個訂閱者都會接收所有的消息。topic 更像是被廣播的消息,但是缺點是不能接受已經發送過的消息。
先要有訂閱者,生産者才有意義。