消息隊列一般有兩種模型
1.點對點模型(基于隊列 Point to Point,PTP) 每個消息隻能有一個消費者。消息的生産者和消費者之間沒有時間上的 相關性.可以有多個發送者,但隻能被一個消費者消費。 一個消息隻能被一個接受者接受一次 生産者把消息發送到隊列中(Queue),接受者無需訂閱,當接受者未接受到消息時就會處于阻塞狀态
2. 釋出者/訂閱者模型(基于主題的Publish/Subscribe,pub/sub) 每個消息可以有多個消費者。 生産者和消費者之間有時間上的相關性。訂閱一個主題的消費者隻能消 費自它訂閱之後釋出的消息. 允許多個接受者,類似于廣播的方式 生産者将消息發送到主題上(Topic) 接受者必須先訂閱 注:持久化訂閱者:特殊的消費者,告訴主題,我一直訂閱着,即使網絡斷開,消息伺服器也記住所有持久化訂閱者,如果有新消息,也會知道必定有人回來消費。
在SpringBoot中寫的執行個體
連接配接ActiveMQ工具類:
/*
* Copyright @ 2019 com.iflysse.trains
* 01SpringBoot 下午2:18:30
* All right reserved.
*
*/
package com.dcx.comm.utils;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;
/**
* @ClassName: ActiveMqUtils
* @author: cxding
* @createTime: 2019年4月26日 下午2:18:30
* @version: v1.0
*/
@Component
public class ActiveMqUtils {
/**
* 注入JMS
*/
@Autowired
private JmsTemplate jmsTemplate;
/**
* 設定普通
* @Title: sendNorMolMessage
* @author: cxding
* @createTime: 2019年4月28日 下午1:03:56
* @param destination
* @param text void
*/
public <T> void sendNorMolMessage(Destination destination, String text) {
// 連接配接工廠
ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
// 建立連結
connection = connectionFactory.createConnection();
connection.start();
// 建立session,開啟事物
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 建立生産者
producer = session.createProducer(destination);
// 設定持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 設定過期時間
//producer.setTimeToLive(time);
TextMessage message = session.createTextMessage(text);
producer.send(message);
// 送出
session.commit();
} catch (JMSException e) {
throw new RuntimeException(e);
} finally {
// 關閉連接配接
close(producer, session, connection, connectionFactory);
}
}
}
連接配接資訊配置application.properties
#-------------------------------activeMQ--------------------------------
# active mq
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=root
spring.activemq.password=pass
#預設是點對點
#default point to point
#現在需要訂閱釋出模式
spring.jms.pub-sub-domain=true
#預設是點對點
#default point to point
#現在需要訂閱釋出模式
spring.jms.pub-sub-domain=true
控制層
@PostMapping("/producertopic")
@ApiOperation(value="産生消息隊列(訂閱釋出模式,監聽器目前隻監聽:“topicSend”)",response=Result.class)
public Result<String> createTopicMq(@ApiParam(value = "隊列目的地", required = false,defaultValue="topicSend") @RequestParam String topicName,
@ApiParam(value = "隊列消息内容", required = false) @RequestParam String text){
Result<String> result = stuService.createTopicMq(topicName,text);
return result;
}
生産者:實作層
/**
* 釋出主題産生消息隊列
*/
@Override
public Result<String> createTopicMq(String topicName, String text) {
// TODO 釋出-訂閱模式--修改其中的Destination
Destination destination = new ActiveMQTopic(topicName);
for (int i = 0; i < 10; i++) {
utils.sendNorMolMessage(destination, text+i);
}
return new Result<String>("主題消息已經産生",true);
}
監聽消費,設定兩個訂閱者
@Component
public class ActiveMqListenConfig {
@Autowired
private ActiveMqUtils util;
private Logger logger = LoggerFactory.getLogger(this.getClass());
@JmsListener(destination="topicSend")
public void recieveTopicTaskMq(String message) {
logger.info("消費者1訂閱收到的消息是:"+message+",并且這條消息已經被消費了");
}
@JmsListener(destination="topicSend")
public void recieveTopicTaskMq2(String message) {
logger.info("消費者2訂閱收到到的消息是:"+message+",并且這條消息已經被消費了");
}
運作項目後
檢視消息
控制台輸出結果
兩個消費者分别消費了10條消息