天天看點

JMS 學習

1.4      消息轉換器 MessageConverter

MessageConverter 的作用主要有兩方面,一方面它可以把我們的非标準化 Message對象轉換成我們的目标 Message 對象,這主要是用在發送消息的時候;另一方面它又可以把我們的 Message 對象轉換成對應的目标對象,這主要是用在接收消息的時候。

下面我們就拿發送一個對象消息來舉例,假設我們有這樣一個需求:我們平台有一個發送郵件的功能,進行發送的時候我們隻是把我們的相關資訊封裝成一個 JMS 消息,然後利用 JMS 進行發送,在對應的消息監聽器進行接收到的消息處理時才真正的進行消息發送。

假設我們有這麼一個 Email 對象:

public class Email implements Serializable {
 
    private static final long serialVersionUID = -658250125732806493L;
 
    private String receiver;
    private String title;
    private String content;
 
    public Email(String receiver, String title, String content) {
        this.receiver = receiver;
        this.title = title;
        this.content = content;
    }
 
    public String getReceiver() {
        return receiver;
    }
 
    public void setReceiver(String receiver) {
        this.receiver = receiver;
    }
 
    public String getTitle() {
        return title;
    }
 
    public void setTitle(String title) {
        this.title = title;
    }
 
    public String getContent() {
        return content;
    }
 
    public void setContent(String content) {
        this.content = content;
    }
 
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Email [receiver=").append(receiver).append(", title=")
                .append(title).append(", content=").append(content).append("]");
        return builder.toString();
    }
    
}
           

       這個 Email 對象包含了一個簡單的接收者 email 位址、郵件主題和郵件内容。我們在發送的時候就把這個對象封裝成一個 ObjectMessage 進行發送。代碼如下所示:

public class ProducerServiceImpl implements ProducerService {
 
    @Autowired
    private JmsTemplate jmsTemplate;    

    public void sendMessage(Destination destination, final Serializable obj) {
        jmsTemplate.send(destination, new MessageCreator() {
 
            public Message createMessage(Session session) throws JMSException {
                ObjectMessage objMessage = session.createObjectMessage(obj);
                return objMessage;
            }
            
        });
    }
 
}
           

       這是對應的在沒有使用 MessageConverter 的時候我們需要 new 一個MessageCreator 接口對象,然後在其抽象方法 createMessage 内部使用 session 建立一個對應的消息。在使用了 MessageConverter 的時候我們在使用 JmsTemplate 進行消息發送時隻需要調用其對應的 convertAndSend 方法即可。如:

public void sendMessage(Destination destination, final Serializable obj) {
        //未使用MessageConverter的情況
        /*jmsTemplate.send(destination, new MessageCreator() {
 
            public Message createMessage(Session session) throws JMSException {
                ObjectMessage objMessage = session.createObjectMessage(obj);
                return objMessage;
            }
            
        });*/
        //使用MessageConverter的情況
        jmsTemplate.convertAndSend(destination, obj);
    }
           

這樣 JmsTemplate 就會在其内部調用預定的 MessageConverter 對我們的消息對象進行轉換,然後再進行發送。

       這個時候我們就需要定義我們的 MessageConverter 了。要定義自己的MessageConverter 很簡單,隻需要實作 Spring 為我們提供的 MessageConverter 接口即可。我們先來看一下 MessageConverter 接口的定義:

public interface MessageConverter {
 
    Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;
 
    Object fromMessage(Message message) throws JMSException, MessageConversionException;
 
}
           

       我們可以看到其中一共定義了兩個方法 fromMessage 和 toMessage ,fromMessage 是用來把一個 JMS Message 轉換成對應的 Java 對象,而 toMessage 方法是用來把一個 Java 對象轉換成對應的 JMS Message 。因為我們已經知道上面要發送的對象就是一個 Email 對象,是以在這裡我們就簡單地定義一個 EmailMessageConverter用來把 Email 對象和對應的 ObjectMessage 進行轉換,其代碼如下:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
 
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
 
public class EmailMessageConverter implements MessageConverter {
 
    public Message toMessage(Object object, Session session)
            throws JMSException, MessageConversionException {
        return session.createObjectMessage((Serializable) object);
    }
 
    public Object fromMessage(Message message) throws JMSException,
            MessageConversionException {
        ObjectMessage objMessage = (ObjectMessage) message;
        return objMessage.getObject();
    }
 
}
           

       這樣當我們利用 JmsTemplate 的 convertAndSend 方法發送一個 Email 對象的時候就會把對應的 Email 對象當做參數調用我們定義好的 EmailMessageConverter 的toMessage 方法。

       定義好我們的 EmailMessageConverter 之後就需要指定我們用來發送 Email 對象的JmsTemplate 對象的 messageConverter 為 EmailMessageConverter ,這裡我們在 Spring的配置檔案中定義 JmsTemplate bean 的時候就指定:

<!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- 消息轉換器 -->
        <property name="messageConverter" ref="emailMessageConverter"/>
    </bean>
    <!-- 類型轉換器 -->
    <bean id="emailMessageConverter" class="com.tiantian.springintejms.converter.EmailMessageConverter"/>      

       到此我們的 MessageConverter 就定義好了,也能夠進行使用了,接着我們來進行測試一下,定義測試代碼如下所示:

@Test
    public void testObjectMessage() {
        Email email = new Email("[email protected]", "主題", "内容");
        producerService.sendMessage(destination, email);
    }
           

       上面 destination 對應的接收處理的 MessageListener 方法如下所示:

public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
        
        if (message instanceof ObjectMessage) {
            ObjectMessage objMessage = (ObjectMessage) message;
            try {
                Object obj = objMessage.getObject();
                Email email = (Email) obj;
                System.out.println("接收到一個ObjectMessage,包含Email對象。");
                System.out.println(email);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
 
}
           

       之前說了 MessageConverter 有兩方面的功能,除了把 Java 對象轉換成對應的 Jms Message 之外還可以把 Jms Message 轉換成對應的 Java 對象。我們看上面的消息監聽器在接收消息的時候接收到的就是一個 Jms Message ,如果我們要利用MessageConverter 來把它轉換成對應的 Java 對象的話,隻能是我們往裡面注入一個對應的 MessageConverter ,然後在裡面手動的調用,如:

public class ConsumerMessageListener implements MessageListener {
 
    private MessageConverter messageConverter;
    
    public void onMessage(Message message) {
        
        if (message instanceof ObjectMessage) {
            ObjectMessage objMessage = (ObjectMessage) message;
            try {
                /*Object obj = objMessage.getObject();
                Email email = (Email) obj;*/
                Email email = (Email) messageConverter.fromMessage(objMessage);
                System.out.println("接收到一個ObjectMessage,包含Email對象。");
                System.out.println(email);
            } catch (JMSException e) {
                e.printStackTrace();
            }
            
        }
    }
 
    public MessageConverter getMessageConverter() {
        return messageConverter;
    }
 
    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
 
}
           

       當我們使用 MessageListenerAdapter 來作為消息監聽器的時候,我們可以為它指定一個對應的 MessageConverter ,這樣 Spring 在處理接收到的消息的時候就會自動地利用我們指定的 MessageConverter 對它進行轉換,然後把轉換後的 Java 對象作為參數調用指定的消息處理方法。這裡我們再把前面講解 MessageListenerAdapter 時定義的MessageListenerAdapter 拿來做一個測試,我們指定它的 MessageConverter 為我們定義好的 EmailMessageConverter 。

<!-- 消息監聽擴充卡 -->
    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate">
            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
        </property>
        <property name="defaultListenerMethod" value="receiveMessage"/>
        <property name="messageConverter" ref="emailMessageConverter"/>
    </bean>
    <!-- 消息監聽擴充卡對應的監聽容器 -->
    <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="adapterQueue"/>
        <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來作為消息監聽器 -->
    </bean>      

       然後在我們的真正用于處理接收到的消息的 ConsumerListener 中添加一個receiveMessage 方法,添加後其代碼如下所示:

public class ConsumerListener {
 
    public void receiveMessage(String message) {
        System.out.println("ConsumerListener通過receiveMessage接收到一個純文字消息,消息内容是:" + message);
    }
    
    public void receiveMessage(Email email) {
        System.out.println("接收到一個包含Email的ObjectMessage。");
        System.out.println(email);
    }
    
}      

       然後我們定義如下測試代碼:

@Test
    public void testObjectMessage() {
        Email email = new Email("[email protected]", "主題", "内容");
        producerService.sendMessage(adapterQueue, email);
    }
           

       因為我們給 MessageListenerAdapter 指定了一個 MessageConverter ,而且是一個EmailMessageConverter ,是以當 MessageListenerAdapter 接收到一個消息後,它會調用我們指定的 MessageConverter 的 fromMessage 方法把它轉換成一個 Java 對象,根據定義這裡會轉換成一個 Email 對象,然後會把這個 Email 對象作為參數調用我們通過defaultListenerMethod 屬性指定的預設處理器方法,根據定義這裡就是 receiveMessage方法,但是我們可以看到在 ConsumerListener 中我們一共定義了兩個 receiveMessage方法,因為是通過轉換後的 Email 對象作為參數進行方法調用的,是以這裡調用的就應該是參數類型為 Email 的 receiveMessage 方法了。上述測試代碼運作後會輸出如下結果: 

JMS 學習

        說到這裡可能有讀者就會有疑問了,說我們在之前講解 MessageListenerAdapter 的時候不是沒有指定對應的 MessageConverter ,然後發送了一個 TextMessage ,結果Spring 還是把它轉換成一個 String 對象,調用了 ConsumerListener 參數類型為 String 的receiveMessage 方法嗎?那你這個 MessageConverter 在 MessageListenerAdapter 進行消息接收的時候也沒什麼用啊。

       其實還是有用的,在我們使用 MessageListenerAdapter 時,在對其進行初始化也就是調用其構造方法時,它會預設 new 一個 Spring 已經為我們實作了的MessageConverter —— SimpleMessageConverter 作為其預設的 MessageConverter ,這也就是為什麼我們在使用 MessageListenerAdapter 的時候不需要指定 MessageConverter但是消息還是會轉換成對應的 Java 對象的原因。是以預設情況下我們使用MessageListenerAdapter 時其對應的 MessageListener 的處理器方法參數類型必須是一個普通 Java 對象,而不能是對應的 Jms Message 對象。

       那如果我們在處理 Jms Message 的時候想使用 MessageListenerAdapter ,然後又希望處理最原始的 Message ,而不是經過 MessageConverter 進行轉換後的 Message 該怎麼辦呢?這個時候我們隻需要在定義 MessageListenerAdapter 的時候指定其MessageConverter 為空就可以了。

<!-- 消息監聽擴充卡 -->
    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate">
            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
        </property>
        <property name="defaultListenerMethod" value="receiveMessage"/>
        <property name="messageConverter">
            <null/>
        </property>
    </bean>      

       那麼這個時候我們的真實 MessageListener 的處理器方法參數類型就應該是 Jms Message 或對應的 Jms Message 子類型了,不然就會調用不到對應的處理方法了。這裡因為我們發送的是一個 ObjectMessage ,是以這裡就添加一個對應的參數類型為ObjectMessage 的 receiveMessage 方法了。

public void receiveMessage(ObjectMessage message) throws JMSException {
        System.out.println(message.getObject());
    }      

       剛剛講到 Spring 已經為我們實作了一個簡單的 MessageConverter ,即org.springframework.jms.support.converter.SimpleMessageConverter ,其實 Spring 在初始化 JmsTemplate 的時候也指定了其對應的 MessageConverter 為一個SimpleMessageConverter ,是以如果我們平常沒有什麼特殊要求的時候可以直接使用JmsTemplate 的 convertAndSend 系列方法進行消息發送,而不必繁瑣的在調用 send 方法時自己 new 一個 MessageCreator 進行相應 Message 的建立。

這裡我們也來看一下 SimpleMessageConverter 的定義,如果覺得它不能滿足你的要求,那我們可以對它裡面的部分方法進行重寫,或者是完全實作自己的MessageConverter 。

public class SimpleMessageConverter implements MessageConverter {
 
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        if (object instanceof Message) {
            return (Message) object;
        }
        else if (object instanceof String) {
            return createMessageForString((String) object, session);
        }
        else if (object instanceof byte[]) {
            return createMessageForByteArray((byte[]) object, session);
        }
        else if (object instanceof Map) {
            return createMessageForMap((Map) object, session);
        }
        else if (object instanceof Serializable) {
            return createMessageForSerializable(((Serializable) object), session);
        }

        else {
            throw new MessageConversionException("Cannot convert object of type [" +
                    ObjectUtils.nullSafeClassName(object) + "] to JMS message. Supported message " +
                    "payloads are: String, byte array, Map<String,?>, Serializable object.");
        }
    }
 
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        if (message instanceof TextMessage) {
            return extractStringFromMessage((TextMessage) message);
        }
        else if (message instanceof BytesMessage) {
            return extractByteArrayFromMessage((BytesMessage) message);
        }
        else if (message instanceof MapMessage) {
            return extractMapFromMessage((MapMessage) message);
        }
        else if (message instanceof ObjectMessage) {
            return extractSerializableFromMessage((ObjectMessage) message);
        }
        else {
            return message;
        }
    }
 
    protected TextMessage createMessageForString(String text, Session session) throws JMSException {
        return session.createTextMessage(text);
    }
 
    protected BytesMessage createMessageForByteArray(byte[] bytes, Session session) throws JMSException {
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(bytes);
        return message;
    }
 
    protected MapMessage createMessageForMap(Map<?, ?> map, Session session) throws JMSException {
        MapMessage message = session.createMapMessage();
        for (Map.Entry entry : map.entrySet()) {
            if (!(entry.getKey() instanceof String)) {
                throw new MessageConversionException("Cannot convert non-String key of type [" +
                        ObjectUtils.nullSafeClassName(entry.getKey()) + "] to JMS MapMessage entry");
            }
            message.setObject((String) entry.getKey(), entry.getValue());
        }
        return message;
    }
 
    protected ObjectMessage createMessageForSerializable(Serializable object, Session session) throws JMSException {
        return session.createObjectMessage(object);
    }
 
 
    protected String extractStringFromMessage(TextMessage message) throws JMSException {
        return message.getText();
    }
 
    protected byte[] extractByteArrayFromMessage(BytesMessage message) throws JMSException {
        byte[] bytes = new byte[(int) message.getBodyLength()];
        message.readBytes(bytes);
        return bytes;
    }
 
    protected Map extractMapFromMessage(MapMessage message) throws JMSException {
        Map<String, Object> map = new HashMap<String, Object>();
        Enumeration en = message.getMapNames();
        while (en.hasMoreElements()) {
            String key = (String) en.nextElement();
            map.put(key, message.getObject(key));
        }
        return map;
    }
 
    protected Serializable extractSerializableFromMessage(ObjectMessage message) throws JMSException {
        return message.getObject();
    }
 
}
           
JMS