通過上一篇文章 《消息隊列深入解析》,我們已經消息隊列是什麼、使用消息隊列的好處以及常見消息隊列的簡單介紹。
這一篇文章,主要帶大家詳細了解一下消息隊列ActiveMQ的使用。
學習消息隊列ActiveMQ的使用之前,我們先來搞清JMS。
JMS
1. JMS基本概念
JMS(JAVA Message Service,java消息服務)是java的消息服務,JMS的用戶端之間可以通過JMS服務進行異步的消息傳輸。JMS(JAVA Message Service,java消息服務)API是一個消息服務的标準或者說是規範,允許應用程式元件基于JavaEE平台建立、發送、接收和讀取消息。它使分布式通信耦合度更低,消息服務更加可靠以及異步性。
2. JMS五種不同的消息正文格式
JMS定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送并接收以一些不同形式的資料,提供現有消息格式的一些級别的相容性。
- StreamMessage -- Java原始值的資料流
- MapMessage--一套名稱-值對
- TextMessage--一個字元串對象
- ObjectMessage--一個序列化的 Java對象
- BytesMessage--一個位元組的資料流
3.JMS兩種消息模型
1 .點到點(P2P)模型
使用隊列(Queue)作為消息通信載體;滿足生産者與消費者模式,一條消息隻能被一個消費者使用,未被消費的消息在隊列中保留直到被消費或逾時。比如:我們生産者發送100條消息的話,兩個消費者來消費一般情況下兩個消費者會按照消息發送的順序各自消費一半(也就是你一個我一個的消費。)後面我們會通過代碼示範來驗證。
2. 釋出/訂閱(Pub/Sub)模型
釋出訂閱模型(Pub/Sub) 使用主題(Topic)作為消息通信載體,類似于廣播模式;釋出者釋出一條消息,該消息通過主題傳遞給所有的訂閱者,在一條消息廣播之後才訂閱的使用者則是收不到該條消息的。
4.JMS編碼接口之間的關系
- ConnectionFactory:建立Connection對象的工廠,針對兩種不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。
- Connection:Connection表示在用戶端和JMS系統之間建立的連結(對TCP/IP socket的包裝)。Connection可以産生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
- Session:Session是操作消息的接口。可以通過session建立生産者、消費者、消息等。Session提供了事務的功能。當需要使用session發送/接收多個消息時,可以将這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
- MessageProducer:消息生産者由Session建立,并用于将消息發送到Destination。同樣,消息生産者分兩種類型:QueueSender和TopicPublisher。可以調用消息生産者的方法(send或publish方法)發送消息。
- MessageConsumer :消息消費者由Session建立,用于接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分别通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。
- Destination:Destination的意思是消息生産者的消息發送目标或者說消息消費者的消息來源。對于消息生産者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。
- MessageListener: 消息監聽器。如果注冊了消息監聽器,一旦消息到達,将自動調用監聽器的onMessage方法。
參考:
https://blog.csdn.net/shaobingj126/article/details/50585035消息隊列ActiveMQ
1.簡介
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實作,盡管JMS規範出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。
2.簡單使用
安裝過程很簡單這裡就不貼安裝過程了,可以自行google.
添加Maven依賴
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>
2.1.測試點對點模型通信
生産者發送消息測試方法:
@Test
public void testQueueProducer() throws Exception {
// 1、建立一個連接配接工廠對象,需要指定服務的ip及端口。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
// 2、使用工廠對象建立一個Connection對象。
Connection connection = connectionFactory.createConnection();
// 3、開啟連接配接,調用Connection對象的start方法。
connection.start();
// 4、建立一個Session對象。
// 第一個參數:是否開啟事務。如果true開啟事務,第二個參數無意義。一般不開啟事務false。
// 第二個參數:應答模式。自動應答或者手動應答。一般自動應答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用Session對象建立一個Destination對象。兩種形式queue、topic,現在應該使用queue
Queue queue = session.createQueue("test-queue");
// 6、使用Session對象建立一個Producer對象。
MessageProducer producer = session.createProducer(queue);
// 7、建立一個Message對象,可以使用TextMessage。
for (int i = 0; i < 50; i++) {
TextMessage textMessage = session.createTextMessage("第"+i+ "一個ActiveMQ隊列目的地的消息");
// 8、發送消息
producer.send(textMessage);
}
// 9、關閉資源
producer.close();
session.close();
connection.close();
}
消費者消費消息測試方法
@Test
public void testQueueConsumer() throws Exception {
// 建立一個ConnectionFactory對象連接配接MQ伺服器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
// 建立一個連接配接對象
Connection connection = connectionFactory.createConnection();
// 開啟連接配接
connection.start();
// 使用Connection對象建立一個Session對象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立一個Destination對象。queue對象
Queue queue = session.createQueue("test-queue");
// 使用Session對象建立一個消費者對象。
MessageConsumer consumer = session.createConsumer(queue);
// 接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 列印結果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println("這是接收到的消息:" + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 等待接收消息
System.in.read();
// 關閉資源
consumer.close();
session.close();
connection.close();
}
我們開啟兩個消費者程序來監聽(運作兩次testQueueConsumer()方法)。
然後我們運作運作生産者測試方法發送消息.先發送消息還是先監聽消息一般不會不影響。
效果如下:
兩個消費者各自消費一半消息,而且還是按照消息發送到消息隊列的順序,這也驗證了我們上面的說法。
第一個消費者
第二個消費者
2.2.測試釋出/訂閱(Pub/Sub)模型通信
@Test
public void testTopicProducer() throws Exception {
// 1、建立一個連接配接工廠對象,需要指定服務的ip及端口。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
// 2、使用工廠對象建立一個Connection對象。
Connection connection = connectionFactory.createConnection();
// 3、開啟連接配接,調用Connection對象的start方法。
connection.start();
// 4、建立一個Session對象。
// 第一個參數:是否開啟事務。如果true開啟事務,第二個參數無意義。一般不開啟事務false。
// 第二個參數:應答模式。自動應答或者手動應答。一般自動應答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用Session對象建立一個Destination對象。兩種形式queue、topic,現在應該使用topic
Topic topic = session.createTopic("test-topic");
// 6、使用Session對象建立一個Producer對象。
MessageProducer producer = session.createProducer(topic);
// 7、建立一個Message對象,可以使用TextMessage。
for (int i = 0; i < 50; i++) {
TextMessage textMessage = session.createTextMessage("第"+i+ "一個ActiveMQ隊列目的地的消息");
// 8、發送消息
producer.send(textMessage);
}
// 9、關閉資源
producer.close();
session.close();
connection.close();
}
消費者消費消息測試方法:
@Test
public void testTopicConsumer() throws Exception {
// 建立一個ConnectionFactory對象連接配接MQ伺服器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616");
// 建立一個連接配接對象
Connection connection = connectionFactory.createConnection();
// 開啟連接配接
connection.start();
// 使用Connection對象建立一個Session對象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立一個Destination對象。topic對象
Topic topic = session.createTopic("test-topic");
// 使用Session對象建立一個消費者對象。
MessageConsumer consumer = session.createConsumer(topic);
// 接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 列印結果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println("這是接收到的消息:" + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic消費者啟動。。。。");
// 等待接收消息
System.in.read();
// 關閉資源
consumer.close();
session.close();
connection.close();
}
先運作兩個消費者程序(提前訂閱,不然收不到發送的消息),然後運作生産者測試方法發送消息。
結果是:
兩個消費者程序都可以接收到生産者發送過來的所有消息,我這裡就不貼圖檔了,
這樣驗證了我們上面的說法。
我們從上面代碼就可以看出,點對點通信和釋出訂閱通信模式的差別就是建立生産者和消費者對象時提供的Destination對象不同,如果是點對點通信建立的Destination對象是Queue,釋出訂閱通信模式通信則是Topic。
3.整合Spring使用
整合spring除了我們上面依賴的Jar包還要依賴
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.2.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>4.2.7.RELEASE</version>
</dependency>
比如我們在我們的系統中現在有兩個服務,第一個服務發送消息,第二個服務接收消息,我們下面看看這是如何實作的。
發送消息
發送消息的配置檔案:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以産生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.155:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory對應真實的可以産生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生産者 -->
<!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--這個是隊列目的地,點對點的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--這個是主題目的地,一對多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
</beans>
發送消息的測試方法:
@Test
public void testSpringActiveMq() throws Exception {
//初始化spring容器
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//從spring容器中獲得JmsTemplate對象
JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
//從spring容器中取Destination對象
Destination destination = (Destination) applicationContext.getBean("queueDestination");
//使用JmsTemplate對象發送消息。
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//建立一個消息對象并傳回
TextMessage textMessage = session.createTextMessage("spring activemq queue message");
return textMessage;
}
});
}
我們上面直接ApplicationContext的getBean方法擷取的對象,實際在項目使用依賴注入即可。
接收消息
建立一個MessageListener的實作類。
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
//取消息内容
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收消息的配置檔案:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以産生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.168:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory對應真實的可以産生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!--這個是隊列目的地,點對點的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--這個是主題目的地,一對多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
<!-- 接收消息 -->
<!-- 配置監聽器 -->
<bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" />
<!-- 消息監聽容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
測試接收消息的代碼
@Test
public void testQueueConsumer() throws Exception {
//初始化spring容器
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//等待
System.in.read();
}
歡迎關注我的微信公衆号:"Java面試通關手冊"(一個有溫度的微信公衆号,期待與你共同進步~~~堅持原創,分享美文,分享各種Java學習資源):。