天天看點

SpringBoot(十一) 內建消息隊列ActiveMQ

本例為本地操作消息隊列,故需要本地安裝ActiveMQ。

1、本地安裝ActiveMQ。

(1)、下載下傳位址:http://activemq.apache.org/download.html

SpringBoot(十一) 內建消息隊列ActiveMQ

(2)、解壓後,根據電腦作業系統是32位或者64位選擇對應的activemq.bat檔案,啟動ActiveMQ。

SpringBoot(十一) 內建消息隊列ActiveMQ

啟動成功,則顯示如下内容

SpringBoot(十一) 內建消息隊列ActiveMQ

(3)、成功之後在浏覽器輸入http://127.0.0.1:8161/位址,可以看到ActiveMQ的管理頁面,使用者名和密碼預設都是admin

SpringBoot(十一) 內建消息隊列ActiveMQ

2、搭建一個springboot項目,可參照之前的部落格進行搭建

(1)、添加pom依賴如下(本例調用junit測試,如果不用junit測試,可忽略junit的依賴)

<!-- ActiveMQ -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>
		<!-- Junit -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
		    <groupId>junit</groupId>
		    <artifactId>junit</artifactId>
		    <version>4.12</version>
		    <scope>test</scope>
		</dependency>
           

(2)、在application.yml中添加如下的配置(注意在yml配置檔案中不可以使用tab縮進,必需使用空格鍵)

spring:
  activemq:       #消息隊列
    broker-url: tcp://localhost:61616
    in-memory: true           #true 表示使用内置的MQ,false則連接配接伺服器
    pool:     
      enabled: false          #true表示使用連接配接池;false時,每發送一條資料建立一個連接配接
           

(3)、建立生産者

import javax.jms.Destination;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

	@Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝
	private JmsMessagingTemplate jmsTemplate;

	// 發送消息
	public void sendMessage(Destination destination, final String message) {
		System.out.println("生産者釋出消息: "+message);
		// 将消息message放入消息隊列destination
		jmsTemplate.convertAndSend(destination, message);
	}
	
	// 監聽消息隊列out.queue,列印消息隊列out.queue收到的消息内容,這裡作為消費者對生産者的回報
	@JmsListener(destination="out.queue")  
    public void consumerMessage(String text){  
        System.out.println("生産者收到消費者的回複封包為:"+text);  
    }  
}
           

(4)、建立消費者1,僅接收生産者的釋出資訊

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
	
	// 消費者1接僅收生産者的消息
	// 使用JmsListener配置消費者監聽的隊列,其中text是接收到的消息
	@JmsListener(destination = "mytest.queue")
	public void receiveQueue(String text) {
		System.out.println("消費者1收到生産者的封包為:"+text);
	}
}
           

(5)、建立消費者2,接收生産者的釋出資訊,且對生産者的釋出資訊進行回複

package com.zw.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class Consumer2 {
	
	 // 消費者2接收生産者的消息,并給生産者回報消息
	 @JmsListener(destination = "mytest.queue")  
     @SendTo("out.queue")  
     public String receiveQueue(String text) {  
         System.out.println("消費者2收到生産者的封包為:"+text);  
         return "消費者2回複生産者: 收到消息,"+text;  
     }  
}
           

(6)、建立junit測試類,模拟生産者釋出消息

import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.zw.Boot1Start;
import com.zw.jms.Producer;  
  
@RunWith(SpringRunner.class)  
@SpringBootTest(classes = {Boot1Start.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class SpringbootJmsApplicationTests {  
    
	// 注入自定義的生産者
    @Autowired  
    private Producer producer;  
      
    @Test  
    public void contextLoads() throws InterruptedException {  
    	// 定義生産者釋出資訊的管道mytest.queue消息隊列
        Destination destination = new ActiveMQQueue("mytest.queue");  
        
        // 生産者釋出資訊
        for(int i=0; i<100; i++){  
            producer.sendMessage(destination, "myname is 張三!!!"+i);  
        }  
    }  
	
}
           

(7)、測試結果

SpringBoot(十一) 內建消息隊列ActiveMQ

(8)、檢視消息隊列

浏覽器輸入http://localhost:8161/admin/,使用者名和密碼都是admin,點選橫排的Queues,可檢視使用消息隊列的情況。

  • Number Of Pending Messages:消息隊列中待處理的消息
  • Number Of Consumers:消費者的數量
  • Messages Enqueued:累計進入過消息隊列的總量
  • Messages Dequeued:累計消費過的消息總量
SpringBoot(十一) 內建消息隊列ActiveMQ

3、說明

本例中,共一個生産者,兩個消費者,兩個消息隊列(mytest.queue和out.queue)。生産者将消息推到消息隊列mytest.queue中,消費者通過監聽消息隊列mytest.queue,進而擷取生産者的消息。消費者也可以在監聽的同時将回報消息通過out.queue消息隊列返給生産者,此時生産者既可以(mytest.queue)生産,也可以(out.queue)消費。

本例模式點對點模式,相當于賣火車票,生産出來的一個東西隻能被衆多消費者中的一個給消費掉。

繼續閱讀