天天看點

架構設計:系統間通信(22)——提高ActiveMQ工作性能(上)3、ActiveMQ性能優化思路4、ActiveMQ中的網絡IO5、JMS規範中的幾個基本概念

接上文《架構設計:系統間通信(21)——ActiveMQ的安裝與使用》

3、ActiveMQ性能優化思路

上篇文章中的兩節内容,主要介紹消息中間件ActiveMQ的安裝和基本使用。從上篇文章給出的安裝配置和示例代碼來看,我們既沒有修改ActivieMQ服務節點的任何配置,也沒有采用任何的叢集方案。這種情況隻适合各位讀者熟悉ActiveMQ的工作原理和基本操作,但是如果要将ActivieMQ應用在生産環境下,上文中介紹的運作方式遠遠沒有挖掘出它的潛在性能。

架構設計:系統間通信(22)——提高ActiveMQ工作性能(上)3、ActiveMQ性能優化思路4、ActiveMQ中的網絡IO5、JMS規範中的幾個基本概念

根據這個系列文章所陳述的中心思想,系統的性能層次包括:代碼級性能、規則性能、存儲性能、網絡性能,以及多節點協同方法(叢集方案),是以我們優化ActiveMQ的中心思路也是這樣的:首先優化ActiveMQ單個節點的性能,然後在配置ActiveMQ的叢集。下面我們就按照這個思路,一步步介紹和ActiveMQ性能有關的那些事。

在預設情況下ActiveMQ的網絡資訊傳遞方式基于網絡IO模型中的BIO方式。那麼為了提高ActiveMQ單節點的工作性能,我們首先應該為每一個獨立的MQ服務節點配置更高效的網絡IO模型(關于網絡IO模型的詳細講解,請參考本系列的另外幾篇文章:《架構設計:系統間通信(3)——IO通信模型和JAVA實踐 上篇》、

《架構設計:系統間通信(4)——IO通信模型和JAVA實踐 中篇》、《架構設計:系統間通信(5)——IO通信模型和JAVA實踐 下篇》,這個思想也是整個系列文章的核心思想之一)。

另外我們還需要為ActiveMQ考慮一種存儲方案,讓它能高效的完成“持久化”消息的存儲操作(也包括對“非持久化”消息的臨時存儲)**。雖然關于存儲部分的知識和示例 在我另外一個系列的專題文章——系統存儲(很快就會出品)才會講到,但為了講清楚消息中間件軟體的性能優化,是以還是會講到存儲方案的選擇。

另外,我們還知道了,使用叢集方案能夠增加整個軟體的性能和穩定性,是以在完成單節點優化以後,我們還需要提供某種叢集方案将多個ActiveMQ組合起來,讓它們協同工作(一定是協同工作,單純的安裝多個ActiveMQ節點而不進行協同,是沒法提高性能和穩定性的)。

Apache ActiveMQ是開源軟體,您可以在ActiveMQ的官網下載下傳到它的源代碼,并進行代碼級别性能的改造,但很明顯這并不是一個能獲得高效費比的方式。是以筆者不建議您和您的團隊更改ActiveMQ代碼實作(學習一下别人的代碼實作還是可以的)。

4、ActiveMQ中的網絡IO

4-1、基本連接配接配置

在上文中,我們提到了ActiveMQ支援多種消息協定,包括AMQP協定、MQTT協定、Openwire協定、Stomp協定等。在ActiveMQ的官方網站上,列出了目前ActiveMQ中支援的所有消息協定,它們是:AMQP、MQTT、OpenWire、REST、Stomp、XMPP;

不同的協定需要設定不同的網絡監聽端口,這個相關設定在ActiveMQ安裝目錄的./conf/conf/activemq.xml主配置檔案中。主配置檔案采用XML格式進行描述,其中的“transportConnectors”标記描述了各種協定的網絡監聽端口,示例如下:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
           

以上配置了openwire協定的接入端口号為本機所有IP裝置的61616(0.0.0.0代表本機所有IP裝置);配置amqp協定的接入端口号為本機所有IP裝置的5672;配置stomp協定的接入端口号為本機所有IP裝置的61613,等等。這裡注意以下幾個事實:

  • 每一個“transportConnector”标記的name屬性和uri屬性都必須填寫,name屬性的值可以随便填寫,它将作為一個Connector元素,顯示在ActiveMQ管理界面的Connections欄目中;
  • 每一個“transportConnector”标記的uri元素,都有固定寫法:uri頭是指定的協定名稱,例如amqp、mqtt、stomp等。然後是HOST/IP域名,指定端口監聽所在的路由資訊;請不要使用localhost或者127.0.0.1這樣的回環位址,否則無法通過網絡連接配接到ActiveMQ;接下來是端口資訊,指定的端口不能重複,否則會産生沖突。
  • URI參數部分,每一種協定都有一些特定的參數,讀者可參考ActiveMQ官網中,關于“協定”部分的介紹:http://activemq.apache.org/protocols.html。但是有些參數卻是各種協定都可共用的,例如以上執行個體中使用的“maximumConnections”屬性,代表這個端口支援的最大連接配接數量;”wireFormat.maxFrameSize”屬相代表支援協定的“一個完整消息”的最大資料量(機關為byte);您可以在ActiveMQ官網中對wire formats的參數描述中,找到這些預設屬性:http://activemq.apache.org/configuring-wire-formats.html。
Any transport which involves marshalling messages onto some kind of network transport like TCP or UDP will typically use the OpenWire format. This is configurable to customize how things appear on the wire.
  • URI參數部分,各協定可共用的參數還包括“基本連接配接特性”相關的參數,這些參數說明可參見官網:http://activemq.apache.org/connection-configuration-uri.html 中的詳細說明。另外如果您使用的是TCP協定,您還可以在URI參數部分加入TCP相關的屬性描述,參見官網 http://activemq.apache.org/tcp-transport-reference.html 中的詳細說明;如果您使用的是UDP協定(當然,不推薦這樣使用),那麼您還可以在URI參數部分加入UDP相關的屬性描述,參見官網 http://activemq.apache.org/udp-transport-reference.html 中的詳細說明。
  • “transportConnector”标記中,除了必須填寫的“name”屬性和“uri”屬性以外還有一些可選擇的屬性,例如:enableStatusMonitor、updateClusterClients。詳細的屬性介紹可參考官方文檔 http://activemq.apache.org/configuring-transports.html 中 “Server side options”部分章節的介紹。在後續的文章中,我們将陸續使用到其中的一些設定項

4-2、特别說明

  • 在上文給出的配置資訊中,您可以發現我們在描述各種消息協定時,URI描述資訊的頭部都的是采用協定名稱:例如,描述amqp協定的監聽端口時,采用的URI描述格式為“amqp://……”;描述Stomp協定的監聽端口時,采用的URI描述格式為“stomp://……”。唯獨在進行openwire協定描述時,URI頭卻采用的“tcp://…..”。這是因為ActiveMQ中預設的消息協定就是openwire:

OpenWire is binary protocol designed for working with Message Oriented Middleware. It is the native wire format of ActiveMQ.

OpenWire is our cross language Wire Protocol to allow native access to ActiveMQ from a number of different languages and platforms. The Java OpenWire transport is the default transport in ActiveMQ 4.x or later.

  • 上文中我們還講到,ActiveMQ完整支援AMQP協定。但是讀者會發現ActiveMQ中并沒有存在Exchange這樣的結構。這是怎麼回事呢?實際上在國際标準組織 (ISO) 和國際電工委員會 (IEC) 制定的AMQP Version 1.0 規範文檔中(《OASIS Advanced Message Queueing Protocol

    (AMQP) Version 1.0》),并沒有說AMQP 消息必須經過Exchange規則才能夠到達隊列,也沒有規定Exchange 必須要實作某種規則的路由。是以在支援AMQP協定時,是否需要有Exchange這樣的路由處理規則,完全取決于AMQP的消息中間件軟體廠商自己的決定。下面一段代碼是使用JMS API連接配接ActiveMQ的AMQP端口,發送AMQP消息的示例:

package mq.test.amqp;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;

public class JMSProducer {
    public static void main(String[] args) throws Throwable {
        // 注意,JMS-AMQP使用的是Apache QPID的實作。如果您需要運作這段代碼,請導入QPID的用戶端
        /*
         * <dependency>
         *  <groupId>org.apache.qpid</groupId>
         *  <artifactId>qpid-amqp-1-0-client-jms</artifactId>
         *  <version>0.32</version>
         * </dependency>
         * */
        ConnectionFactoryImpl factory = ConnectionFactoryImpl.createFromURL("amqp://192.168.61.138:5672");
        Connection connection = factory.createQueueConnection();
        connection.start();

        // 建立會話,連接配接到叫做/test的Queue上
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination queue = session.createQueue("/test");
        MessageProducer messageProducer = session.createProducer(queue);

        // 開始發送消息
        TextMessage outMessage = session.createTextMessage();
        outMessage.setText("23456656457567456");
        messageProducer.send(outMessage);

        //關閉
        messageProducer.close();
        connection.close();
    }
}
           

4-3、網絡配置優化

那麼各位讀者您是否覺得上一小節那樣的連接配接端口配置太過冗長,不好進行管理?确實是這樣,并且實際工作中我們也隻會使用幾種固定的協定。是以ActiveMQ在Version 5.13.0+ 版本後,将OpenWire, STOMP, AMQP, MQTT這四種主要協定的端口監聽進行了合并,并使用auto關鍵字進行表示。也就是說,ActiveMQ将監聽這一個端口的消息狀态,并自動比對合适的協定格式。配置如下:

<transportConnectors>
    <transportConnector name="auto" uri="auto://0.0.0.0:61617?maximumConnections=1000" />
</transportConnectors>
           

以上的URI配置資訊中,可以使用所有通用的Connection Configuration、Wire Formats Configuring、Server side options和TCP Transport Configuration配置項。但是這種優化隻是讓ActiveMQ的連接配接管理變得簡潔了,并沒有提升單個節點的處理性能。

如果您不特别指定ActiveMQ的網絡監聽端口,那麼這些端口都将使用BIO網絡IO模型。是以為了首先提高單節點的網絡吞吐性能,我們需要明确指定Active的網絡IO模型,如下所示:

<transportConnectors>  
    <transportConnector name="nio" uri="nio://0.0.0.0:61618?maximumConnections=1000"/>  
</transportConnectors> 
           

請注意,URI格式頭以”nio”開頭,表示這個端口使用以TCP協定為基礎的NIO網絡IO模型。但是這樣的設定方式,隻能使這個端口支援Openwire協定。那麼我們怎麼既讓這個端口支援NIO網絡IO模型,又讓它支援多個協定呢?ActiveMQ的服務端設定,允許開發人員使用“+”符号來為端口設定多種特性,如下:

<transportConnector name="stomp+nio" uri="stomp+nio://0.0.0.0:61613?transport.transformer=jms"/>
// 表示這個端口使用NIO模型支援Stomp協定

<transportConnector name="amqp+ssl" uri="amqp+ssl://localhost:5671"/>
// 表示這個端口支援amqp和ssl密文傳輸
           

是以如果我們既需要某一個端口支援NIO網絡IO模型,又需要它支援多個協定,那麼可以進行如下的配置:

另外,如果是為了生産環境進行的配置,那麼您至少應該還要配置這個端口支援的最大連接配接數量、設定每一條消息的最大傳輸值、設定NIO使用的線程池最大工作線程數量(當然您已經知道了這些設定的文檔所在位置,是以您可以根據自己的情況進行設定屬性的增減):

以下附圖是改變網絡連接配接設定後,ActiveMQ管理控制台中Connections頁面顯示的内容。注意ws協定的端口是額外保留的配置——因為auto模式中的協定不支援ws:

架構設計:系統間通信(22)——提高ActiveMQ工作性能(上)3、ActiveMQ性能優化思路4、ActiveMQ中的網絡IO5、JMS規範中的幾個基本概念

5、JMS規範中的幾個基本概念

由于ActiveMQ是JMS規範的完整實作,是以為了講清楚ActiveMQ是如何進行存儲和排程的,就需要首先說明JMS中和存儲、排程有關的幾個概念。它們是:消息收發模式(訂閱-釋出和負載均衡模式)、消息存儲模式(持久化消息和非持久化消息)和訂閱模型(持續訂閱和非持續訂閱)。

5-1、訂閱釋出模式和負載均衡模式

在上篇文章中我們已經詳細過訂閱-釋出模式和負載均衡模式(http://blog.csdn.net/yinwenjie/article/details/50916518#t5)。在ActiveMQ的官方文檔中的描述,他們的英文名稱分别是Topics和Queue。這兩種消息“發送-接受”模式,都是JMS規範中的标準模式。為了使各位讀者在後續的閱讀中,不會和JMS中的其他概念不造成混淆,我在後文章中都将使用規範的名稱進行稱呼。

  • 訂閱-釋出模式:
架構設計:系統間通信(22)——提高ActiveMQ工作性能(上)3、ActiveMQ性能優化思路4、ActiveMQ中的網絡IO5、JMS規範中的幾個基本概念

在“訂閱-釋出”模式下,消息會被複制多份,分别發送給所有“訂閱”者。實際上在後文的描述中您将看到,這個複制的過程實際上沒有您想象的簡單。

  • 負載均衡模式:
架構設計:系統間通信(22)——提高ActiveMQ工作性能(上)3、ActiveMQ性能優化思路4、ActiveMQ中的網絡IO5、JMS規範中的幾個基本概念

在“負載均衡”模式下,一條消息将會發送給一個消息消費者,如果目前Queue沒有消息消費者,消息将進行存儲。同樣通過後文的介紹,您會發現這其中的過程也并不簡單。

5-2、持久化消息和非持久化消息

JMS中對非持久化消息和非持久化消息的稱呼分别是:NON_PERSISTENT Message和PERSISTENT Meaage。它們指的是消息在任何一種“發送-接受”模式下(“訂閱-釋出”模式和“負載均衡模式”),是否進行持久化存儲。

NON_PERSISTENT Message隻存儲在JMS服務節點的記憶體區域,不會存儲在某種持久化媒體上(後面我們要介紹到AcitveMQ可支援的持久化媒體有:KahaBD、AMQ和關系型資料)。在極限情況下,JMS服務節點的記憶體區域不夠使用了,也隻會采用某種輔助方案進行轉存(例如ActiveMQ會使用磁盤上的一個“臨時存儲區域”進行暫存)。一旦JMS服務節點當機了,這些NON_PERSISTENT Message就會丢失。

JMS中對PERSISTENT Meaage的定義是:這些消息不受JMS服務端異常狀态的影響,JMS服務端會使用某種持久化存儲方案儲存這些消息,直到JMS服務端認為這些PERSISTENT Meaage被消費端成功處理。例如ActiveMQ中可以選擇的持久化存儲方案就包括:KahaDB、AMQ和關系型資料庫。

在JMS标準API中,使用setDeliveryMode标記消息發送者是發送的PERSISTENT Meaage還是NON_PERSISTENT Message。示例如下:

......
for(int index =  ; index <  ; index++) {
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText("這是發送的消息内容:" + index);
    if(index %  == ) {
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    } else {
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
    }
    sender.send(outMessage);
}
......
           

那麼當JMS服務節點重新開機後(注意不是producer重新開機),以上代碼中發送的10條消息隻有其中5條消息能夠儲存下來。

5-3、持續訂閱和非持續訂閱

持續訂閱和非持續訂閱,是針對“訂閱-釋出”模式的細分處理政策,在JMS規範中的标準稱呼是:Durable-Subscribers和Non-Durable Subscribers。

Durable-Subscribers是指在“訂閱-釋出”模式下,即使标記為Durable-Subscribers的訂閱者下線了(可能是因為訂閱者當機,也可能是因為這個訂閱者故意下線),“訂閱-釋出”模式的Topic隊列也要儲存這些消息(視消息不同的持久化政策影響,儲存機制不一樣),直到下次這個被标記為Durable-Subscribers的訂閱者重新上線,并正确處理這條消息為止。換句話說,标記為Durable-Subscribers的訂閱者是否能獲得某條消息,和它是否曾經下線沒有任何關系。

Non-Durable Subscribers是指在“訂閱-釋出”模式下,“訂閱-釋出”模式的Topic隊列不用為這些已經下線的訂閱者保留消息。當後者将消息按照既定的廣播規則發送給目前線上的訂閱者後,消息就可以被标記為“處理完成”。

==================接下文

繼續閱讀