天天看點

學習筆記:ActiveMQ + SpringBoot & 事務問題 & 序列化問題一、docker 安裝 ActiveMQ二、ActiveMQ介紹三、ActiveMQ + SpringBoot的使用四、解決的問題

一、docker 安裝 ActiveMQ

1.在docker環境中執行:

// 搜尋activemq鏡像
docker search activemq
// 拉取activemq鏡像
docker pull webcenter/activemq
// 檢視拉取後的activemq鏡像
docker images
// 建立資料檔案夾和日志檔案夾
mkdir -p ./activemq/soft/activemq
mkdir -p ./activemq/soft/activemq/log
// docker執行指令,名稱,背景啟動,綁定端口,開機啟動,資料卷綁定
docker run --name=activemq -itd -p 8161:8161 -p 61616:61616 --restart=always -v /home/docker/activemq/soft/activemq:/data/activemq -v /home/docker/activemq/soft/activemq/log:/var/log/activemq  webcenter/activemq:latest
// 預設登陸使用者名密碼
使用者名密碼admin/admin
           

2.通路頁面

通路路徑:http://ip:8161。

二、ActiveMQ介紹

1.ActiveMQ基于JMS協定,組成部分:

JMS Provider:生産者,支援事務來保證發送可靠性。

JMS Message:JMS 的消息,主要類型有Text,Object,Map,Bytes,Stream五種類型。由消息頭,消息屬性,消息體組成。

JMS Consumer:消費者,支援事務和确認機制來保證消費可靠性。同步:使用recive()方法阻塞接受消息(用戶端拉)。異步:使用監聽方式接受消息(伺服器推)。

JMS Domains:消息傳遞域,支援P2P(點對點傳輸),隻存在一個隊列,生産者發送到隊列,消費者從隊列中消費;支援pub/sub訂閱消費模式,生産者發送到Topic,生産者訂閱topic進行消費(消費者在訂閱之前是收不到消息的。在訂閱之後線上的狀态可以收到消息,如果想離線後依然能接收到消息,需要設定成持久訂閱)。

Connection Factory:連接配接工廠,建立連接配接,通過連接配接可以建立session對話,再建立生産者和消費者。(springboot中整合為jmstemplate模闆,可以直接生産和消費消息);

JMS Connection:封裝了客戶與 JMS 提供者之間的一個虛拟的連接配接。

JMS Session:是生産者和消費者的一個單線程上下文。會話用于建立消息生産者(Producer)、消息消費者(Consumer),和消息(Message)等。會話提供了一個事務性的上下文,一組發送和接收被組合到了一個原子操作中。

2.消息結構

1.消息頭:

屬性 含義
Destination 目的地,主要是queue和topic
DeliveryMode 傳遞模式,在send或者jmstemplate中設定。分為持久模式和非持久模式
Expiration 消息過期/到期時間,在send或者jmstemplate中設定
Priority 消息優先級,有 0-9 十個級别,0-4是普通消息,5-9是加急消息。JMS 不要求 JMS Provider 嚴格按着十個優先級發送消息,但必須保證加急消息要先于普通消息到達。預設是第4級
MessageID 由生産者自動配置設定,唯一的ID,以ID:開頭
Timestamp 生産者發送消息到消息call或者return傳回的時間差
JMSType JMS 消息類型的識别符
CorrelationID JMS 相關性 id,由用戶端設定。用來連接配接到另外一個消息,典型的應用是在回複消息中連接配接到原消息
ReplyTo 回複,由用戶端設定。提供本資訊回複消息的目的位址
Redelivered 重發,由JMS Provider(供應商)設定

2.消息體:JMS API 定義了 5 種消息體格式,也叫消息類型,可以使用不同的形式發送接收資料,并可以相容現有的消息格式。包括:TextMessage、MapMessage、BytesMessage、StreamMessage 和 ObjectMessage。它們都是 Message 接口的子類。

3.消息屬性:自定義屬性,JMS定義的屬性,供應商特點的屬性。

三、ActiveMQ + SpringBoot的使用

ActiveMQ配置:

spring:
  activemq:
    broker-url: tcp://192.168.99.100:61616
    user: admin
    password: admin
    in-memory: false # 基于外部mq模式
    pool:
      enable: true #開啟連結池
      max-connections: 10 #最大連結數
           
package com.zwfw.framework.activemq.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

import javax.jms.DeliveryMode;
import javax.jms.Session;


@Configuration
public class ActivemqConfig {
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String username;

    @Value("${spring.activemq.password}")
    private String password;

    /**
     * 消息重發政策配置
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        //是否在每次嘗試重新發送失敗後,增長這個等待時間
        redeliveryPolicy.setUseExponentialBackOff(true);
        //重發次數,預設為6次-設定為3次
        redeliveryPolicy.setMaximumRedeliveries(3);
        //重發時間間隔機關毫秒,預設為1秒
        redeliveryPolicy.setInitialRedeliveryDelay(1000L);
        //第一次失敗後重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒
        redeliveryPolicy.setBackOffMultiplier(2);
        // 是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(false);
        // 設定重發最大拖延時間-1表示無延遲限制
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        return redeliveryPolicy;
    }

    /**
     * 消息工廠配置
     */
    @Bean
    public ActiveMQConnectionFactory activeMqConnectionFactory() {
        ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);
        activeMqConnectionFactory.setRedeliveryPolicy(redeliveryPolicy());
        return activeMqConnectionFactory;
    }

    @Bean(name = "jmsTemplate")
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        // 設定連接配接工廠
        jmsTemplate.setConnectionFactory(activeMqConnectionFactory());
        //deliveryMode, priority, timeToLive 的開關,要生效,必須配置為true,預設false
        jmsTemplate.setExplicitQosEnabled(true);
        //定義持久化後節點挂掉以後,重新開機可以繼續消費 1表示非持久化,2表示持久化
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        /**
         * 如果不啟用事務,則會導緻XA事務失效;
         * 作為生産者如果需要支援事務,則需要配置SessionTransacted為true
         */
        jmsTemplate.setSessionTransacted(false);
        //消息的應答方式,需要手動确認,此時SessionTransacted必須被設定為false,且為Session.CLIENT_ACKNOWLEDGE模式
        /**
         * 當關閉事務時候,下面設定才有效
         * Session.AUTO_ACKNOWLEDGE  消息自動簽收
         * Session.CLIENT_ACKNOWLEDGE  用戶端調用acknowledge方法手動簽收
         * Session.DUPS_OK_ACKNOWLEDGE 不必必須簽收,消息可能會重複發送
         */
        jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        return jmsTemplate;
    }

    /**
     * topic模式的ListenerContainer
     * topic下沒有消息回執一說,确認消息之存在queue模式
     * 浏覽隻是針對 Queue 的概念,Topic 沒有浏覽。浏覽是指擷取消息而消息依然保持在 broker 中,而消息的接收會把消息從 broker 中移除。
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(activeMqConnectionFactory());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    /**
     * queue模式的ListenerContainer
     * 監聽容器配置,使用jackson的消息轉換器
     * 1 不開啟事務,手動确認,自動确認
     * 2 開啟事務,是自動應答,當用戶端消費有異常抛出,會進行重試模式,按照上面重試配置次數重試後,如果還是失敗,則會進入死信隊列
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // 關閉事務
        factory.setSessionTransacted(false);
        // 設定手動确認,預設配置中Session是開啟了事務的,事務優先級大于用戶端确認,即使我們設定了手動Ack也是無效的
        factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        factory.setConnectionFactory(activeMqConnectionFactory());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    /**
     * queue模式的ListenerContainer
     * 監聽容器配置,使用自帶的消息轉換器
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueueNoConver() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // 關閉事務
        factory.setSessionTransacted(false);
        // 設定手動确認,預設配置中Session是開啟了事物的,即使我們設定了手動Ack也是無效的
        factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        factory.setConnectionFactory(activeMqConnectionFactory());
        return factory;
    }

    /**
     * 自定義消息轉換器
     * @return
     */
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        // MappingJackson2MessageConverter隻支援TEXT和byte類型的轉換,
        // 見org.springframework.jms.support.converter.MappingJackson2MessageConverter.toMessage
        converter.setTargetType(MessageType.TEXT);
        // 可以為任何字元,但必需要配置,在下文中的setTypeIdOnMessage方法中會用上
        converter.setTypeIdPropertyName("_type");
        return converter;
    }
}
           

queue和topic配置

package com.zwfw.framework.activemq.queue;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;
import javax.jms.Topic;

@Configuration
public class QueueConfig {

    /**
     * 聲明普通隊列
     */
    @Bean
    public Queue conmonQueue(){
        return new ActiveMQQueue("common.queue");
    }

    /**
     * 聲明延時隊列
     */
    @Bean
    public Queue delayQueue(){
        return new ActiveMQQueue("delay.queue");
    }

    /**
     * 聲明廣播類型隊列
     */
    @Bean
    public Topic topicQueue(){
        return new ActiveMQTopic("topic.queue");
    }
}

           
上面是mq的基本配置,配置了幾個監聽容器:queue模式下的帶事務和不帶事務手動确認的容器和topic監聽的容器,這些容器在後續監聽類中配置用得上。

P2P模式(圖是借鑒來的):

學習筆記:ActiveMQ + SpringBoot &amp; 事務問題 &amp; 序列化問題一、docker 安裝 ActiveMQ二、ActiveMQ介紹三、ActiveMQ + SpringBoot的使用四、解決的問題
前言:該模式,點對點傳輸。生産者傳輸消息到隊列,消費者從隊列中消費。生産者可配置事務,如果開啟事務發送,隻有在commit之後,隊列中才會有消息入列,如果rollback則不會進入隊列;用戶端一般有兩種方式,一種是事務,第二種是确認機制:開啟事務,不需要設定确認機制(事務優先級大于确認機制,設定了手動确認等也無效),預設就是自動确認,當事務方法中抛出異常,則消息不會被消費,會進入重試,重試次數到了之後,會進入activemq的DLQ隊列(死信)。

生産者代碼和說明:

@RestController
public class SendController {
    
    @Autowired
    private Queue conmonQueue;

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * 單條資料發送,事務模式
     */
    @RequestMapping("/commonQueue")
    public void commonQueue() throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            Book book = new Book();
            book.setName("三體" + i).setAuthor("劉慈欣" + i).setType("科幻" + i);
            // 設定事務模式發送
            jmsTemplate.setSessionTransacted(true);
            jmsTemplate.convertAndSend(conmonQueue,book);
            if (i == 12) {
                throw new RuntimeException();
            }
        }
    }

    /**
     * 批量發送,事務模式
     */
    @RequestMapping("/commonQueue/{num}")
    public void commonQueueTranscate(@PathVariable("num") Integer num) throws Exception {
        MessageProducer pd = null;
        Session session = null;
        Connection connection = null;
        try {
            ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
            connection = connectionFactory.createConnection();
            connection.start();
            // 開啟事務,隻能設定AUTO_ACKNOWLEDGE,其他模式無效且不受控制
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            pd = session.createProducer(conmonQueue);

            for (int i = 0; i < 20; i++) {
                Book book = new Book();
                book.setName("三體" + i).setAuthor("劉慈欣" + i).setType("科幻" + i);
                // 用内置的消息轉換器
                TextMessage message = session.createTextMessage(JSON.toJSONString(book));
                // 用jackson的消息轉化器
//                Message message = jmsTemplate.getMessageConverter().toMessage(book, session);
                pd.send(message);
                System.out.println("send book" + i + " to queue");
            }
            // 測試判斷,偶數送出,奇數復原
            if (num % 2 == 0) {
                session.commit();
            }else {
                session.rollback();
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }finally {
            pd.close();
            session.close();
            connection.close();
        }
    }

}
           

事務:使用springboot整合後,基本上是使用jmstemplate模版進行消息的發送。在事務環境下,好像隻能發送單條并且發送成功後确認單條資料。這樣資料量大的話感覺會影響效率。如果在批量發送環境下,用jmstemplate發送我還沒有找到合适的方法 ………是以用session會話的模式,設定事務進行批量發送。

序列化:在配置檔案中,定義了jackson的序列化方式,如果不定義,就是使用預設的org.springframework.jms.support.converter.SimpleMessageConverter.toMessage序列化。在裡面根據你發送消息的類型來序列化。

學習筆記:ActiveMQ + SpringBoot &amp; 事務問題 &amp; 序列化問題一、docker 安裝 ActiveMQ二、ActiveMQ介紹三、ActiveMQ + SpringBoot的使用四、解決的問題

如果使用的是自定義的jackson序列化,發送的jmstemplate模闆也需要注入jackson的序列化配置,在監聽容器配置中,也需要注入jackson的序列化配置。否則,如果發送的jmstemplate沒有注入,或者用的會話模式session.createTextMessage來發送的消息(自帶的序列化),在監聽收到消息後會序列化失敗,原因是,使用了jackson配置發送的消息,在内部會調用setTypeIdOnMessage,裡面會塞入設定過的typeIdPropertyName。當消費者反序列化的時候,則會調用getJavaTypeForMessage方法,裡面會判斷有沒有這個屬性,如果沒有則抛出異常。

總而言之,發送端和接收端的序列化配置必需同步。

學習筆記:ActiveMQ + SpringBoot &amp; 事務問題 &amp; 序列化問題一、docker 安裝 ActiveMQ二、ActiveMQ介紹三、ActiveMQ + SpringBoot的使用四、解決的問題
學習筆記:ActiveMQ + SpringBoot &amp; 事務問題 &amp; 序列化問題一、docker 安裝 ActiveMQ二、ActiveMQ介紹三、ActiveMQ + SpringBoot的使用四、解決的問題
當用戶端監聽配置的反序列化是jackson後,jmstemplate也要注入jackson配置。如果想要批量發送消息,可以使用下面的模版來構造一個消息對象,通過配置jackson後的jmstemplate方法是調用了setTypeIdOnMessage方法,在反序列化的時候不會出現上面異常問題:

消費者代碼和說明:

@Component
public class ActiveListener {

    /**
     * 将開啟事務時候,方法内有異常則不會确認消費
     * 發生異常會進入重試模式,伺服器按重試配置數推送,預設6次
     * 重試還是失敗,消息會進入死信隊列
     */
    @JmsListener(destination = "common.queue", containerFactory = "jmsListenerContainerQueue")
    public void commonQueueListen(Book book, ActiveMQMessage message) throws Exception {
        System.out.println(book);
        // 手動确認消息,當開啟事務時,此設定無效
        message.acknowledge();
    }

    /**
     * 使用内置序列化的配置的監聽容器
     */
    @JmsListener(destination = "common.queue", containerFactory = "jmsListenerContainerQueueNoConver")
    public void commonQueueListen1(String book, ActiveMQMessage message) throws Exception {
        System.out.println(book);
        // 手動确認消息,當開啟事務時,此設定無效
        message.acknowledge();
    }
}
           

當監聽容器開啟事務後, message.acknowledge()方法并沒有作用,事務對應的是自動确認,不受控制。當監聽容器關閉事務,應使用确認機制,一般手動确認,如果沒有确認,則消息不會被消費。

分組和并發消費

并發消費:如果想在發送消息并且由多個消費者一起并發消費,可以通過設定配置檔案中的concurrency屬性,或者在@JmsListener中給注解屬性concurrency設定數量,如下圖。

分組消費:用于隊列模式。分組消費需要在生産者發送的消息中設定消息頭,使用setStringProperty來設定消息的頭屬性。在消費者端,在@JmsListener注解上添加消息頭過濾 selector =“JMSXGroupID=‘groupB’”,就可以完成分組消費。

學習筆記:ActiveMQ + SpringBoot &amp; 事務問題 &amp; 序列化問題一、docker 安裝 ActiveMQ二、ActiveMQ介紹三、ActiveMQ + SpringBoot的使用四、解決的問題

生産者:

@RequestMapping("/groupQueue")
    public void groupQueue() throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            Book book = new Book();
            book.setName("三體" + i).setAuthor("劉慈欣" + i).setType("科幻" + i);
            jmsTemplate.setSessionTransacted(true);
            // 可以用session會話模式生成的message來設定消息頭,我這裡用模版發送,在生成message同時塞入屬性
            jmsTemplate.send(conmonQueue, session -> {
                Message message = jmsTemplate.getMessageConverter().toMessage(book, session);
                message.setStringProperty("JMSXGroupID","groupA");
                return message;
            });
        }
    }
           

消費者:

/**
     * 分組
     * selector,過濾頭屬性
     * concurrency,并發數量,可以生成多個消費者
     */
    @JmsListener(destination = "common.queue", containerFactory = "jmsListenerContainerQueue", selector ="JMSXGroupID='groupA'")
    public void commonQueueListenGroupA(Book book, ActiveMQMessage message) throws Exception {
        System.out.println("groupA: " + book);
        // 手動确認消息,當開啟事務時,此設定無效
        message.acknowledge();
    }
    /**
     * 分組
     * selector,過濾頭屬性
     * concurrency,并發數量,可以生成多個消費者
     */
    @JmsListener(concurrency = "10", destination = "common.queue", containerFactory = "jmsListenerContainerQueue", selector ="JMSXGroupID='groupB'")
    public void commonQueueListenGroupB(Book book, ActiveMQMessage message) throws Exception {
        System.out.println("groupB: " + book);
        // 手動确認消息,當開啟事務時,此設定無效
        message.acknowledge();
    }
           

pub/sub模式(圖是借鑒來的):

學習筆記:ActiveMQ + SpringBoot &amp; 事務問題 &amp; 序列化問題一、docker 安裝 ActiveMQ二、ActiveMQ介紹三、ActiveMQ + SpringBoot的使用四、解決的問題

topic模式待完善。。。。

四、解決的問題

此文主要是提供ActiveMQ基本的配置,生産者事務,消費者事務和确認機制,生産者和消費者的序列化問題。更詳細的介紹網上有很多資料可以查閱。主要是自己學習的時候,網上的代碼都沒有很好的解決問題,是以在這裡記錄一下。
  1. 序列化和反序列化必需同步:如果使用原生的session來發送消息是沒有配置序列化的(應該也是可以設定自定義的序列化配置的),監聽類也需要用自帶的預設的序列化方式來接受對象。如果使用了jackson序列化方式,在配置類中,jmstemplate和監聽容器都要注入此配置,并且發送的時候需要用帶有jackson配置的jmstemplate來發送消息,否則消息會序列化失敗。
  2. 生産者的事務:如果用jmstemplate來發送消息,跟讀源碼會發現,如果設定setSessionTransacted為true,則會代理生成一個事務,并且發送了會commit。在網上用模版批量發送事務消息的例子。于是就用的session會話來控制批量消息事務的控制。如果要用自定義序列化,則可以用“Message message = jmsTemplate.getMessageConverter().toMessage(book, session);”來構造一個含有jackson序列化配置的消息。
  3. 消費者的事務和确認機制:當事務設定為true的時候,和生産者一樣,在配置确認機制就不生效了。因為預設就是自動确認。當發生異常事務復原,會進行重試階段,重試後如果失敗,則會進入死信隊列。當事務設定為false的時候,一般設定的是手動确認,隻有确認後的資料才會出隊,否則資料會一直存在隊列裡面(排除定時的消息)。
  4. 批量事務:用模版方法發送貌似隻能一條條的來,批量的話得用session來發送,并commit;
  5. topic的代碼還沒有實作,還在學習中,後續補上。。。

繼續閱讀