天天看點

ActiveMQ

前言

 MQ——Message Queue,中文翻譯為“消息隊列”,維基百科上的這樣描述:

消息隊列(英語:Message queue)是一種程序間通信或同一程序的不同線程間的通信方式,軟體的貯列用來處理一系列的輸入,通常是來自使用者。消息隊列提供了異步的通信協定,每一個貯列中的紀錄包含詳細說明的資料,包含發生的時間,輸入裝置的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不需要同時與消息隊列互交。消息會儲存在隊列中,直到接收者取回它。

實際應用中有多種MQ,包括MSMQ,Active MQ,Jboss MQ等.本文簡單介紹.net環境下Active MQ入門。

準備工作

1.下載下傳Active MQ,位址:http://activemq.apache.org/download.html(我下的是ActiveMQ 5.7.0 Release),下載下傳完成後解壓縮即可。ActiveMQ是一個開源的JMS伺服器;

2.下載下傳NMS,位址:http://activemq.apache.org/nms/download.html。網站提供源碼下載下傳,可以根據需要編譯成指定.net framework版本。如果下載下傳的是release版本,直接将具體.net版本目錄下Apache.NMS.ActiveMQ.dll和Apache.NMS.dll拷貝出來。

Hello ActiveMQ

1.建立空解決方案:HelloActiveMQ.sln

2.在目前解決方案中分别建立兩個控制台程式:ConsoleCustomer.csproj(消費者)和ConsoleProcucer.csproj(生産者)。生産者将消息發送給ActiveMQ,然後ActiveMQ将消息推送給符合條件的消費者;

3.在ConsoleCustomer.csproj(消費者)和ConsoleProcucer.csproj(生産者)兩個項目中均添加對Apache.NMS.ActiveMQ.dll和Apache.NMS.dll的引用;

4.在ConsoleProcucer項目的program.cs中添加如下代碼(代碼中已經又注釋):

class Program
    {
        static IConnectionFactory _factory = null;
        static IConnection _connection = null;
        static ITextMessage _message = null;

        static void Main(string[] args)
        {
            //建立工廠
            _factory = new ConnectionFactory("tcp://localhost:61616/");

            try
            {
                //建立連接配接
                using (_connection = _factory.CreateConnection())
                {
                    //建立會話
                    using (ISession session = _connection.CreateSession())
                    {
                        //建立一個主題
                        IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");

                        //建立生産者
                        IMessageProducer producer = session.CreateProducer(destination);

                        Console.WriteLine("Please enter any key to continue! ");
                        Console.ReadKey();
                        Console.WriteLine("Sending: ");

                        //建立一個文本消息
                        _message = producer.CreateTextMessage("Hello AcitveMQ....");

                        //發送消息
                        producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
                    }
                }

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }

            Console.ReadLine();

        }
    }      

5.在ConsoleCustomer項目的program.cs中添加如下代碼:

class Program
    {
        static IConnectionFactory _factory = null;

        static void Main(string[] args)
        {
            try
            {
                //建立連接配接工廠
                _factory = new ConnectionFactory("tcp://localhost:61616/");
                //建立連接配接
                using (IConnection conn = _factory.CreateConnection())
                {
                    //設定用戶端ID
                    conn.ClientId = "Customer";
                    conn.Start();
                    //建立會話
                    using (ISession session = conn.CreateSession())
                    {
                        //建立主題
                        var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
                        
                        //建立消費者
                        IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false);

                        //注冊監聽事件
                        consumer.Listener += new MessageListener(consumer_Listener);
                       
                        //這句代碼非常重要,
                        //這裡沒有read方法,Session會話會被關閉,那麼消費者将監聽不到生産者的消息
                        Console.Read();
                    }

                    //關閉連接配接
                    conn.Stop();
                    conn.Close();
                }

            }
            catch (Exception ex)
            {
                Console.Write(ex.ToString());
            }

        }

        /// <summary>
        /// 消費監聽事件
        /// </summary>
        /// <param name="message"></param>
        static void consumer_Listener(IMessage message)
        {
            ITextMessage msg = (ITextMessage)message;
            Console.WriteLine("Receive: " + msg.Text);  
        }      

上述代碼生成成功後,一個ActiveMQ的入門程式已經完成,接下來便是測試了。

測試

 1.啟動之前解壓過的ActiveMQ程式,在apache-activemq-5.7.0\bin目錄下運作activemq.bat批處理檔案。運作截圖如下:

ActiveMQ

 2.分别運作之前生成的ConsoleProcucer.exe.和ConsoleCustomer.exe,在ConsoleProcucer.exe中根據提示輸入任意鍵。我們便可以檢測ConsoleCustomer.exe有沒有接收到我們發送的“Hello ActiveMQ”文本。結果如下:

ConsoleProcucer.exe截圖

ActiveMQ

ConsoleCustomer.exe截圖

ActiveMQ

通過驗證我們發現通過ActiveMQ發送消息成功。

 總結:

1.ConnectionFactory 是連接配接工廠,負責建立Connection,Connection 負責建立 Session,Session 建立 MessageProducer(用來發消息) 和 MessageConsumer(用來接收消息),Destination 是消息的目的地;

2.Connection一定要調用Start(),才能接收消息。關閉Connection時先調用Stop(),然後再調用Close();

3.Session在接收消息之前不能關閉,否則監聽不到消息;

4.Connection,Session不使用時應該及時關閉,避免記憶體洩露;

5.ActiveMQ支援topic(主題,廣播消息)和Queue(隊列,點對點消息)。

注:源碼中另外提供了winform實作。需要注意的是winform中消息監聽事件在接受到消息後,由于目前不是處于UI線程,是不能直接操作Winform UI的,需要使用Invoke,詳情請參照源碼。

 點選下載下傳源代碼

為系統而生,為架構而死,為debug奮鬥一輩子; 吃符号的虧,上大小寫的當,最後死在需求上。