天天看點

ActiveMQ源碼架構解析第一節(轉)

  工作四年已久,也快到了而立之年,本人也酷愛技術,總是想找一些途徑來提升自己,想着溫故而知新是以就寫起了部落格,然而寫部落格這個想法也是醞釀了很久,近期也看到了有很多人在問關于ActiveMQ的相關問題,有幸接觸ActiveMQ從今天開始我會定期總結一些關于ActiveMQ的相關知識,本篇重點是先分析ActiveMQ的架構以及設計模式等相關知識,然後在說如何更好的使用,讓ActiveMQ發揮最好的性能,歡迎大家跟帖讨論,有錯誤的地方還請大家不吝雅正,小方在此謝過!

第一篇文章我們先從hello world寫起,下面是使用java代碼調用activemq的api發送一條消息。

public class test {

    public static void main(String[] args) throws Exception {

       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(

              "tcp://localhost:61616");

       Connection connection = connectionFactory.createConnection(); 

       connection.start();      

       Session session = connection.createSession(false,

              Session.AUTO_ACKNOWLEDGE);

       MessageProducer producer = session.createProducer(session

              .createQueue("testQueue"));

       Message message = session.createTextMessage("hello everybody!");

       producer.send(message);

       producer.close();

       session.close();

       connection.close();

    }

}

下面來分析發送消息的幾個步驟,如下:

1、建立ActiveMQConnectionFactory,入參是url,指定schema以及要連接配接的ip和端口号,其次是建立ActiveMQConnection,tcp協定互動肯定是要使用Socket類,是以說明下ActiveMQConnection->Transport->Socket的關系,Transport是對Socket的封裝,而ActiveMQConnection則是對Transport的封裝,如下圖所示:

2、建立ActiveMQConnection,因為上圖已經說明ActiveMQConnection和Transport是組合關系,是以建立ActiveMQConnection時首先要建立Transport,因為ActiveMQ的互動方式分為Tcp、Udp以及HTTP協定,ActiveMQ使用了非常經典的簡單工廠設計模式,使用這個模式的好處是工廠可以根據uri的schema頭來動态建立相應的TransportFactory工廠,例如使用者輸入tcp://localhost:61616,ObjectFactory則可以擷取到schema是tcp然後來執行個體化TcpTransportFactory,然後在調用TcpTransportFactory工廠來生産TcpTransport對象,簡單工廠模式如下圖,我是把2個工廠畫到了一起:          建立完TcpTransport還不夠,因為TcpTransport隻實作了發送和接受消息,還需要做一些封裝來實作相應的業務處理,說到封裝這裡使用到了包裝設計模式,也叫裝飾者模式,jdk的java.io輸入輸出流也有使用,類圖如下:   

1、1、MutexTransportFilter類實作了對每個請求的同步鎖,同一時間隻允許發送一個請求,如果有第二個請求需要等待第一個請求發送完畢才可繼續發送。

2、2、WireFormatNegotiator類實作了在用戶端連接配接broker的時候先發送資料解析相關的協定資訊,例如解析版本号,是否使用緩存等資訊。

3、3、InactivityMonitor類實作了連接配接成功後啟動心跳檢查機制,用戶端每10秒發送一次心跳資訊,服務端每30秒讀一次心跳資訊,如果沒有讀到則會斷開連接配接,心跳檢測是互相的,用戶端也會每30秒讀取服務端發送來的心跳資訊,如果沒有讀到也一樣會斷開連接配接。

4、4、ResponseCorrelator類實作了異步請求但需要擷取響應資訊否則就會阻塞等待功能。

建立ActiveMQConnection的時序圖如下:

用戶端調用createConnection()方法,通過TcpTransportFactory擷取到TcpTransport對象,得到TcpTransport對象後,TcpTransportFactory又調用自己的configure方法對TcpTransport進行了包裝,代碼如下:

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {

    transport = compositeConfigure(transport, wf, options);

    transport = new MutexTransport(transport);

    transport = new ResponseCorrelator(transport);

    return transport;

這就是包裝設計模式的使用,當調用transport.request()發送消息時,時序圖如下:

       每次transport調用request方法時都會先判斷next是否為空,如果不為空則先調用next.request(),當然在調用之前和之後都會加入相應的邏輯,包裝設計模式的好處就是當我不想啟動心跳檢測功能時則可以非常簡單的實作,隻需要不為WireFormatNegotiator設定next屬性為InactivityMonitor即可,擷取加入其它功能不需要修改原來的設計隻需要新增一個類即可,充分的展現了設計模式的開放封閉原則以及單一職責原則,設計模式在ActiveMQ中可以說是使用的淋漓盡緻,恰到好處。

    建立完ActiveMQConnection之後就是建立ActiveMQSession以及ActiveMQMessageProducer了,這兩個對象的建立就比較簡單了沒什麼可說的,當然在建立這些對象的時候用戶端會發送相應的資訊給服務端,本節主要講解連接配接的建立。資訊的互動以及Message的發送就留到下一節在說了,第一次寫這麼長的博文,可能有的地方講的思路不清晰,還請大家多多指正,謝謝大家,睡覺了,明天還要加班!

http://www.iteye.com/topic/1137523