1 安裝環境
1、需要jdk
2、安裝Linux系統。生産環境都是Linux系統。
2 安裝步驟
第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。
第二步:解壓縮。
第三步:關閉防火牆
臨時關閉:service iptables stop
寫入配置檔案,開機也不啟動 chkconfig iptables off
第四步:啟動activemq服務
使用bin目錄下的activemq指令啟動:
# ./activemq start 啟動服務
# ./activemq stop 停止服務
# ./activemq status 檢視服務的狀态
注意:如果ActiveMQ整合spring時,一定不要使用activemq-all-5.12.0.jar包。建議使用5.11.2
3 管理背景
1 進入管理背景
http://192.168.25.168:8161
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcuAzM1cDM5YDNy0iNwcDMwgDNwIDOwYDM4EDMy0yN3ADMyMTMvwlNwgTMwIzLcdzNwAjMzEzLcd2bsJ2Lc12bj5ycn9Gbi52YugTMwIzcldWYtl2Lc9CX6MHc0RHaiojIsJye.png)
登入進來之後的界面如下:
4 linux activemq 出現無法通路的解決
出現上面錯誤的原因是因為機器名和ip位址沒有對應上。
解決方式:
1.用cat /etc/sysconfig/network 命名檢視主機名
# cat /etc/sysconfig/network
2.檢視hosts檔案
# cat /etc/hosts 檢視hosts檔案
如果你的機器名(我的是admin)沒有在hosts檔案裡面,就需要将你的機器名加入到hosts的檔案裡面。
也可以修改你的機器名為hosts檔案裡面已有的項
3. 修改完成之後重新開機activemq服務就可以通路了。
5 Queue
1 Producer
生産者:生産消息,發送端。
把jar包添加到工程中。使用5.11.2版本的jar包。
第一步:建立ConnectionFactory對象,需要指定服務端ip及端口号。
第二步:使用ConnectionFactory對象建立一個Connection對象。
第三步:開啟連接配接,調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。
第六步:使用Session對象建立一個Producer對象。
第七步:建立一個Message對象,建立一個TextMessage對象。
第八步:使用Producer對象發送消息。
第九步:關閉資源。
package cn.e3mall.activeMQ;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
/**
* 測試activeMQ
*
* @title:TestActiveMQ
* @description:
* @author jepson
* @date 2018年6月8日 下午10:50:14
* @version 1.0
*/
public class TestActiveMQ {
@Test
public void testQueueProducer() throws Exception {
// 第一步:建立ConnectionFactory對象,需要指定服務端ip及端口号。
// brokerURL伺服器的ip及端口号,端口号是61616,web服務的端口号是8161
String brokerURL="tcp://192.168.25.131:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
// 第二步:使用ConnectionFactory對象建立一個Connection對象。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接配接,調用Connection對象的start方法。
connection.start();
// 第四步:使用Connection對象建立一個Session對象。
/*
*第一個參數:是否開啟事務。
*如果true開啟事務,第二個參數無意義。一般不開啟事務。事務的意思就是沒有發出去就重發
*
*第二個參數:當第一個參數為false時,第二個參數才有意義。
*表示消息的應答模式。1、自動應答2、手動應答。一般是自動應答。
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。
//參數:隊列的名稱。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session對象建立一個Producer對象。
MessageProducer producer = session.createProducer(queue);
// 第七步:建立一個Message對象,建立一個TextMessage對象。
TextMessage message = session.createTextMessage("ActiveMQ helloworld,This is my first activemq test");
// 第八步:使用Producer對象發送消息。
producer.send(message);
// 第九步:關閉資源。
producer.close();
session.close();
connection.close();
}
}
運作測試程式,然後檢視web服務端
點選test-queue
2 Consumer
消費者:接收消息。
第一步:建立一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中獲得一個Connection對象。
第三步:開啟連接配接。調用Connection對象的start方法。
第五步:使用Session對象建立一個Destination對象。和發送端保持一緻queue,并且隊列的名稱一緻。
第六步:使用Session對象建立一個Consumer對象。
第七步:接收消息。
第八步:列印消息。
第九步:關閉資源
@Test
public void testQueueConsumer() throws Exception {
// 第一步:建立一個ConnectionFactory對象連接配接MQ伺服器。
String brokerURL = "tcp://192.168.25.131:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
// 第二步:從ConnectionFactory對象中獲得一個Connection對象。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接配接。調用Connection對象的start方法。
connection.start();
// 第四步:使用Connection對象建立一個Session對象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對象建立一個Destination對象。和發送端保持一緻queue,并且隊列的名稱一緻。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session對象建立一個Consumer對象。
MessageConsumer consumer = session.createConsumer(queue);
// 第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text =null;
//取消息的内容
text = textMessage.getText();
// 第八步:列印消息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待鍵盤輸入
System.in.read();
// 第九步:關閉資源
consumer.close();
session.close();
connection.close();
}
運作上面的測試程式,然後可以在控制看到消費者拿到了消息内容
我們再去檢視一個web服務端
6 Topic
使用步驟:
第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Topic對象。
@Test
public void testTopicProducer() throws Exception {
// 第一步:建立ConnectionFactory對象,需要指定服務端ip及端口号。
// brokerURL伺服器的ip及端口号,端口号是61616,web服務的端口号是8161
String brokerURL="tcp://192.168.25.131:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
// 第二步:使用ConnectionFactory對象建立一個Connection對象。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接配接,調用Connection對象的start方法。
connection.start();
// 第四步:使用Connection對象建立一個Session對象。
/*
*第一個參數:是否開啟事務。
*如果true開啟事務,第二個參數無意義。一般不開啟事務。事務的意思就是沒有發出去就重發
*
*第二個參數:當第一個參數為false時,第二個參數才有意義。
*表示消息的應答模式。1、自動應答2、手動應答。一般是自動應答。
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Topic對象。
//參數:隊列的名稱。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session對象建立一個Producer對象。
MessageProducer producer = session.createProducer(topic);
// 第七步:建立一個Message對象,建立一個TextMessage對象。
/*
* TextMessage message = new ActiveMQTextMessage(); message.setText(
* "hello activeMq,this is my first test.");
*/
TextMessage message = session.createTextMessage("hello activeMq,this is my first test.");
// 第八步:使用Producer對象發送消息。
producer.send(message);
// 第九步:關閉資源。
producer.close();
session.close();
connection.close();
}
第五步:使用Session對象建立一個Destination對象。和發送端保持一緻topic,并且話題的名稱一緻。
@Test
public void testTopicConsumer() throws Exception {
// 第一步:建立一個ConnectionFactory對象連接配接MQ伺服器。
String brokerURL = "tcp://192.168.25.131:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
// 第二步:從ConnectionFactory對象中獲得一個Connection對象。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接配接。調用Connection對象的start方法。
connection.start();
// 第四步:使用Connection對象建立一個Session對象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對象建立一個Destination對象。和發送端保持一緻topic,并且話題的名稱一緻。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session對象建立一個Consumer對象。
MessageConsumer consumer = session.createConsumer(topic);
// 第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text =null;
//取消息的内容
text = textMessage.getText();
// 第八步:列印消息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic的消費端01。。。。。");
//等待鍵盤輸入
System.in.read();
// 第九步:關閉資源
consumer.close();
session.close();
connection.close();
}
3 測試
1 測試一
運作producer生成者
可以看到消息發送了一個,沒有發送到任何消費者。消息不會持久化,直接丢失掉了,點選test-topic也看不到發送的消息内容。
2 測試二
運作consumer消費者3次,相當于啟動了三個消費者。分别修改輸出,用以區分.。
System.out.println("topic的消費端01。。。。。");
System.out.println("topic的消費端02。。。。。");
System.out.println("topic的消費端03。。。。。");
然後運作producer
會發現三個消費者都能夠收到消息
然後我們檢視一下web服務端
可以看到有3個消費者,發送了2次消息【測試一發送一次,是以總的是兩次】,3條消息已出隊。
如果我們在運作一次producer,會顯示發送了消息3次,6條消息已出隊
7 Quene 和 Topic的差別
Quene:點對點,消息會被持久化
Topic:廣播,消息不會被持久化
8 activemq和spring的整合
第一步:引入相關的jar包
<!-- ActiveMQ用戶端依賴的jar包 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
第二步:producer生産者的spring配置檔案
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以産生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://47.93.53.127:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory對應真實的可以産生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生産者 -->
<!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--這個是隊列目的地,點對點的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--這個是主題目的地,一對多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
</beans>
第三步:生産者的代碼
package cn.e3mall.activeMQ;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
/**
* 測試spring整合activemq
* @title:TestSpringActiveMQ
* @description:
* @author jepson
* @date 2018年6月10日 下午5:41:47
* @version 1.0
*/
public class TestSpringActiveMQ {
@Test
public void testSpringActiveMq() throws Exception {
//初始化spring容器
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//從spring容器中獲得JmsTemplate對象
JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
//從spring容器中取Destination對象
Destination destination = (Destination) applicationContext.getBean("queueDestination");
//使用JmsTemplate對象發送消息。
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//建立一個消息對象并傳回
TextMessage textMessage = session.createTextMessage("spring activemq queue message");
return textMessage;
}
});
}
}
第四步:MessageListener代碼實作
package cn.e3mall.search.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 測試消息接收
* @title:MyMessageListener
* @description:
* @author jepson
* @date 2018年6月10日 下午7:01:04
* @version 1.0
*/
public class MyMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
//取消息内容
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
第五步:consumer消費者的spring配置檔案
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以産生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://47.93.53.127:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory對應真實的可以産生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生産者 -->
<!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--這個是隊列目的地,點對點的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--這個是主題目的地,一對多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
<!-- 接收消息 -->
<!-- 配置監聽器 -->
<bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" />
<!-- 消息監聽容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
第六步:測試消費
package cn.e3mall.activemq;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
*
* @title:MessageConsumer
* @description:
* @author jepson
* @date 2018年6月10日 下午7:07:29
* @version 1.0
*/
public class MessageConsumer {
@Test
public void testQueueConsumer() throws Exception {
//初始化spring容器
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//等待
System.in.read();
}
}