前言
MQ——Message Queue,中文翻譯為“消息隊列”,維基百科上的這樣描述:
消息隊列(英語:Message queue)是一種程序間通信或同一程序的不同線程間的通信方式,軟體的貯列用來處理一系列的輸入,通常是來自使用者。消息隊列提供了異步的通信協定,每一個貯列中的紀錄包含詳細說明的資料,包含發生的時間,輸入裝置的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不需要同時與消息隊列互交。消息會儲存在隊列中,直到接收者取回它。
實際應用中有多種MQ,包括MSMQ,Active MQ,Jboss MQ等.本文簡單介紹.net環境下Active MQ入門。
準備工作
1.下載下傳Active MQ,位址:http://activemq.apache.org/download.html(我下的是ActiveMQ 5.7.0 Release),下載下傳完成後解壓縮即可。ActiveMQ是一個開源的JMS伺服器;
2.下載下傳NMS,位址:http://activemq.apache.org/nms/download.html。網站提供源碼下載下傳,可以根據需要編譯成指定.net framework版本。如果下載下傳的是release版本,直接将具體.net版本目錄下Apache.NMS.ActiveMQ.dll和Apache.NMS.dll拷貝出來。
Hello ActiveMQ
1.建立空解決方案:HelloActiveMQ.sln
2.在目前解決方案中分别建立兩個控制台程式:ConsoleCustomer.csproj(消費者)和ConsoleProcucer.csproj(生産者)。生産者将消息發送給ActiveMQ,然後ActiveMQ将消息推送給符合條件的消費者;
3.在ConsoleCustomer.csproj(消費者)和ConsoleProcucer.csproj(生産者)兩個項目中均添加對Apache.NMS.ActiveMQ.dll和Apache.NMS.dll的引用;
4.在ConsoleProcucer項目的program.cs中添加如下代碼(代碼中已經又注釋):
class Program
{
static IConnectionFactory _factory = null;
static IConnection _connection = null;
static ITextMessage _message = null;
static void Main(string[] args)
{
//建立工廠
_factory = new ConnectionFactory("tcp://localhost:61616/");
try
{
//建立連接配接
using (_connection = _factory.CreateConnection())
{
//建立會話
using (ISession session = _connection.CreateSession())
{
//建立一個主題
IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
//建立生産者
IMessageProducer producer = session.CreateProducer(destination);
Console.WriteLine("Please enter any key to continue! ");
Console.ReadKey();
Console.WriteLine("Sending: ");
//建立一個文本消息
_message = producer.CreateTextMessage("Hello AcitveMQ....");
//發送消息
producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
Console.ReadLine();
}
}
5.在ConsoleCustomer項目的program.cs中添加如下代碼:
class Program
{
static IConnectionFactory _factory = null;
static void Main(string[] args)
{
try
{
//建立連接配接工廠
_factory = new ConnectionFactory("tcp://localhost:61616/");
//建立連接配接
using (IConnection conn = _factory.CreateConnection())
{
//設定用戶端ID
conn.ClientId = "Customer";
conn.Start();
//建立會話
using (ISession session = conn.CreateSession())
{
//建立主題
var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
//建立消費者
IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false);
//注冊監聽事件
consumer.Listener += new MessageListener(consumer_Listener);
//這句代碼非常重要,
//這裡沒有read方法,Session會話會被關閉,那麼消費者将監聽不到生産者的消息
Console.Read();
}
//關閉連接配接
conn.Stop();
conn.Close();
}
}
catch (Exception ex)
{
Console.Write(ex.ToString());
}
}
/// <summary>
/// 消費監聽事件
/// </summary>
/// <param name="message"></param>
static void consumer_Listener(IMessage message)
{
ITextMessage msg = (ITextMessage)message;
Console.WriteLine("Receive: " + msg.Text);
}
上述代碼生成成功後,一個ActiveMQ的入門程式已經完成,接下來便是測試了。
測試
1.啟動之前解壓過的ActiveMQ程式,在apache-activemq-5.7.0\bin目錄下運作activemq.bat批處理檔案。運作截圖如下:
2.分别運作之前生成的ConsoleProcucer.exe.和ConsoleCustomer.exe,在ConsoleProcucer.exe中根據提示輸入任意鍵。我們便可以檢測ConsoleCustomer.exe有沒有接收到我們發送的“Hello ActiveMQ”文本。結果如下:
ConsoleProcucer.exe截圖
ConsoleCustomer.exe截圖
通過驗證我們發現通過ActiveMQ發送消息成功。
總結:
1.ConnectionFactory 是連接配接工廠,負責建立Connection,Connection 負責建立 Session,Session 建立 MessageProducer(用來發消息) 和 MessageConsumer(用來接收消息),Destination 是消息的目的地;
2.Connection一定要調用Start(),才能接收消息。關閉Connection時先調用Stop(),然後再調用Close();
3.Session在接收消息之前不能關閉,否則監聽不到消息;
4.Connection,Session不使用時應該及時關閉,避免記憶體洩露;
5.ActiveMQ支援topic(主題,廣播消息)和Queue(隊列,點對點消息)。
注:源碼中另外提供了winform實作。需要注意的是winform中消息監聽事件在接受到消息後,由于目前不是處于UI線程,是不能直接操作Winform UI的,需要使用Invoke,詳情請參照源碼。
點選下載下傳源代碼
為系統而生,為架構而死,為debug奮鬥一輩子; 吃符号的虧,上大小寫的當,最後死在需求上。