本次和大家分享的是RabbitMQ隊列的用法,前一篇文章 隊列工廠之(MSMQ) 中在描述的時候已經搭建了簡單工廠,是以本章内容是在其之上擴充的子項不再過多講解工廠的代碼了;RabbitMQ應該是現在網際網路公司消息隊列用的最多的一種之一吧,看看招聘基本都會有這個單詞的出現,她相比前一篇分享的MSMQ來說配置更多樣化,安裝步驟數兩者都差不多吧,最大差别MSMQ是windows捆綁的服務幾乎隻能在windows上使用,而Rabbit現目前運作支援的系統比較多;在寫隊列工廠第二篇文章的時候,其實代碼已經都完成了目前隊列工廠包括有如下隊列(msmq,redis,rabbitmq),你可以去下載下傳源碼和測試用例: QueueReposity-隊列工廠
;希望大家能夠喜歡,也希望各位多多"掃碼支援"和"推薦"謝謝!
» RabbitMQ安裝和控制台
» 封裝RabbitMQ隊列的讀和寫
» 隊列工廠之RabbitMQ測試用例
下面一步一個腳印的來分享:
要說RabbitMQ的安裝,首先我們要下載下傳對應伺服器作業系統的RabbitMQ安裝檔案,因為她有對應不同作業系統的安裝版本,這點需要注意;我本地電腦系統是win7(屬于windows)是以去官網下載下傳安裝包:https://www.rabbitmq.com/,進入官網後選擇“Installation”,能看到很多安裝版本的下載下傳源,這裡可選擇從Downloads on rabbitmq.com下載下傳,點選“windows”即可下載下傳:

目前最新版本位址:
https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6.exe;
通常我都是進入這個界面的Installation Guides節點後-》With installer (recommended),這個時候進入的是windows系統所需的幫助文檔吧,同樣可以選擇版本下載下傳;進入該界面的主要目的是需要下載下傳一個Erlang Windows的安裝檔案(更深層原因:RabbitMq運作依賴于Erlang語言),點選Erlang Windows Binary File進入下載下傳界面,然後選擇您作業系統對應的版本,如果您也是windows64位的可以直接用這個位址下載下傳:
http://erlang.org/download/otp_win64_19.2.exe
此刻兩個必須的東西已經下載下傳完成,先安裝erlang語言的exe,再安裝rabbitmq-server-3.6.6.exe;有剛開始接觸RabbitMQ的朋友會問為什麼需要Erlang的支援,因為她就是Erlang開發出來的,Erlang語言是一種通用的面向并發的程式設計語言,專門用來編寫分布式的一種語言;當你安裝前面說的那個erlang安裝包後,您電腦開始菜單中就有Erlang開發編輯器,有時間您可以用來練練手,就目前而言這種語言單詞一般出現在一流大公司的招聘中,中小型一般沒有,可能也因為很少中小型公司會涉及到并發的原因吧;到這裡安裝就完成了,下面需要通過指令行執行一些指令,由于RabbitMQ配置很多這裡我撿一定會用到的幾個來示範,其他具體可以參考官方文檔:
首先找到安裝rabbitmq的目錄并進入rabbitmq_server-3.6.6找到sbin檔案夾-》按住Shift+滑鼠右鍵sbin檔案夾-》在此處打開指令窗體-》參考這個位址https://www.rabbitmq.com/management.html的指令:rabbitmq-plugins enable rabbitmq_management -》錄入到剛才打開的cmd窗體中:
這個是開啟rabbitmq管理器的指令,這個時候你可以在你浏覽器中錄入http://localhost:15672/ 通過遊客賬号進入rabbitmq的監控背景:
url:http://localhost:15672/
Username:guest
Password:guest
此刻如果你看到如下圖的界面,那恭喜你成功了,搭建RabbitMQ服務成功了:
上面的guest已經夠咋們測試使用了,至于剩餘的什麼管理者賬号或密碼等操作的設定可以去看這個:
rabbitmqctl操作的文檔:https://www.rabbitmq.com/man/rabbitmqctl.1.man.html#
plugins操作文檔:https://www.rabbitmq.com/plugins.html
在C#中運用RabbitMQ官網列舉了幾種方式,這裡我選擇直接使用其提供的RabbitMQ.Client的nuget包,就目前這個nuget而言4.0.0及以上版本必須要NETFramework 4.5.1及以上版本或netcore版本才允許使用,筆者這裡用的是Framework4.5架構是以引用了此版本的nuget包:
Install-Package RabbitMQ.Client -Version 3.6.6
引用過後就是往前面講的隊列工廠填寫代碼,首先繼承統一配置檔案讀取類 PublicClass.ConfClass<QRabbitMQ> ,然後實作 IQueue 接口,這裡封裝了RabbitMq常用的幾個操作方法,具體代碼:
1 /// <summary>
2 /// RabbitMq
3 /// </summary>
4 public class QRabbitMQ : PublicClass.ConfClass<QRabbitMQ>, IQueue
5 {
6 private IConnection con = null;
7
8 public void Create()
9 {
10 if (string.IsNullOrWhiteSpace(this.ApiUrl) || string.IsNullOrWhiteSpace(this.ApiKey)) { throw new Exception("建立RabbitMq隊列需要指定隊列:HostName和Port"); }
11
12 try
13 {
14 var factory = new ConnectionFactory() { HostName = this.ApiUrl, Port = Convert.ToInt32(this.ApiKey) };
15 con = con ?? factory.CreateConnection();
16 }
17 catch (Exception ex)
18 {
19 throw new Exception(ex.Message);
20 }
21 }
22
23 public long Total(string name = "Redis_01")
24 {
25 if (con == null) { throw new Exception("請先建立隊列連接配接"); }
26 using (var channel = con.CreateModel())
27 {
28 return channel.MessageCount(name);
29 }
30 }
31
32 public Message Read(string name = "RabbitMQ_01")
33 {
34 if (con == null) { throw new Exception("請先建立隊列連接配接"); }
35 if (string.IsNullOrWhiteSpace(name)) { throw new Exception("name不能為空"); }
36
37 var message = new Message();
38 message.Label = name;
39 message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
40 using (var channel = con.CreateModel())
41 {
42 var baseResult = channel.BasicGet(name, true); //true:擷取後删除隊列 false:不删除
43 if (baseResult == null) { return message; }
44 var body = baseResult.Body;
45 message.Body = Encoding.UTF8.GetString(body);
46 }
47 return message;
48 }
49
50 public bool Write(string content, string name = "RabbitMQ_Queue01")
51 {
52 if (con == null) { throw new Exception("請先建立隊列連接配接"); }
53 if (string.IsNullOrWhiteSpace(content) || string.IsNullOrWhiteSpace(name)) { throw new Exception("content和name不能為空"); }
54
55 using (var channel = con.CreateModel())
56 {
57 channel.QueueDeclare(name, false, false, false, null);
58 var body = Encoding.UTF8.GetBytes(content);
59
60 channel.BasicPublish(string.Empty, name, null, body);
61 return true;
62 }
63 }
64
65 public void Dispose()
66 {
67 if (con != null)
68 {
69 con.Close();
70 con.Dispose();
71 con = null;
72 }
73 }
74 }
代碼主要使用流程是:建立(Create)-》讀(Read)|寫(Write)-》釋放(Dispose);有了具體的RabbitMq實作類,那麼在工廠中直接通過泛型映射來擷取該實作類的對象:
1 /// <summary>
2 /// ==================
3 /// author:神牛步行3
4 /// des:該列工廠開源,包括隊列有MSMQ,RedisMQ,RabbitMQ
5 /// blogs:http://www.cnblogs.com/wangrudong003/
6 /// ==================
7 /// 隊列工廠
8 /// </summary>
9 public class QueueReposity<T> where T : class,IQueue, new()
10 {
11 public static IQueue Current
12 {
13 get
14 {
15 return PublicClass.ConfClass<T>.Current;
16 }
17 }
18 }
通過上面配置環境和封裝自己的方法,這裡寫了一個簡單的測試用例,分為Server(加入消息隊列)和Client(擷取消息隊列),首先來看Server端的代碼:
1 /// <summary>
2 /// 隊列服務端測試用例
3 /// </summary>
4 class Program
5 {
6 static void Main(string[] args)
7 {
8 //Redis_Server();
9
10 RabbitMQ_Server();
11
12 //MSMQ_Server();
13 }
14 private static void RabbitMQ_Server()
15 {
16 //執行個體化QMsmq對象
17 var mq = QueueReposity<QRabbitMQ>.Current;
18
19 try
20 {
21 Console.WriteLine("Server端建立:RabbitMQ執行個體");
22 mq.Create();
23
24 var num = 0;
25 do
26 {
27 Console.WriteLine("輸入循環數量(數字,0表示結束):");
28 var readStr = Console.ReadLine();
29 num = string.IsNullOrWhiteSpace(readStr) ? 0 : Convert.ToInt32(readStr);
30
31 Console.WriteLine("插入資料:");
32 for (int i = 0; i < num; i++)
33 {
34 var str = "我的編号是:" + i;
35 mq.Write(str);
36 Console.WriteLine(str);
37 }
38 } while (num > 0);
39 }
40 catch (Exception ex)
41 {
42 }
43 finally
44 {
45 Console.WriteLine("釋放。");
46 mq.Dispose();
47 }
48 Console.ReadLine();
49 }
50 }
通過:建立(Create)-》讀(Read)|寫(Write)-》釋放(Dispose) 的流程來使用我們的隊列工廠,感覺挺簡單的,此時我們運作下這個Server端,然後錄入參數:
這個時候就往RabbitMq隊列中加入了11條資料,我們通過她的背景去找剛才添加的隊列:
能夠看到我們剛剛插入的隊列總數和名稱,如果你想看裡面具體内容,可以點選名字“mq_01”進入某一個隊列的界面,往下面拉滾動條找到“Get messages”選項,預設檢視Messages是1我們修改為10,再點選get messages就能夠看到如下圖我們剛才插入的具體内容了:
截圖有點長哦,不知道dudu會不會怪我哈哈,到這裡能看到隊列插入是成功的,然後我們來通過client端消費隊列,具體代碼:
1 /// <summary>
2 /// 隊列用戶端測試用例
3 /// </summary>
4 class Program
5 {
6 static void Main(string[] args)
7 {
8 //RedisMQ_Client();
9
10 RabbitMQ_Client();
11
12 //MSMQ_Client();
13 }
14
15 private static void RabbitMQ_Client()
16 {
17 //執行個體化QMsmq對象
18 var mq = QueueReposity<QRabbitMQ>.Current;
19 try
20 {
21 Console.WriteLine("Client端建立:RabbitMQ執行個體");
22 mq.Create();
23
24 while (true)
25 {
26 try
27 {
28 var total = mq.Total();
29 if (total > 0) { Console.WriteLine("隊列條數:" + total); }
30
31 var result = mq.Read();
32 if (result.Body == null) { continue; }
33 Console.WriteLine(string.Format("接受隊列{0}:{1}", result.Label, result.Body));
34 }
35 catch (Exception ex)
36 { Console.WriteLine("異常資訊:" + ex.Message); }
37 }
38 }
39 catch (Exception ex)
40 {
41 throw ex;
42 }
43 finally
44 {
45 Console.WriteLine("釋放。");
46 mq.Dispose();
47 }
48 }
49 }
再來咋們運作exe看下效果:
此刻剛剛加入隊列中的資料就讀取出來了,這個時候我們再看Rabbitmq控制台,get messages已經擷取不出來具體的内容資訊了,因為這個用戶端消費了資料,隊列中的資料自動清除了,至于是否想清除資料這個設定在代碼:
1 var baseResult = channel.BasicGet(name, true); //true:擷取後删除隊列 false:不删除
以上對封裝RabbitMQ的代碼分享和環境搭建講解,希望能給您帶來好的幫助,謝謝閱讀;