本例為本地操作消息隊列,故需要本地安裝ActiveMQ。
1、本地安裝ActiveMQ。
(1)、下載下傳位址:http://activemq.apache.org/download.html
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL5lERONTQq1EMNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL3ADOzIzMxkDM1AzNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
(2)、解壓後,根據電腦作業系統是32位或者64位選擇對應的activemq.bat檔案,啟動ActiveMQ。
啟動成功,則顯示如下内容
(3)、成功之後在浏覽器輸入http://127.0.0.1:8161/位址,可以看到ActiveMQ的管理頁面,使用者名和密碼預設都是admin
2、搭建一個springboot項目,可參照之前的部落格進行搭建
(1)、添加pom依賴如下(本例調用junit測試,如果不用junit測試,可忽略junit的依賴)
<!-- ActiveMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Junit -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
(2)、在application.yml中添加如下的配置(注意在yml配置檔案中不可以使用tab縮進,必需使用空格鍵)
spring:
activemq: #消息隊列
broker-url: tcp://localhost:61616
in-memory: true #true 表示使用内置的MQ,false則連接配接伺服器
pool:
enabled: false #true表示使用連接配接池;false時,每發送一條資料建立一個連接配接
(3)、建立生産者
import javax.jms.Destination;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer {
@Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝
private JmsMessagingTemplate jmsTemplate;
// 發送消息
public void sendMessage(Destination destination, final String message) {
System.out.println("生産者釋出消息: "+message);
// 将消息message放入消息隊列destination
jmsTemplate.convertAndSend(destination, message);
}
// 監聽消息隊列out.queue,列印消息隊列out.queue收到的消息内容,這裡作為消費者對生産者的回報
@JmsListener(destination="out.queue")
public void consumerMessage(String text){
System.out.println("生産者收到消費者的回複封包為:"+text);
}
}
(4)、建立消費者1,僅接收生産者的釋出資訊
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
// 消費者1接僅收生産者的消息
// 使用JmsListener配置消費者監聽的隊列,其中text是接收到的消息
@JmsListener(destination = "mytest.queue")
public void receiveQueue(String text) {
System.out.println("消費者1收到生産者的封包為:"+text);
}
}
(5)、建立消費者2,接收生産者的釋出資訊,且對生産者的釋出資訊進行回複
package com.zw.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
@Component
public class Consumer2 {
// 消費者2接收生産者的消息,并給生産者回報消息
@JmsListener(destination = "mytest.queue")
@SendTo("out.queue")
public String receiveQueue(String text) {
System.out.println("消費者2收到生産者的封包為:"+text);
return "消費者2回複生産者: 收到消息,"+text;
}
}
(6)、建立junit測試類,模拟生産者釋出消息
import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.zw.Boot1Start;
import com.zw.jms.Producer;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {Boot1Start.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class SpringbootJmsApplicationTests {
// 注入自定義的生産者
@Autowired
private Producer producer;
@Test
public void contextLoads() throws InterruptedException {
// 定義生産者釋出資訊的管道mytest.queue消息隊列
Destination destination = new ActiveMQQueue("mytest.queue");
// 生産者釋出資訊
for(int i=0; i<100; i++){
producer.sendMessage(destination, "myname is 張三!!!"+i);
}
}
}
(7)、測試結果
(8)、檢視消息隊列
浏覽器輸入http://localhost:8161/admin/,使用者名和密碼都是admin,點選橫排的Queues,可檢視使用消息隊列的情況。
- Number Of Pending Messages:消息隊列中待處理的消息
- Number Of Consumers:消費者的數量
- Messages Enqueued:累計進入過消息隊列的總量
- Messages Dequeued:累計消費過的消息總量
3、說明
本例中,共一個生産者,兩個消費者,兩個消息隊列(mytest.queue和out.queue)。生産者将消息推到消息隊列mytest.queue中,消費者通過監聽消息隊列mytest.queue,進而擷取生産者的消息。消費者也可以在監聽的同時将回報消息通過out.queue消息隊列返給生産者,此時生産者既可以(mytest.queue)生産,也可以(out.queue)消費。
本例模式點對點模式,相當于賣火車票,生産出來的一個東西隻能被衆多消費者中的一個給消費掉。