消息隊列簡介及應用
msmq(microsoft message queue,微軟消息隊列)是在多個不同的應用之間實作互相通信的一種異步傳輸模式,互相通信的應用可以分布于同一台機器上,也可以分布于相連的網絡空間中的任一位置。
它的實作原理是:
消息的發送者把自己想要發送的資訊放入一個容器中(我們稱之為message),然後把它儲存
至一個系統公用空間的消息隊列(message queue)中;本地或者是異地的消息接收程式再從該隊列中取出發給它
的消息進行處理。
在消息傳遞機制中,有兩個比較重要的概念。一個是消息,一個是隊列。消息是由通信的雙方所需要傳遞的資訊,它可以是各式各樣的媒體,如文本、聲音、圖象等等。
消息最終的了解方式,為消息傳遞的雙方事先商定,這樣做的好處是,一是相當于對資料進行了簡單的加密,二則采用自己定義的格式可以節省通信的傳遞量。消息可以含有發送和接收者的辨別,這樣隻有指定的使用者才能看到隻傳遞給他的資訊和傳回是否操作成功的回執。消息也可以含有時間戳,以便于接收方對某些與時間相關的應用進行處理。消息還可以含有到期時間,它表明如果在指定時間内消息還未到達則廢棄,這主要應用與時間性關聯較為緊密的應用。
消息隊列是發送和接收消息的公用存儲空間,它可以存在于記憶體中或者是實體檔案中。
消息可以以兩種方式發送,即快遞方式(express)和可恢複模式(recoverable),它們的差別在于,快遞方式為了消息的快速傳遞,把消息放置于記憶體中,而不放于實體磁盤上,以擷取較高的處理能力;可恢複模式在傳送過程的每一步驟中,都把消息寫入實體磁盤中,以得到較好的故障恢複能力。
消息隊列可以放置在發送方、接收方所在的機器上,也可以單獨放置在另外一台機器上。正是由于消息隊列在放置方式上的靈活性,形成了消息傳送機制的可靠性。當儲存消息隊列的機器發生故障而重新啟動以後,以可恢複模式發送的消息可以恢複到故障發生之前的狀态,而以快遞方式發送的消息則丢失了。另一方面,采用消息傳遞機制,發送方必要再擔心接收方是否啟動、是否發生故障等等非必要因素,隻要消息成功發送出去,就可以認為處理完成,而實際上對方可能甚至未曾開機,或者實際完成交易時可能已經是第二天了。
采用msmq帶來的好處是:
由于是異步通信,無論是發送方還是接收方都不用等待對方傳回成功消息,就可以執行餘下的代碼,因而大大地提高了事物處理的能力;當資訊傳送過程中,資訊發送機制具有一定功能的故障恢複能力;msmq的消息傳遞機制使得消息通信的雙方具有不同的實體平台成為可能。
在微軟的.net平台上利用其提供的msmq功能,可以輕松建立或者删除消息隊列、發送或者接收消息、甚至于對消息隊列進行管理。
在.net産品中,提供了一個msmq類庫“system.messaging.dll”。它提供了兩個類分别對消息對象和消息隊列對象
進行操作。在能夠使用msmq功能之前,你必須确定你的機器上安裝了msmq消息隊列元件,并確定服務正在運作中。在使用asp.net程式設計時,應在頭部使用:
<%@ assembly name=”system.messaging”%>
<%@ import namespace=”system.messsaging”%>
利用 msmq(microsoft message queue),應用程式開發人員可以通過發送和接收消息友善地與應用程式進行快速可靠的通信。消息處理為您提供了有保障的消息傳遞和執行許多業務處理的可靠的防故障方法。
msmq與xml web services和.net remoting一樣,是一種分布式開發技術。但是在使用xml web services或.net remoting元件時,client端需要和server端實時交換資訊,server需要保持聯機。msmq則可以在server離線的情況下工作,将message臨時儲存在client端的消息隊列中,以後聯機時再發送到server端處理。
顯然,msmq不适合于client需要server端及時響應的這種情況,msmq以異步的方式和server端互動,不用擔心等待server端的長時間處理過程。
雖然xml web services和.net remoting都提供了[oneway]屬性來處理異步調用,用來解決server端長方法調用長時間阻礙client端。但是不能解決大量client負載的問題,此時server接受的請求快于處理請求。
一般情況下,[oneway]屬性不用于專門的消息服務中。
1. 基本術語和概念(basic terms and concepts)
“消息”是在兩台計算機間傳送的資料機關。消息可以非常簡單,例如隻包含文本字元串;也可以更複雜,可能包含嵌入對象。
消息被發送到隊列中。“消息隊列”是在消息的傳輸過程中儲存消息的容器。消息隊列管理器在将消息從它的源中繼到它的目标時充當中間人。隊列的主要目的是提供路由并保證消息的傳遞;如果發送消息時接收者不可用,消息隊列會保留消息,直到可以成功地傳遞它。
“消息隊列”是 microsoft 的消息處理技術,它在任何安裝了 microsoft windows 的計算機組合中,為任何應用程式提供消息處理和消息隊列功能,無論這些計算機是否在同一個網絡上或者是否同時聯機。
“消息隊列網絡”是能夠互相間來回發送消息的任何一組計算機。網絡中的不同計算機在確定消息順利處理的過程中扮演不同的角色。它們中有些提供路由資訊以确定如何發送消息,有些儲存整個網絡的重要資訊,而有些隻是發送和接收消息。
“消息隊列”安裝期間,管理者确定哪些伺服器可以互相通信,并設定特定伺服器的特殊角色。構成此“消息隊列”網絡的計算機稱為“站點”,它們之間通過“站點連結”互相連接配接。每個站點連結都有一個關聯的“開銷”,它由管理者确定,訓示了經過此站點連結傳遞消息的頻率。
“消息隊列”管理者還在網絡中設定一台或多台作為“路由伺服器”的計算機。路由伺服器檢視各站點連結的開銷,确定經過多個站點傳遞消息的最快和最有效的方法,以此決定如何傳遞消息。
2. 隊列類型(queue type)
有兩種主要的隊列類型:由您或網絡中的其他使用者建立的隊列和系統隊列。
使用者建立的隊列可能是以下任何一種隊列:
“公共隊列”在整個“消息隊列”網絡中複制,并且有可能由網絡連接配接的所有站點通路。
“專用隊列”不在整個網絡中釋出。相反,它們僅在所駐留的本地計算機上可用。專用隊列隻能由知道隊列的完整路徑名或标簽的應用程式通路。
“管理隊列”包含确認在給定“消息隊列”網絡中發送的消息回執的消息。指定希望 messagequeue 元件使用的管理隊列(如果有的話)。
“響應隊列”包含目标應用程式接收到消息時傳回給發送應用程式的響應消息。指定希望 messagequeue 元件使用的響應隊列(如果有的話)。
系統生成的隊列一般分為以下幾類:
“日記隊列”可選地存儲發送消息的副本和從隊列中移除的消息副本。每個“消息隊列”用戶端上的單個日記隊列存儲從該計算機發送的消息副本。在伺服器上為每個隊列建立了一個單獨的日記隊列。此日記跟蹤從該隊列中移除的消息。
“死信隊列”存儲無法傳遞或已過期的消息的副本。如果過期或無法傳遞的消息是事務性消息,則被存儲在一種特殊的死信隊列中,稱為“事務性死信隊列”。死信存儲在過期消息所在的計算機上。有關逾時期限和過期消息的更多資訊,請參見預設消息屬性。
“報告隊列”包含訓示消息到達目标所經過的路由的消息,還可以包含測試消息。每台計算機上隻能有一個報告隊列。
“專用系統隊列”是一系列存儲系統執行消息處理操作所需的管理和通知消息的專用隊列。
在應用程式中進行的大多數工作都涉及通路公共隊列及其消息。但是,根據應用程式的日記記錄、确認和其他特殊處理需要,在日常操作中很可能要使用幾種不同的系統隊列。
3. 同步和異步通信(synchronous vs. asynchronous communication)
隊列通信天生就是異步的,因為将消息發送到隊列和從隊列中接收消息是在不同的程序中完成的。另外,可以異步執行接收操作,因為要接收消息的人可以對任何給定的隊列調用 beginreceive 方法,然後立即繼續其他任務而不用等待答複。這與人們所了解的“同步通信”截然不同。
在同步通信中,請求的發送方在執行其他任務前,必須等待來自預定接收方的響應。發送方等待的時間完全取決于接收方處理請求和發送響應所用的時間。
4. 同消息隊列互動(interacting with message queues)
消息處理和消息為基于伺服器的應用程式元件之間的程序間通信提供了強大靈活的機制。同元件間的直接調用相比,它們具有若幹優點,其中包括:
穩定性 — 元件失敗對消息的影響程度遠遠小于元件間的直接調用,因為消息存儲在隊列中并一直留在那裡,直到被适當地處理。消息處理同僚務處理相似,因為消息處理是有保證的。
消息優先級 — 更緊急或更重要的消息可在相對不重要的消息之前接收,是以可以為關鍵的應用程式保證足夠的響應時間。
脫機能力 — 發送消息時,它們可被發送到臨時隊列中并一直留在那裡,直到被成功地傳遞。當因任何原因對所需隊列的通路不可用時,使用者可以繼續執行操作。同時,其他操作可以繼續進行,如同消息已經得到了處理一樣,這是因為網絡連接配接恢複時消息傳遞是有保證的。
事務性消息處理 — 将多個相關消息耦合為單個事務,確定消息按順序傳遞、隻傳遞一次并且可以從它們的目标隊列中被成功地檢索。如果出現任何錯誤,将取消整個事務。
安全性 — messagequeue 元件基于的消息隊列技術使用 windows 安全來保護通路控制,提供稽核,并對元件發送和接收的消息進行加密和驗證。
5. 在.net環境下編寫簡單的message queue程式
(1)先安裝message queuing services
通過control panel,“add/remove programs” – “add/remove windows components”步驟安裝msmq。
msmq可以安裝為工作組模式或域模式。如果安裝程式沒有找到一台運作提供目錄服務的消息隊列的伺服器,則隻可以安裝為工作組模式,此計算機上的“消息隊列”隻支援建立專用隊列和建立與其他運作“消息隊列”的計算機的直接連接配接。
(2)配置msmq
打開computer management – message queuing,在private queues下建立msmqdemo隊列
(3)代碼執行個體
1
public partial class form1 : form
2
{
3
private messagequeue queue = null;
4
private string queuepath = ".\\private$\\msmqdemo"; //"formatname:direct=tcp:10.129.3.133\\private$\\msmqdemo";
5
private ipaddress ipaddress;
6
public form1()
7
8
initializecomponent();
9
}
10
private void form1_load(object sender, eventargs e)
11
12
getipaddress();
13
//檢查隊列,如果隊列不存在,則建立
14
ensurequeueexists(queuepath);
15
16
queue = new system.messaging.messagequeue(queuepath);
17
18
private void getipaddress()
19
20
ipaddress = new system.net.ipaddress(dns.gethostbyname(dns.gethostname()).addresslist[0].address);
21
22
23
private void sendbutton_click(object sender, eventargs e)
24
25
//不允許發空消息
26
if (textbox1.text == "")
27
28
messagebox.show("發送消息不能為空!");
29
return;
30
}
31
try
32
33
// create message
34
system.messaging.message message = new system.messaging.message();
35
message.body = textbox1.text.trim();
36
message.formatter = new system.messaging.xmlmessageformatter(new type[]
{ typeof(string) });
37
// put message into queue
38
queue.send(message);
39
40
catch (exception ex1)
41
42
messagebox.show(ex1.tostring());
43
44
45
46
//判斷隊列是否存在
47
private static void ensurequeueexists(string path)
48
49
if (!messagequeue.exists(path))
50
51
if (!messagequeue.exists(path))
52
53
messagequeue.create(path);
54
messagequeue mqtemp = new messagequeue(path);
55
//everone全部權限
56
mqtemp.setpermissions("everyone", system.messaging.messagequeueaccessrights.fullcontrol);
57
}
58
59
}
60
//擷取消息隊列中的消息
61
public system.collections.arraylist getmessage()
62
63
system.messaging.messagequeue mq = new system.messaging.messagequeue(queuepath, false);
64
mq.formatter = new xmlmessageformatter(new type[]
65
system.messaging.message[] arrm = mq.getallmessages();
66
mq.close();
67
system.collections.arraylist al = new system.collections.arraylist();
68
if (arrm.length == 0)
69
return null;
70
foreach (system.messaging.message m in arrm)
71
72
al.add(m.body.tostring());
73
74
return al;
75
76
77
private void recivebutton_click(object sender, eventargs e)
78
79
//擷取全部消息
80
system.collections.arraylist msglist = getmessage();
81
//if (msglist != null)
82
//{
83
// foreach (object str in msglist)
84
// {
85
// textbox2.appendtext(str.tostring());
86
// }
87
//}
88
//直接讀取的方法
89
//system.messaging.messagequeue queue = new system.messaging.messagequeue(".\\private$\\msmqdemo");
90
// receive message, 同步的receive方法阻塞目前執行線程,直到一個message可以得到
91
system.messaging.message message = queue.receive();
92
message.formatter = new system.messaging.xmlmessageformatter(new type[]
93
textbox2.appendtext("ip:" + ipaddress.tostring() + "" + ": message:" + message.body.tostring());
94
95
}
運作結果: