RabbitMQ 的行為藝術

序
好像,今天已經是 2 月 28 号了。
聽說,29、30、31 号放假。
據說,有圖,有真相。
目錄
- 簡介
- 環境搭建
- 示例一:簡單的 Hello World
- 示例二:釋出/訂閱模式
- 嘗試發現 - 新物種 EasyNetQ
RabbitMQ:一個消息系統,基于 AMQP 系統協定,由 Erlang 語言開發。
優點:健壯、使用簡單、開源和支援各種流行的語言(如 Python、java、.NET)等。
MQ(Message Queue):消息隊列的簡稱,是一種應用程式之間的通信機制。
作用:将部分無需立即回調擷取結果,并且耗時的操作,使用異步處理的方式提高伺服器的吞吐量及性能。如:日志記錄。
圖:簡單的通信方式,及加入 MQ 後的變化
A 端:生産者将消息寫(插)入隊列;
MQ(隊列) :中間件,消息的載體;
B 端:消費者從隊列讀(取)出消息。
MQ 特點:消費者 - 生産者模型的一種表現形式。
1.官網下載下傳安裝包:http://www.rabbitmq.com/ ;
2.安裝時會提示你下載下傳 Erlang 語言環境;
3.啟動安裝完的服務:RabbitMQ;
4.在 cmd 中指向 sbin 目錄,并輸入以下指令,才能打開 WEB 管理界面:
rabbitmq-plugins enable rabbitmq_management
5.預設 url:http://localhost:15672/#/
P(Producer):生産者,意味着發送;
Queue:隊列,本質上是一個無限的緩沖區,可以儲存盡可能多的資訊;
C(Consumer):消費者,等待并接收消息。
【備注】生産者和消費者不需要駐留在同一台伺服器上。
Producer.cs
1 public class Producer
2 {
3 public static void Send()
4 {
5 var factory = new ConnectionFactory { HostName = "localhost" };
6
7 //建立連接配接對象,基于 Socket
8 using (var connection = factory.CreateConnection())
9 {
10 //建立新的管道、會話
11 using (var channel = connection.CreateModel())
12 {
13 //聲明隊列
14 channel.QueueDeclare(queue: "hello", //隊列名
15 durable: false, //持久性
16 exclusive: false, //排他性
17 autoDelete: false, //自動删除
18 arguments: null);
19
20 const string message = "Hello World!";
21 var body = Encoding.UTF8.GetBytes(message);
22
23 channel.BasicPublish(exchange: "", //交換機名
24 routingKey: "hello", //路由鍵
25 basicProperties: null,
26 body: body);
27 }
28 }
29 }
30 }
【備注】隊列名如果已存在,将不會重複建立。假設隊列已存在,修改 channel.QueueDeclare() 方法内的參數後啟動會出現異常。
【備注】消息内容是一個位元組數組。
Consumer.cs
1 class Consumer
2 {
3 public static void Receive()
4 {
5 var factory = new ConnectionFactory() { HostName = "localhost" };
6
7 using (var connection = factory.CreateConnection())
8 {
9 using (var channel = connection.CreateModel())
10 {
11 channel.QueueDeclare(queue: "hello",
12 durable: false,
13 exclusive: false,
14 autoDelete: false,
15 arguments: null);
16
17 //建立基于該隊列的消費者,綁定事件
18 var consumer = new EventingBasicConsumer(channel);
19 consumer.Received += (model, ea) =>
20 {
21 var body = ea.Body; //消息主體
22 var message = Encoding.UTF8.GetString(body);
23 Console.WriteLine(" [x] Received {0}", message);
24 };
25
26 //啟動消費者
27 channel.BasicConsume(queue: "hello", //隊列名
28 noAck: true, //false:手動應答;true:自動應答
29 consumer: consumer);
30
31 Console.Read();
32 }
33 }
34 }
35 }
【疑問】在消費者的類裡面為什麼會再次聲明隊列(channel.QueueDeclare())呢?-- 因為接收方可能會在發送方啟動前啟動,這是出于保險起見。
1.Exchange 交換機和 Exchange Type 交換類型
RabbitMQ 消息傳遞模型的核心思想是,生産者不會直接将消息發給隊列。
這裡我們将引入新的名詞 Exchange(交換機)。交換機傳遞消息的類型也有很多種:direct, topic, headers(不常用) 和 fanout,我們稱之為交換類型。
圖:Direct
圖:Fanout
圖:Topic
--上述 3 張圖來源:http://m.blog.csdn.net/article/details?id=52262850
這裡,建立一個名為 “logs” 的交換機,它的類型為廣播類型(fanout:可以将收到的所有消息,廣播給所有已知的隊列)。
channel.ExchangeDeclare(exchange: "logs", //交換機名
type: "fanout"); //交換類型
2.臨時隊列
作為消費者,我們有時候隻需要一些新的(或者空的)隊列,此時,更好的方式就是讓它自動生成一個随機名字的隊列;其次,當隊列連接配接中斷時會選擇自動删除對應的消費者。
建立一個非持久,有排他性和自動删除特性的隊列(無參時)。
var queueName = channel.QueueDeclare().QueueName;
3.Binding 綁定
【疑問】有了 Exchange 和 channel,這時,還需要什麼東西呢?-- 我們要建立 Exchange 和 channel 關系的橋梁,這個橋梁稱之為 Binding(綁定)。
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
1 class Producer
2 {
3 public static void Send()
4 {
5 var factory = new ConnectionFactory()
6 {
7 HostName = "localhost",
8 Port = 5672,
9 UserName = "guest",
10 Password = "guest"
11 };
12
13 using (var connection = factory.CreateConnection())
14 {
15 using (var channel = connection.CreateModel())
16 {
17 channel.ExchangeDeclare(exchange: "logs", //交換機名
18 type: "fanout"); //交換類型
19
20 // Guid
21 var message = Guid.NewGuid().ToString();
22 var body = Encoding.UTF8.GetBytes(message);
23 channel.BasicPublish(exchange: "logs",
24 routingKey: "",
25 basicProperties: null,
26 body: body);
27
28 Console.WriteLine(" [x] Sent {0}", message);
29 }
30
31 Console.WriteLine(" Press [enter] to exit.");
32 Console.ReadLine();
33 }
34 }
35 }
Producer.cs //生産者
1 class Reciver
2 {
3 public static void Recive()
4 {
5 var factory = new ConnectionFactory()
6 {
7 HostName = "localhost",
8 Port = 5672,
9 UserName = "guest",
10 Password = "guest"
11 };
12
13 using (var connection = factory.CreateConnection())
14 using (var channel = connection.CreateModel())
15 {
16 channel.ExchangeDeclare(exchange: "wen_logs", //交換機名
17 type: "fanout"); //交換類型
18
19 //建立隊列
20 var queueName = channel.QueueDeclare().QueueName;
21 channel.QueueBind(queue: queueName,
22 exchange: "wen_logs",
23 routingKey: "");
24
25 Console.WriteLine(" [*] Waiting for logs.");
26
27 var consumer = new EventingBasicConsumer(channel);
28 consumer.Received += (model, ea) =>
29 {
30 var body = ea.Body;
31 var message = Encoding.UTF8.GetString(body);
32 Console.WriteLine(" [x] {0}", message);
33 };
34 channel.BasicConsume(queue: queueName,
35 noAck: true,
36 consumer: consumer);
37
38 Console.WriteLine(" Press [enter] to exit.");
39 Console.ReadLine();
40 }
41 }
42 }
Reciver.cs //接收者
這都不是事!EasyNetQ,看名字就知道,搞定 MQ,So easy!
連接配接 RabbitMQ 代理:
var bus = RabbitHutch.CreateBus("host=localhost");
釋出:
bus.Publish(message);
訂閱:
bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
下面我們通過 Demo 來感受一下 Easy 的程度吧,建立項目(效果圖如下,附 Demo 下載下傳):
Wen.EasyNetQDemo.Model:類庫
Wen.EasyNetQDemo.Publisher,Wen.EasyNetQDemo.Subscriber:控制台應用程式,都使用 Nuget 直接安裝
EasyNetQ 包,都引用類庫 Model。
Demo.cs
public class Demo
{
public string Message { get; set; }
}
Publisher
1 using System;
2 using EasyNetQ;
3 using Wen.EasyNetQDemo.Model;
4
5 namespace Wen.EasyNetQDemo.Publisher
6 {
7 internal class Program
8 {
9 private static void Main(string[] args)
10 {
11 using (var bus = RabbitHutch.CreateBus("host=localhost"))
12 {
13 string input;
14 Console.WriteLine("請輸入資訊。 如果是“esc” 将退出目前視窗。");
15
16 while ((input = Console.ReadLine()) != "esc")
17 {
18 bus.Publish(new Demo
19 {
20 Message = input
21 });
22 }
23
24 }
25 }
26 }
27 }
【備注】RabbitHutch.CreateBus() 方法可以建立一個簡單的釋出/訂閱和包含請求/響應 API 的消息總線。
Subscriber
1 using System;
2 using EasyNetQ;
3 using Wen.EasyNetQDemo.Model;
4
5 namespace Wen.EasyNetQDemo.Subscriber
6 {
7 internal class Program
8 {
9 private static void Main(string[] args)
10 {
11 using (var bus = RabbitHutch.CreateBus("host=localhost"))
12 {
13 bus.Subscribe<Demo>("test", HandleDemo);
14
15 Console.WriteLine("監聽資訊中...輸入“return”将退出目前視窗!");
16 Console.ReadLine();
17 }
18 }
19
20 private static void HandleDemo(Demo demo)
21 {
22 Console.ForegroundColor = ConsoleColor.Green;
23 Console.WriteLine($"Got message: {demo.Message}");
24 Console.ResetColor();
25 }
26 }
27 }
圖:效果圖
「世事洞明皆學問 人情練達即文章」
【部落客】反骨仔
【原文】http://www.cnblogs.com/liqingwen/p/6412089.html