天天看點

ActiveMQ高可用+負載均衡叢集之功能測試

1.基礎功能測試

ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實作。JMS的全稱是Java Message Service,即Java消息服務。它主要用于在生産者和消費者之間進行消息傳遞,生産者負責産生消息,而消費者負責接收消息。而消息的傳遞有兩種類型,主要如下:

  • 一種是點對點的,即一個生産者和一個消費者一一對應。
    ActiveMQ高可用+負載均衡叢集之功能測試
  • 另一種是釋出/訂閱模式,即一個生産者産生消息并進行發送後,可以由多個消費者進行接收。
    ActiveMQ高可用+負載均衡叢集之功能測試

ActiveMQ和JMS的消息類型對應如下

JMS消息模型 P2P消息模型 Pub/Sub消息模型
ActiveMQ Queue隊列 Topic隊列
特點 一對一,生産者生産了一個消息,隻能由一個消費者進行消費 一對多,生産者生産了一個消息,可以由多個消費者進行消費

接下來将對兩種類型的場景進行分别驗證。

1.1點對點模式(Queue)

點對點的模式主要建立在某個queue上,消息可以被同步或異步的發送和接收。點對點的消息模式可以有多個發送端,多個接收端,但是每個消息隻會給一個Consumer傳送一次。

1.1.1引入依賴

1.ActiveMQ依賴

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-all</artifactId>
   <version>5.15.12</version>
</dependency>           

2.Springboot-ActiveMQ連接配接池依賴

如果要啟用連接配接池,且使用springboot2.0+及以下版本的時候,maven配置依賴是:

<dependency> 
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId> 
</dependency>           

如果要啟用連接配接池,且使用springboot2.1+的時候,maven配置依賴是:

<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>           

1.1.2生産者釋出Queue消息

由于ActiveMQ的用戶端隻能通路Master的Broker,其他處于Slave的Broker不能被通路,是以用戶端連接配接Broker應該使用failover協定,具體代碼如下。

String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服務位址
String queueName = "queue-testqq";//要建立的消息名稱
//1.建立ConnectiongFactory,綁定位址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.建立Connection
Connection connection = factory.createConnection();
//3.啟動連接配接
connection.start();
//4.建立會話
//第一個參數:是否開啟事務
//第二個參數:消息是否自動确認
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立一個隊列
Destination destination = session.createQueue(queueName);
//6.建立一個生産者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
    //7.建立消息
    TextMessage textMessage = session.createTextMessage("我是Queue消息生産者:" + i);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //8.發送消息
    producer.send(textMessage);
    System.out.println("發送第一組消息:" + i);
}
connection.close();           

運作代碼後,可在ActiveMQ控制台檢視對應的Queue資訊,此時有消息待接收。

ActiveMQ高可用+負載均衡叢集之功能測試

1.1.3消費者接收Queue消息

Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通過使用MessageConsumer.setMessageListener() 注冊一個 MessageListener 實作異步接收。

這裡使用異步接收的方式消費消息,具體代碼如下:

String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服務位址
String queueName="queue-testqq";//要消費的消息名稱

//1.建立ConnectiongFactory,綁定位址
ConnectionFactory factory=new ActiveMQConnectionFactory(url);
//2.建立Connection
Connection connection= factory.createConnection();
//3.啟動連接配接
connection.start();
//4.建立會話
/** 第一個參數,是否使用事務
 如果設定true,操作消息隊列後,必須使用 session.commit();
 如果設定false,操作消息隊列後,不使用session.commit();
 */
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立一個目标
Destination destination=session.createQueue(queueName);
//6.建立一個消費者
MessageConsumer consumer=session.createConsumer(destination);
//7.建立一個監聽器
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message arg0) {
        TextMessage textMessage=(TextMessage)arg0;
        try {
            System.out.println("接收消息:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});           

執行代碼後,可以在控制台如下輸出:

ActiveMQ高可用+負載均衡叢集之功能測試

在ActiveMQ控制台檢視對應的Queue資訊,此時消息已被消費。

ActiveMQ高可用+負載均衡叢集之功能測試

1.1.4多消費者模式

同時開啟2個以上的消費者,再次運作生産者,觀察每個消費者控制台的輸出。

ActiveMQ高可用+負載均衡叢集之功能測試
ActiveMQ高可用+負載均衡叢集之功能測試

觀察後,得出結論:一條消息隻會被一個消費者會接收消費,不可重複消費。同時還發現,多個消費者的情況下消息會被均分,即負載均衡政策。

生産者發送消息情況:

ActiveMQ高可用+負載均衡叢集之功能測試

消費者1接收消息情況:

ActiveMQ高可用+負載均衡叢集之功能測試

消費者2接收消息情況:

ActiveMQ高可用+負載均衡叢集之功能測試

1.2釋出/訂閱模式(Topic)

Pub/Sub(釋出/訂閱,Publish/Subscribe)消息域使用topic作為Destination,釋出者向topic發送消息,訂閱者注冊接收來自topic的消息。發送到topic的任何消息都将自動傳遞給所有訂閱者。接收方式(同步和異步)與P2P域相同。

1.2.1生産者釋出Topic消息

Pub/Sub模式與P2P模式在代碼實作層面基本一樣,變化的隻有Queue與Topic。生産者釋出Topic消息的具體代碼如下:

String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服務位址,端口預設61616
        String topicName="topic-testqq";//要建立的消息名稱

        //1.建立ConnectiongFactory,綁定位址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.建立Connection
        Connection connection= factory.createConnection();
        //3.啟動連接配接
        connection.start();
        //4.建立會話 (參數1:是否啟動事務,參數2:消息确認模式)
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.建立一個目标
        Destination destination=session.createTopic(topicName);
//        Topic topic = session.createTopic(topicName);
        //6.建立一個生産者
        MessageProducer producer=session.createProducer(destination);
        for (int i = 0; i < 15; i++) {
            //7.建立消息
            TextMessage textMessage=session.createTextMessage("我是topic類型消息生産者:"+i);
            //8.發送消息
            producer.send(textMessage);
            System.out.println("發送消息:"+i);
        }
        connection.close();           

此時,消息生産者先不啟動。Pub/Sub模式下必須先啟動sub,否則在啟動sub之前釋出的消息是不能消費的,就像你今天開始訂報紙,那今天之前的報紙你肯定是收不到了,釋出/訂閱模式與此同理。

1.2.2消費者接收Topic消息

與P2P模式相同,Pub/Sub模式的消息接收方式也有兩種:同步接收和異步接收。這裡采用異步接收的方式消費Topic消息,具體代碼如下:

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";//服務位址,端口預設61616
String topicName = "topic-testqq";//要建立的消息名稱

//1.建立ConnectiongFactory,綁定位址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.建立Connection
Connection connection = factory.createConnection();
//3.啟動連接配接
connection.start();
//4.建立會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立一個目标
Destination destination = session.createTopic(topicName);
//6.建立一個消費者
MessageConsumer consumer = session.createConsumer(destination);
//7.接收消息,可選擇同步接收或者異步接收
/*//消費者同步接收,receive(long timeout)主線程阻塞式等待下一個消息到來,可設定逾時時間,逾時則傳回null。
TextMessage message = (TextMessage) consumer.receive(1000);
System.out.println("同步接收Topic消息: " + message);*/
//消費者異步接收,建立一個監聽器
consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message arg0) {
        TextMessage textMessage = (TextMessage) arg0;
        try {
            System.out.println("異步接收Topic消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});           

運作代碼,可以看到控制台輸出已消費Topic消息。

ActiveMQ高可用+負載均衡叢集之功能測試
ActiveMQ高可用+負載均衡叢集之功能測試

ActiveMQ控制台也可以看到對應的Topic釋出、訂閱資訊。如下

ActiveMQ高可用+負載均衡叢集之功能測試

1.2.3多消費者模式

同時開啟2個以上的消費者,再次運作生産者,觀察每個消費者控制台的輸出.

ActiveMQ高可用+負載均衡叢集之功能測試

兩個消費者運作情況如下:

ActiveMQ高可用+負載均衡叢集之功能測試
ActiveMQ高可用+負載均衡叢集之功能測試

可以發現:

  • topic釋出/訂閱模式,一個消息可以被多個消費者消費
  • topic釋出/訂閱模式要求消費者必須即時消費,即生産者釋出消息時,消費者必須同時線上才可接收消費消息。

2.高可用測試

2.1測試方案一

2.1.1測試用例

1.生産者連接配接叢集發送50條消息并設定每發送一條消息,sleep1秒

2.觀察生産者發送消息所連接配接的節點,并将所在的節點停掉

3.觀察生産者發送消息日志,檢視所有消息是不是正常發送

2.1.2測試代碼

1.生産者釋出消息

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要建立的消息名稱
        
//1.建立ConnectiongFactory,綁定位址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.建立Connection
Connection connection = factory.createConnection();
//3.啟動連接配接
connection.start();
//4.建立會話
//第一個參數:是否開啟事務
//第二個參數:消息是否自動确認
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立一個目标
Destination destination = session.createQueue(queueName);
//        Queue queue = session.createQueue("test-Queue");
//6.建立一個生産者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 50; i++) {
//7.建立消息
TextMessage textMessage = session.createTextMessage("我是第一組消息生産者:" + i);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
}
//8.發送消息
producer.send(textMessage);
System.out.println("發送第一組消息:" + i);
  }
connection.close();           

2.1.3測試過程

1.運作生産者釋出消息,觀察生産者控制台

ActiveMQ高可用+負載均衡叢集之功能測試

2.停止mq1(61616)節點

ActiveMQ高可用+負載均衡叢集之功能測試

3.繼續觀察生産者控制台

(1)此時61616節點已無法連接配接

ActiveMQ高可用+負載均衡叢集之功能測試

(2)生産者已成功連接配接61619節點,并繼續發送消息

ActiveMQ高可用+負載均衡叢集之功能測試

2.2測試方案二

2.2.1測試用例

1.生産者連接配接叢集發送20條消息

3.消費者連接配接叢集消費消息,觀察消息消費情況

2.2.2測試代碼

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要建立的消息名稱
        
//1.建立ConnectiongFactory,綁定位址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.建立Connection
Connection connection = factory.createConnection();
//3.啟動連接配接
connection.start();
//4.建立會話
//第一個參數:是否開啟事務
//第二個參數:消息是否自動确認
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立一個目标
Destination destination = session.createQueue(queueName);
//        Queue queue = session.createQueue("test-Queue");
//6.建立一個生産者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
//7.建立消息
TextMessage textMessage = session.createTextMessage("我是第一組消息生産者:" + i);
//8.發送消息
producer.send(textMessage);
System.out.println("發送第一組消息:" + i);
  }
connection.close();           

2.消費者接收消息

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要建立的消息名稱

    //1.建立ConnectiongFactory,綁定位址
    ConnectionFactory factory = new ActiveMQConnectionFactory(url);
    //2.建立Connection
    Connection connection = factory.createConnection();
    //3.啟動連接配接
    connection.start();
    //4.建立會話
    /** 第一個參數,是否使用事務
     如果設定true,操作消息隊列後,必須使用 session.commit();
     如果設定false,操作消息隊列後,不使用session.commit();
     */
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.建立一個目标
    Destination destination = session.createQueue(queueName);
    //6.建立一個消費者
    MessageConsumer consumer = session.createConsumer(destination);
    //7.建立一個監聽器
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message arg0) {
            TextMessage textMessage = (TextMessage) arg0;
            try {
                System.out.println("接收消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });           

2.2.3測試過程

ActiveMQ高可用+負載均衡叢集之功能測試

2.停止mq4(61619)節點

ActiveMQ高可用+負載均衡叢集之功能測試

3.啟動消費者,觀察消費情況

ActiveMQ高可用+負載均衡叢集之功能測試

觀察發現,消費者連接配接61618節點,并成功接收消費消息。

2.3測試結論

經測試,目前叢集在啟動消息生産者發送消息時,使生産者所在節點當機的情況下,得出如下結論:

1.高可用架構的ActiveMQ叢集,在生産消息的過程中生産者所在節點挂掉,用戶端會暫時阻塞無法發送消息,但整體可用性不受影響。

2.高可用架構的ActiveMQ叢集,在消息生産者所在節點挂掉後,消費者仍可正常消費消息

3.目前ActiveMQ叢集若其中一個節點挂掉,ActiveMQ正常提供服務,不影響服務可用性

3.負載均衡測試

最終的架構就是兩個master-slave叢集互相連通,兩個叢集可以互相消費對方的消息,但是如果用戶端所連接配接的叢集挂掉用戶端依然是不能發送消息的,也就是說activemq的負載均衡隻是做到消費的負載均衡,高可用是靠master-slave來保證的。

3.1測試用例

1.啟動兩個消費者監聽相同的queue,且服務位址均配置叢集所有節點

2.生産者連接配接叢集向指定的queue連續發送20條消息

3.觀察兩個生産者消費消息的日志

3.2測試代碼

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要建立的消息名

//1.建立ConnectiongFactory,綁定位址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.建立Connection
Connection connection = factory.createConnection();
//3.啟動連接配接
connection.start();
//4.建立會話
//第一個參數:是否開啟事務
//第二個參數:消息是否自動确認
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立一個目标
Destination destination = session.createQueue(queueName);
//Queue queue = session.createQueue("test-Queue");
//6.建立一個生産者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
//7.建立消息
TextMessage textMessage = session.createTextMessage("我是第一組消息生産者:" + i);
//8.發送消息
producer.send(textMessage);
System.out.println("發送第一組消息:" + i);
}
connection.close();           

2.消費者接收消費消息

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618," +
"tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要建立的消息名稱
//1.建立ConnectiongFactory,綁定位址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.建立Connection
Connection connection = factory.createConnection();
//3.啟動連接配接
connection.start();
//4.建立會話
/** 第一個參數,是否使用事務
如果設定true,操作消息隊列後,必須使用 session.commit();
如果設定false,操作消息隊列後,不使用session.commit();
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立一個目标
Destination destination = session.createQueue(queueName);
//6.建立一個消費者
MessageConsumer consumer = session.createConsumer(destination);
//7.建立一個監聽器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
TextMessage textMessage = (TextMessage) arg0;
try {
/* if (textMessage.getText().contains("10")) {
System.out.println("======消息異常=======");
throw new Exception();
}*/
System.out.println("接收消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});           

3.3測試過程

  1. 同時開啟2個消費者,再運作生産者,觀察每個消費者控制台的輸出。
    ActiveMQ高可用+負載均衡叢集之功能測試
ActiveMQ高可用+負載均衡叢集之功能測試
  1. 再運作生産者,觀察每個消費者控制台的輸出。

    生産者釋出消息情況:

ActiveMQ高可用+負載均衡叢集之功能測試
ActiveMQ高可用+負載均衡叢集之功能測試
ActiveMQ高可用+負載均衡叢集之功能測試

觀察發現,多個消費者的情況下消息會被均分,即負載均衡政策。且同一條消息隻會被一個消費者會接收消費。

3.4測試結論

經測試,目前叢集在多個消費者消費相同隊列的情況下,可以實作消息消費的負載均衡,進而實作ActiveMQ叢集的分流,提高叢集吞吐率。

到這裡,ActiveMQ叢集的功能測試、高可用測試及負載均衡測試已完成,目前ActiveMQ叢集高可用+負載均衡功能正常。