天天看點

架構設計:系統間通信(27)——其他消息中間件及場景應用(上)1、概述2、RabbitMQ及特性3、場景應用——電子政務平台:駕駛人違法記錄同步功能

1、概述

目前業界有很多消息中間件可供大家選擇,主要分為兩類:需要付費的商業軟體和開源共享的非商業軟體。對于商業軟體您和您的團隊可以選擇IBM WebSphere內建的MQ功能,也可以選擇Oracle WebLogic內建的MQ功能。本文首先介紹除Apache ActiveMQ以外的兩款開源共享的消息中間件産品,然後列舉三個實際的業務常見,為讀者介紹如何在這些實際業務中使用消息中間件解決問題。

2、RabbitMQ及特性

RabbitMQ基于Erlang語言開發和運作。它與Apache ActiveMQ有很多相同的特性,例如RabbitMQ完整支援多種消息協定:AMQP、STOMP、MQTT、HTTP,我們使用RabbitMQ時會預設使用AMQP1.0 協定。當然,RabbitMQ作為Apache ActiveMQ最主要的競品之一也有其獨特的功能特性。例如RabbitMQ支援一套特有的Routing-Exchange消息路由規則。這套規則可以按照消息内容,自動将消息歸類到不同的消息隊列中。關于這套Routing-Exchange消息路由規則可參見我另一篇文章的詳細介紹:架構設計:系統間通信(20)——MQ:消息協定(下)

2-1、RabbitMQ軟體特性

下面我們來看看RabbitMQ官網上對這款消息中間件軟體的特性介紹:

  • Reliability(可靠性):
RabbitMQ offers a variety of features to let you trade off performance with reliability, including persistence, delivery acknowledgements, publisher confirms, and high availability.

RabbitMQ支援消息持久化、消息重試操作(比ActiveMQ的相關功能還要強大)、消息回執确認規則、消息生産者發送确認機制(實際上是消息生産者端的一種事務機制)和高可用性HA(多節點熱備方案)等特性來提供RabbitMQ服務的高可靠性。

  • Flexible Routing(靈活的路由規則):
Messages are routed through exchanges before arriving at queues. RabbitMQ features several built-in exchange types for typical routing logic. For more complex routing you can bind exchanges together or even write your own exchange type as a plugin.

這就是我們提到的RabbitMQ所支援的一套特有的Routing-Exchange消息路由規則。一定注意這套規則不是AMQP協定規範提供的。關于個消息路由規則可參見我另一篇文章的詳細介紹:架構設計:系統間通信(20)——MQ:消息協定(下)

  • Clustering(RabbitMQ服務叢集):
Several RabbitMQ servers on a local network can be clustered together, forming a single logical broker。

RabbitMQ服務叢集主要解決的問題是單個RabbitMQ服務節點的性能瓶頸。關于RabbmitMQ叢集的搭建過程由于本文篇幅限制,我将随後安排時間為各位讀者詳細介紹。

  • Plugin System & Federation(支援第三方插件子產品,其中RabbitMQ Federation [插件]需要特别說明):

RabbitMQ ships with a variety of plugins extending it in different ways, and you can also write your own.

For servers that need to be more loosely and unreliably connected than clustering allows, RabbitMQ offers a federation model.

RabbitMQ支援第三方擴充插件,在RabbitMQ的官網上(http://www.rabbitmq.com/plugins.html)列舉了各種由RabbitMQ官方開發的插件,以及實驗性質的插件,包括(但不限于):rabbitmq_federation、rabbitmq_management、rabbitmq_mqtt、rabbitmq_stomp、rabbitmq_tracing等等。您還可以按照RabbitMQ提供的插件規範,開發您自己的RabbitMQ-Plugins。特别說明一下rabbitmq_federation 插件:這個插件允許您在多個RabbitMQ Clusters之間傳遞消息。

  • Multi-protocol(多協定支援):
RabbitMQ supports messaging over a variety of messaging protocols.

上文已經提到,RabbitMQ完整支援多種消息協定,包括:AMQP(預設使用該協定)、STOMP、MQTT、HTTP。其中一些協定要安裝相應的插件進行支援,例如rabbitmq_stomp插件。

  • Many Clients(多用戶端/多語言支援):
There are RabbitMQ clients for almost any language you can think of.

您可以想到的各種程式設計語言都可以作為RabbitMQ的用戶端進行連接配接,包括(但不限于):Java 、.NET 、Ruby、 Python 、PHP、 JavaScript、Scala、Groovy……

  • Tracing(消息追溯):
If your messaging system is misbehaving, RabbitMQ offers tracing support to let you find out what’s going on.

如果您發現發送給RabbitMQ的消息存在異常(如發送到了錯誤的隊列中,發送給了錯誤的訂閱者等等),RabbitMQ提供了消息處理過程追溯功能,以便幫助開發人員分析錯誤原因。

2-2、RabbitMQ使用概要

RabbitMQ的安裝過程非常簡單:由于RabbitMQ是基于Erlang語言開發并運作的,是以安裝RabbitMQ的第一步是安裝Erlang運作環境。您可以在https://www.erlang-solutions.com/resources 下載下傳最新Erlang版本進行安裝(注意不同的RabbitMQ版本有不同的Erlang最低版本要求,筆者使用的RabbitMQ版本為V3.5.4,Erlang版本為V18.0);

接下來您可以在RabbitMQ官方(http://www.rabbitmq.com/)下載下傳各種RabbitMQ的安裝版本,建議直接使用各種作業系統對應的rpm檔案進行安裝即可。安裝完成後,可以使用15672端口通路RabbitMQ的管理界面(預設的使用者名和密碼都是guest)

架構設計:系統間通信(27)——其他消息中間件及場景應用(上)1、概述2、RabbitMQ及特性3、場景應用——電子政務平台:駕駛人違法記錄同步功能

以下代碼示範了如何使用RabbitMQ的用戶端開發包,進行消息生産和消費。RabbitMQ的用戶端開發包可以在RabbitMQ官網進行下載下傳(http://www.rabbitmq.com/java-client.html),也可以使用Mavean官方庫進行導入:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.5.4</version>
</dependency>
           
  • RabbitMQ消息生産者
package com.yinwenjie.test.testRabbitMQ;

import java.io.IOException;
import java.util.Date;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 這個測試類,用于模拟消息生成者,每100毫秒,向rabbit叢集寫入不同的消息
 * @author yinwenjie
 */
public class RabbitProducerThread implements Runnable {
    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(RabbitProducerThread.class);

    public static void main(String[] args) throws Exception {
        new Thread(new RabbitProducerThread()).start();
    }

    public void run() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        //連接配接叢集節點就這麼設定
        // connectionFactory.newConnection(addrs)
        Connection conn = null;
        Channel producerChannel = null;
        try {
            conn = connectionFactory.newConnection();
            producerChannel = conn.createChannel();
        } catch (Exception e) {
            RabbitProducerThread.LOGGER.error(e.getMessage() , e);
            System.exit(-);
        }

        //然後每隔100毫秒,發送一條資料
        while(true) {
            // 消息的唯一編号
            String uuid = UUID.randomUUID().toString();
            String message = uuid + ";time=" + new Date().getTime();
            //設定一些參數
            BasicProperties properties = new BasicProperties().builder().type("String").
                    contentType("text").contentEncoding("UTF-8").messageId(uuid).build();
            try {
                //第一個參數是exchange交換器的名字
                //第二個參數是進行消息路由的關鍵key
                producerChannel.basicPublish("com.ai.sboss.arrangement.event", "com.ai.sboss.arrangement.event.queues", properties, message.getBytes());
                RabbitProducerThread.LOGGER.info("消息發送:" + message);
            } catch (IOException e) {
                RabbitProducerThread.LOGGER.error(e.getMessage() , e);
            }

            synchronized (this) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    RabbitProducerThread.LOGGER.error(e.getMessage() , e);
                }
            }
        }
    }
}
           
  • RabbitMQ消息消費者
package com.yinwenjie.test.testRabbitMQ;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 這個測試類,用于模拟消息消費者
 * @author yinwenjie
 */
public class RabbitConsumerThread implements Runnable {

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(RabbitConsumerThread.class);

    public static void main(String[] args) throws Exception {
        new Thread(new RabbitConsumerThread()).start();
    }

    public void run() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        //連接配接叢集節點就這麼設定
        //connectionFactory.newConnection(addrs)
        Connection conn = null;
        Channel consumerChannel = null;
        try {
            conn = connectionFactory.newConnection();
            consumerChannel = conn.createChannel();
        } catch (Exception e) {
            RabbitConsumerThread.LOGGER.error(e.getMessage() , e);
            System.exit(-);
        }

        //開始監控消息,(ack是手動的)
        QueueingConsumer queueingConsumer = null; 
        try {
            queueingConsumer = new QueueingConsumer(consumerChannel);
            // 設定消費者訂閱的消息隊列名:com.ai.sboss.arrangement.event.queues
            consumerChannel.basicConsume("com.ai.sboss.arrangement.event.queues", false, queueingConsumer);
        } catch (IOException e) {
            RabbitConsumerThread.LOGGER.error(e.getMessage() , e);
            System.exit(-);
        }

        //停頓200毫秒,才處理下一條,以便模拟事件處理對應的消耗事件
        while(true) {
            QueueingConsumer.Delivery delivery = null;
            try {
                delivery = queueingConsumer.nextDelivery();
            } catch (Exception e) {
                RabbitConsumerThread.LOGGER.error(e.getMessage() , e);
            }
            long deliverytag = delivery.getEnvelope().getDeliveryTag();
            byte[] messageBytes = delivery.getBody();
            BasicProperties properties = delivery.getProperties();
            String message = new String(messageBytes);
            RabbitConsumerThread.LOGGER.info("收到事件======message = " + message + " | properties =" +properties);

            //這裡停頓200毫秒,模拟業務時間
            synchronized (this) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    RabbitConsumerThread.LOGGER.error(e.getMessage() , e);
                }
            }

            RabbitConsumerThread.LOGGER.info("事件message = " + message + "處理完成,發送ack。等待下一條消息");
            try {
                // 發送ack:消息處理成功的确認信号
                consumerChannel.basicAck(deliverytag, false);
            } catch (IOException e) {
                RabbitConsumerThread.LOGGER.error(e.getMessage() , e);
            }
        }
    }
}
           

3、場景應用——電子政務平台:駕駛人違法記錄同步功能

3-1、業務場景說明

架構設計:系統間通信(27)——其他消息中間件及場景應用(上)1、概述2、RabbitMQ及特性3、場景應用——電子政務平台:駕駛人違法記錄同步功能

隻是一個為了彙總全國機動車違法記錄而設計的多系統資料同步功能。最主要的功能是進行違法記錄的上傳以及在各省間同步跨省違法記錄。在進行架構設計之前,我們首先需要了解一些關于整個系統業務背景:任何系統設計都不能脫離系統實際業務背景而存在!

  • 首先整個系統分為全國系統和32個省級系統:由于每個省都有符合該省實際情況的、處理過程完全不同的違法記錄處理操作。并且每個省的駕管系統電子化推進情況也不盡相同:有的省已經走在了全國的前列,基本上所有駕管業務資料都已經與全國系統實作了同步;有的省可能才開始建設,甚至都沒有自己的違法記錄電子資訊。
  • 違法記錄資訊的同步過程分為上行同步和下行同步:駕駛人違法記錄資訊需要從省級系統實作到全國系統的同步(至于是省級系統确認違法資訊時立即進行同步,還是省級系統在某個固定的時間周期統一進行同步,這就是給各省級系統自己的處置權了),這樣的同步過程稱為上行同步。如果某個違法者是在本省違法的,那麼直接進行上行同步就可以了;如果某個違法者是在外省違法,那麼除了進行上行同步外,當全國系統發現這是一條異地違法記錄,并且違法者身份證所在省已經接入了全國系統,就需要通過全國系統将這條違法記錄向違法者身份證所在省的省級系統進行同步,這樣的同步過程稱為下行同步。
  • 如果某省的系統新接入了全國系統,那麼全國系統需要在這個省的系統同步功能子產品準備好後,将這個省接入全國系統前所相關的跨省異地違法記錄全部進行一次上行同步和下行同步。那麼什麼叫“省級系統準備好”呢?不一定接入全國系統的每個省級系統都能立刻穩定的工作,任何系統都有一個穩定周期。在這個穩定周期内,開發團隊需要完成諸如觀察系統工作情況、調整功能子產品的運算性能、進行軟體Bug修改、進行軟體操作過程優化等等工作。
  • 另外各省的軟體供應商不盡一樣,使用的開發語言也不相同。是以在考慮接入方案時,需要方案支援多種程式設計語言,或者是使用多種語言都支援的一種通用協定。另外省級系統和全國系統應該盡可能的進行業務脫耦,這樣才可以保證省級系統的軟體供應商不必為了實作違法記錄上行同步和下行同步功能專門更改程式設計語言,也不必為了實作以上功能專門調整省級系統的固有業務過程和系統架構(有的時候因為技術問題調整業務過程客戶方是絕對不會答應的)。
  • 由于是示範場景,目的是示範消息系統中間件在這個需求場景實作方案中的作用。是以我們假設整個需求環境是具有“要實作違法記錄資訊同步”的前置功能/前置條件的。這些前置功能/前置環境包括(但不限于):全國駕駛人基本檔案資訊庫(這部分資訊可能也是通過各省級系統同步而來)、全國人口身份資訊庫等。

3-2、總體設計思路

以上業務場景是一個典型的需要使用支援事務的消息中間件的應用場景——追求消息到達和處理的穩定性,您可以使用本章我們詳細介紹的ActiveMQ也可以使用上一節介紹的RabbitMQ:因為他們都支援多語言接入,都提供消息事務支援,都支援消費者側的消息回執确認。另外,這個業務場景中也要兼顧一定的資料吞吐量。

架構設計:系統間通信(27)——其他消息中間件及場景應用(上)1、概述2、RabbitMQ及特性3、場景應用——電子政務平台:駕駛人違法記錄同步功能
  • 在已有的系統中加入消息隊列服務最大的目的是保持已有系統的原始架構不作調整。不作調整的原因可能是因為原有系統由于設計不當已經不可能再做大的調整,否則将付出無法承受的代價;也可能是由于非技術原因,技術團隊沒有相應的權限調整已有架構設計。
  • 采用消息隊列服務方案的另一個優點是可以緩解資料洪峰。在這個示例場景中最典型的展現就是:需求中明确的提到,當一個省級系統新接入時,需要進行一次完整的違法記錄的上行同步和下行同步。這樣的話有可能在這個省級系統上積累了7、8年的違法記錄會被同步到全國系統,這個過程可能會出現一定的資料堆積。但是由于我們給出的消息服務中間件的資料持久化性能較為強勁(請參見下一小節的詳細設計),是以資料同步壓力基本上不會傳遞到上層系統的業務處理層。
  • 分析場景中對于省級系統接入的需求描述,技術層面上最大的幾個問題是:不同省級系統采用的架構不一樣,使用的程式設計語言不一樣,技術團隊水準不一樣。為了保證接入方案的安全效果、性能效果和工作效率,全國系統應該為省級系統提供不同的語言開發包和內建文檔(類似于內建微信/支付寶/淘寶等開放平台);根據經驗,全國系統應首先為各省級系統優先提供JAVA和C#的內建開發包。
  • 開發包中主要對連接配接消息服務隊列的行為進行封裝、對上行消息和下行消息的文本格式進行規範(保證各省系統上行消息的文本格式是一緻的,保證各省收到的下行消息都是上級系統所統一的格式)、 對消息的加密和解密協定進行封裝、對消息發送過程和消息訂閱過程進行封裝(包括消息生産者進行上行消息的發送和消息消費者進行下行消息的接收)。另外,為了保證傳輸過程文本消息的通訊安全,開發包中還封裝了SSL加密/解密過程。
  • 最後,由于要保證所有的上行消息和下行消息一定會被目标系統正常處理。是以這些消息都應該是PERSISTENT Meaage形式的消息。并且無論是上行消息還是下行消息,都應該在超出重試次數後被放置到“死信隊列”(Dead Letter Queue),以便進行人工幹預。重試次數應該設定為2——3次左右,因為ActiveMQ預設重發6次(redeliveryCounter==6)的值過大,在消息出現問題時重試次數過多會嚴重影響消息中間件服務的處理效率。

3-3、消息隊列服務詳細設計

下面我們來具體分析一下在這個執行個體場景下消息隊列服務部分的架構設計(即上圖中“基于ActiveMQ的消息隊列服務”部分的設計)。架構詳細設計部分分為硬體結構設計和軟體規則設計部分,我們首先讨論硬體設計部分的方案。

3-3-1、硬體方案部分

其中硬體部分的設計來源于上一節文章中已經提到的ActiveMQ服務叢集的綜合應用(《架構設計:系統間通信(26)——ActiveMQ叢集方案(下)》),為了保證每個ActiveMQ節點都能高效工作,我們還按照上文提到的ActiveMQ服務單節點的性能優化原則進行了相應配置(《架構設計:系統間通信(22)——提高ActiveMQ工作性能(上)》)。

架構設計:系統間通信(27)——其他消息中間件及場景應用(上)1、概述2、RabbitMQ及特性3、場景應用——電子政務平台:駕駛人違法記錄同步功能

在這個示例的應用場景中,雖然高并發性并不是建設方主要追求的。但如上文所述,為了保證在資料洪峰出現時資料處理壓力不傳遞給業務服務,并且ActiveMQ服務叢集能夠盡快完成資料洪峰的吞吐工作(在建設方預算允許的情況下),我們為每一組ActiveMQ M/S叢集選擇了IBM的基于SAN(Storage Area Network)的共享存儲解決方案。其中使用的IBM Storwize V7000存儲盤陣設定成RIDA5模式,并配置20TB存儲空間。

實際上在這個示例場景中,之是以采用這樣的硬體設計方案更是為了在有限的篇幅内為讀者講解更多的設計方式。由于使用了基于SAN的共享存儲方案,是以之前提到的LevelDB + zookeeper的熱備方案就不必再使用了(當然LevelDB + zookeeper的方案也是可選方案)。為了節約成本,也可以多組SAN共享存儲使用用一台FC 光交換機和一台存儲盤陣,但是這樣可能出現因為FC光交換機的單點故障或者磁盤陣列單點故障導緻整個叢集當機的情況:

架構設計:系統間通信(27)——其他消息中間件及場景應用(上)1、概述2、RabbitMQ及特性3、場景應用——電子政務平台:駕駛人違法記錄同步功能

3-3-2、軟體規則部分

在前文提到,由于省級系統都使用了全國系統統一提供的開發包進行上行消息和下行消息的處理,是以接入消息同步功能的所有系統都不必擔心消息文本的格式問題;那麼在ActiveMQ消息隊列服務的業務規則部分,最重要的規則就是如何規劃上行消息和下行消息存儲的隊列。

架構設計:系統間通信(27)——其他消息中間件及場景應用(上)1、概述2、RabbitMQ及特性3、場景應用——電子政務平台:駕駛人違法記錄同步功能

如上圖所示所有省級系統的上行消息同時共享一個消息隊列,這是因為這些省級系統都是使用上級系統統一提供的開發包進行二次開發,是以無論哪個省級系統向上同步的消息格式都是一緻的(且進行了内容加密),是以它們可以共享一個消息隊列,并由上級系統使用一套相同的處理邏輯進行接受。

當上級系統發現有跨省産生的違法記錄時,就需要通過下行隊列将這個違法記錄發送給違法者所在省的省級系統,這些下行資訊由于有不同的消費者(省級系統),且這些消費者所涉及的業務處理邏輯都可能不一樣,是以應該使用不同的消息隊列來發送針對不同省級系統的下行隊列。另外,這樣的消息下發機制還可以保證在省級系統出現故障時,下行消息不會丢失——直到這些下行消息被對應的省級系統正确處理。

3-4、主要代碼片段

由于整個方案需要相當的代碼編寫工作,是以不可能在這個示例場景中示範所有的代碼實作。為了讓讀者能夠了解其中更細節的實作情況,在這個小節中我們重點示範主要的代碼實作片段(使用Java語言)。包括省級系統開發包中如何進行上行隊列的連接配接,如何開始監聽下行隊列——隻有同時成功建立上行隊列連接配接和下行隊列連接配接,才能認為資訊同步子產品啟動成功了。

為了保證資訊同步子產品獨立于現有系統的其他功能子產品進行工作,應該使用專門的新線程建立上行隊列連接配接和下行隊列連接配接:

package mq.test.blog;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;

/**
 * 這個啟動器用于啟動上行隊列和下行隊列的連接配接。
 * 上行隊列是一個獨立的線程,下行隊列也是一個獨立的線程。
 * 
 * 另外,上行隊列和下行隊列都可以使用同一個session
 * @author yinwenjie
 */
public class ClientStartup implements Runnable {

    /**
     * 下行隊列名稱(可存放于配置檔案中)
     */
    private String downStream = "downStream";

    /**
     * 保證整個程序隻有一個Producer被建立和使用
     */
    private static MessageProducer PRODUCER;

    /**
     * 标示該啟動器是否正常連接配接到消息中間件服務
     */
    private static boolean ISSTARTED = false;

    /**
     * 這個靜态方法用于從ClientStartup啟動器中擷取整個程序中唯一一個消息生産者。
     * 注意,為了保證該程序其它線程安全擷取ClientStartup.PRODUCER,
     * 是以隻有等待run()方法成功運作完成,該ClientStartup.PRODUCER才能被其它線程拿到。
     * @return
     */
    public static MessageProducer getNewInstanceProducer() {
        synchronized (ClientStartup.class) {
            while(!ClientStartup.ISSTARTED) {
                try {
                    ClientStartup.class.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace(System.out);
                }
            }
        }

        return ClientStartup.PRODUCER;
    }

    @Override
    public void run() {
        // 開發包中對于消息中間件服務的連接配接一定要使用故障轉移
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://192.168.61.138:61616,tcp://192.168.61.139:61616)");
        // 這是上行消息隊列
        ActiveMQQueue upstreamQueue = new ActiveMQQueue("upstream");
        // 這是下行消息隊列
        // 下行消息隊列必須由上級系統建立,并且在下級系統使用的開發包所對應的配置檔案中進行配置
        ActiveMQQueue downStreamQueue = new ActiveMQQueue(this.downStream);

        //============開始建立
        Connection connection = null;
        Session session = null;
        try {
            //ack優化選項
            connectionFactory.setOptimizeAcknowledge(true);
            connectionFactory.setProducerWindowSize();
            connectionFactory.setSendAcksAsync(true);
            //ack資訊最大發送周期
            connectionFactory.setOptimizeAcknowledgeTimeOut();
            //連接配接屬性優化:設定重試次數為2
            RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();
            redeliveryPolicy.setMaximumRedeliveries();
            //連接配接屬性優化:設定預取數量
            ActiveMQPrefetchPolicy prefetchPolicy =  connectionFactory.getPrefetchPolicy();
            prefetchPolicy.setQueuePrefetch();
            //設定擷取消息的線程池大小
            connectionFactory.setMaxThreadPoolSize();
            connection = connectionFactory.createQueueConnection();
            //連接配接
            connection.start();

            //建立會話(設定一個帶有事務特性的會話)
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        } catch(Exception e) {
            e.printStackTrace(System.out);
            return;
        }

        //===========首先進行訂閱消費者連接配接(下行消息隊列的連接配接)
        //注意,正式代碼中不應該允許JMS Client建立一個新的隊列
        //是以應該使用其它方式(例如其他查詢接口),在建立前判斷隊列是否已經存在
        MessageConsumer consumer;
        try {
            consumer = session.createConsumer(downStreamQueue);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    /*
                     * 這裡進行正式業務的處理
                     * */
                }
            });
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            // 一旦出錯,就關閉整個連接配接,退出啟動過程
            try {
                connection.close();
            } catch (JMSException e1) {
                e.printStackTrace(System.out);
            }
            return;
        }

        //==========然後建立消息生産者倆呢及(上行消息隊列的連接配接)
        try {
            ClientStartup.PRODUCER = session.createProducer(upstreamQueue);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            // 一旦出錯,就關閉整個連接配接,退出啟動過程
            try {
                connection.close();
            } catch (JMSException e1) {
                e.printStackTrace(System.out);
            }
            return;
        }

        //==========通知其他線程可以擷取producer了
        ClientStartup.ISSTARTED = true;
        synchronized (ClientStartup.class) {
            ClientStartup.class.notify();
        }

        //==========鎖定該線程
        synchronized (this) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace(System.out);
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new ClientStartup()).start();
    }
}
           

3-5、其它說明

  • 安全性考量:在正式環境中使用消息隊列中間件服務一定要做相關的安全性設定。包括啟用消息隊列服務的使用者名和密碼、啟用消息隊列服務自帶的SSL加密設定。如果您使用的消息隊列服務不自帶SSL加密,則一定要自己進行加密。幸運的是,如果您使用的是ActiveMQ,那麼以上兩種安全性要求都可以滿足。甚至ActiveMQ還支援為每一個隊列單獨進行使用者名和密碼設定。
  • 錯誤資料的處理:在正式環境中使用消息隊列中間件服務一定要假設會發生傳輸的消息由于各種業務原因導緻的消費者處理錯誤的情況。是以對超出redeliveryCounter重試次數的錯誤消息一定要轉存到另外的“待處理區域”,并在後續進行人工幹預。在ActiveMQ中這個“待處理區域”就是死消息隊列:ActiveMQ.DLQ。
  • 在産品預算内賦予消息服務中間件最大的可用性:類似于ActiveMQ、RabbitMQ這樣的消息隊列中間件,其目的并不是一味地追求機關時間内消息資料的吞吐量/并發量的處理能力。它們的功能中涵蓋了諸多功能:事務機制、确認機制、重試機制、熱備機制等等,都是為了一個更重要的功能目的:保證消息完整可達。是以您和您的團隊一定要按照業務特性來确定是否适合使用這樣的中間件服務,并且您需要在預算範圍内為您的消息服務中間件配置多個服務節點、多個存儲單元,以便保證消息隊列中間件能夠完成它的任務——消息完整可達。

下文我們一起讨論一下那些專門解決高資料吞吐性能問題的消息中間件産品以及它們的應用場景。

(接下文)