JMS消息組成詳解
整個JMS協定組成結構如下
結構 | 描述 |
---|---|
JMS Provider | 消息中間件/消息伺服器 |
JMS Producer | 消息生産者 |
JMS Consumer | 消息消費者 |
JMS Message | 消息(重要) |
JMS Message消息由三部分組成:
1)消息頭
2)消息體
3)消息屬性
JMS消息頭
JMS消息頭預定義了若幹字段用于用戶端與JMS提供者之間識别和發送消息,預編譯頭如下:紅色為重要的消息頭
名稱 | 描述 |
---|---|
JMSDestination | 消息發送的 Destination,在發送過程中由提供者設定 |
JMSMessageID | 唯一辨別提供者發送的每一條消息。這個字段是在發送過程中由提供者設 置的,客戶機隻能在消息發送後才能确定消息的 JMSMessageID |
JMSDeliveryMode | 消息持久化。包含值 DeliveryMode.PERSISTENT 或者 DeliveryMode.NON_PERSISTENT。 |
JMSTimestamp | 提供者發送消息的時間,由提供者在發送過程中設定 |
JMSExpiration | 消息失效的時間,毫秒,值 0 表明消息不會過期,預設值為0 |
JMSPriority | 消息的優先級,由提供者在發送過程中設定。優先級 0 的優先級最低,優 先級 9 的優先級最高。0-4為普通消息,5-9為加急消息。優先級高就一定先發送,隻保證了加急消息必須先于普通消息發送。預設值為4 |
JMSCorrelationID | 通常用來連結響應消息與請求消息,由發送消息的 JMS 程式設定。 |
JMSReplyTo | 請求程式用它來指出回複消息應發送的地方,由發送消息的 JMS 程式設定 |
JMSType | JMS 程式用它來指出消息的類型。 |
JMSRedelivered | More Actions消息的重發标志,false,代表該消息是第一次發生,true,代表該消息為 重發消息 |
不過需要注意的是,在傳送消息時,消息頭的值由JMS提供者來設定,是以開發者使用以上setJMSXXX()方法配置設定的值就被忽略了,隻有以下幾個值是可以由開發者設定的:JMSCorrelationID,JMSReplyTo,JMSType
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
// 這裡可以指定每個消息的目的地
textMessage.setJMSDestination(topic);
/*
持久模式和非持久模式。
一條持久性的消息:應該被傳送“一次僅僅一次”,這就意味着如果JMS提供者出現故障,該消息并不會丢失,它會在伺服器恢複之後再次傳遞。
一條非持久的消息:最多會傳遞一次,這意味着伺服器出現故障,該消息将會永遠丢失。
*/
textMessage.setJMSDeliveryMode(0);
/*
可以設定消息在一定時間後過期,預設是永不過期。
消息過期時間,等于Destination的send方法中的timeToLive值加上發送時刻的GMT時間值。
如果timeToLive值等于0,則JMSExpiration被設為0,表示該消息永不過期。
如果發送後,在消息過期時間之後還沒有被發送到目的地,則該消息被清除。
*/
textMessage.setJMSExpiration(1000);
/* 消息優先級,從0-9十個級别,0-4是普通消息5-9是加急消息。
JMS不要求MQ嚴格按照這十個優先級發送消息但必須保證加急消息要先于普通消息到達。預設是4級。
*/
textMessage.setJMSPriority(10);
// 唯一辨別每個消息的辨別。MQ會給我們預設生成一個,我們也可以自己指定。
textMessage.setJMSMessageID("ABCD");
// 上面有些屬性在send方法裡也能設定
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息發送到MQ完成 ****");
}
}
JMS消息體
在消息體中,JMS API定義了五種類型的消息格式,讓我們可以以不同的形式發送和接受消息,并提供了對已有消息格式的相容。不同的消息類型如下:
JMS 定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送并接收一些不同形式的資料,提供現有消息格式的一些級别的相容性。
· TextMessage--一個字元串對象 *
· MapMessage--一套名稱-值對
· ObjectMessage--一個序列化的 Java 對象 *
· BytesMessage--一個位元組的資料流 *
· StreamMessage -- Java原始值的資料流
生産者:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringBootProducer {
//JmsMessagingTemplate: 用于工具類發送消息
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private JmsTemplate jmsTemplate;
@Value("${activemq.name}")
private String name;
/**
* 發送TextMessage消息
*/
@Test
public void testMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage("文本消息");
return textMessage;
}
});
}
/**
* 發送MapMessage消息
*/
@Test
public void mapMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","張三");
mapMessage.setInt("age",20);
return mapMessage;
}
});
}
/**
* 發送ObjectMessage消息
*//*
@Test
public void objectMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
User user = new User("小明","123456");
ObjectMessage objectMessage = session.createObjectMessage(user);
return objectMessage;
}
});
}
*/
/**
* 發送BytesMessage消息
*/
@Test
public void bytesMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
BytesMessage bytesMessage = session.createBytesMessage();
//1.讀取檔案
File file = new File("d:/activemq/spring.jpg");
//2.建構檔案輸入流
try {
FileInputStream inputStream = new FileInputStream(file);
//3.把檔案流寫入到緩存數組中
byte[] buffer = new byte[(int)file.length()];
inputStream.read(buffer);
//4.把緩存數組寫入到BytesMessage中
bytesMessage.writeBytes(buffer);
} catch (Exception e) {
e.printStackTrace();
}
return bytesMessage;
}
});
}
/**
* 發送StreamMessage消息
*/
@Test
public void streamMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("你好,ActiveMQ");
streamMessage.writeInt(20);
//設定消息屬性:标記、過濾
streamMessage.setStringProperty("訂單","order");
return streamMessage;
}
});
}
}
消費者:
@Component // 放入IOC容器
public class MsgListener {
/**
* 接收TextMessage的方法
*/
/*@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}*/
/*@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
System.out.println("名稱:"+mapMessage.getString("name"));
System.out.println("年齡:"+mapMessage.getString("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}*/
/* @JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof ObjectMessage){
ObjectMessage objectMessage = (ObjectMessage)message;
try {
User user = (User)objectMessage.getObject();
System.out.println(user.getUsername());
System.out.println(user.getPassword());
} catch (JMSException e) {
e.printStackTrace();
}
}
}*/
/*@JmsListener(destination = "${activemq.name}")
public void receiveBytesMessage(Message message){
if(message instanceof BytesMessage){
BytesMessage bytesMessage = (BytesMessage)message;
try {
System.out.println("接收消息内容:"+bytesMessage.getBodyLength());
//1.設計緩存數組
byte[] buffer = new byte[(int)bytesMessage.getBodyLength()];
//2.把位元組消息的内容寫入到緩存數組
bytesMessage.readBytes(buffer);
//3.建構檔案輸出流
FileOutputStream outputStream = new FileOutputStream("d:/activemq/test.jpg");
//4.把資料寫出本地硬碟
outputStream.write(buffer);
} catch (Exception e) {
e.printStackTrace();
}
}
}*/
@JmsListener(destination = "${activemq.name}")
public void receiveStreamMessage(Message message){
if(message instanceof StreamMessage){
StreamMessage streamMessage = (StreamMessage)message;
try {
//接收消息屬性
System.out.println(streamMessage.getStringProperty("訂單"));
System.out.println(streamMessage.readString());
System.out.println(streamMessage.readInt());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
注意:ActiveMQ5.12後 ,為了安全考慮,ActiveMQ預設不接受自定義的序列化對象,需要将自定義的加入到受信任的清單
# springboot與activemq整合配置
activemq:
broker-url: tcp://192.168.1.144:61616 # 連接配接位址
user: admin # activemq使用者名
password: admin :# activemq密碼
packages:
trust-all: true # 讓ActiveMQ信任全部自定義對象,實作對象的序列化或反序列化
消息屬性
如果需要除消息頭字段之外的值,那麼可以使用消息屬性。他是識别/去重/重點标注等操作,非常有用的方法。
他們是以屬性名和屬性值對的形式制定的。可以将屬性是為消息頭得擴充,屬性指定一些消息頭沒有包括的附加資訊,比如可以在屬性裡指定消息選擇器。消息的屬性就像可以配置設定給一條消息的附加消息頭一樣。它們允許開發者添加有關消息的不透明附加資訊。它們還用于暴露消息選擇器在消息過濾時使用的資料。

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.144:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
// 調用Message的set*Property()方法,就能設定消息屬性。根據value的資料類型的不同,有相應的API。
textMessage.setStringProperty("From","[email protected]");
textMessage.setByteProperty("Spec", (byte) 1);
textMessage.setBooleanProperty("Invalide",true);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息發送到MQ完成 ****");
}
}
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsummer_topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.144:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener( (message) -> {
if (null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("消息體:"+textMessage.getText());
System.out.println("消息屬性:"+textMessage.getStringProperty("From"));
System.out.println("消息屬性:"+textMessage.getByteProperty("Spec"));
System.out.println("消息屬性:"+textMessage.getBooleanProperty("Invalide"));
}catch (JMSException e) {
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}