1、什麼是RabbitMQ。詳見 http://www.rabbitmq.com/ 。
作用就是提高系統的并發性,将一些不需要及時響應用戶端且占用較多資源的操作,放入隊列,再由另外一個線程,去異步處理這些隊列,可極大的提高系統的并發能力。
2、安裝
RabbitMQ服務: http://www.rabbitmq.com/download.html 。
(安裝完RabbitMQ服務後,會在Windows服務中看到。如果沒有Erlang運作環境,在安裝過程中會提醒先安裝Erlang環境。)
.net用戶端類庫: http://www.rabbitmq.com/dotnet.html
3、插件
RabbitMQ提供了很多好用的插件,最常用的就是web管理工具,啟動此插件。
CMD中運作指令:rabbitmq-plugins enable rabbitmq_management
注:rabbitmq-plugins 所在路徑為:D:Program FilesRabbitMQ Server
abbitmq_server-3.4.0sbin
修改plugins db log路徑
需要建立rabbitmq-env.conf:
RABBITMQ_MNESIA_BASE=e:/rabbitmq/db
RABBITMQ_LOG_BASE=e:/rabbitmq/log
RABBITMQ_PLUGINS_DIR=e:/rabbitmq/plugins
4、配置
配置檔案位址為:C:Documents and SettingsAdministratorApplication DataRabbitMQ
abbitmq.config,預設沒有rabbit.config檔案,需要手工建立(預設會有rabbitmq.config.example 作為參考)。基于安全,做了兩個配置,如下:
[
{rabbit,
[
{loopback_users, [<<"guest">>]},
{tcp_listeners, [{"127.0.0.1", 1234},
{"172.31.26.24", 8009}]},
{vm_memory_high_watermark, 0.5},
{disk_free_limit,1000000000}
]}
].
- The web UI is located at: http://server-name:15672/
loopback_users:設定隻能在與RabbitMq服務同一台機器上通路服務的使用者。
tcp_listeners:設定RabbitMQ監聽的IP位址與端口。隻監聽區域網路内網iP、修改預設端口,防止被入侵攻擊。
設定完後,别忘記了以下操作,否則配置不起作用。
- 停止RabbitMQ服務;
- 重新安裝服務使配置生效:rabbitmq-service.bat install
此指令要切換到路徑:D:Program FilesRabbitMQ Server
abbitmq_server-3.4.0sbin
- 啟動RabbitMQ服務;
5、Demo練習。
消息生産者:
class Program
{
static void Main(string[] args)
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = Constants.MqHost;
factory.Port = Constants.MqPort;
factory.UserName = Constants.MqUserName;
factory.Password = Constants.MqPwd;
using (IConnection conn = factory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//在MQ上定義一個持久化隊列,如果名稱相同不會重複建立
channel.QueueDeclare("MyFirstQueue", true, false, false, null);
while (true)
{
string customStr = Console.ReadLine();
RequestMsg requestMsg = new RequestMsg();
requestMsg.Name = string.Format("Name_{0}", customStr);
requestMsg.Code = string.Format("Code_{0}", customStr);
string jsonStr = JsonConvert.SerializeObject(requestMsg);
byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
//設定消息持久化
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.BasicPublish("", "MyFirstQueue", properties, bytes);
//channel.BasicPublish("", "MyFirstQueue", null, bytes);
Console.WriteLine("消息已發送:" + requestMsg.ToString());
}
}
}
}
catch (Exception e1)
{
Console.WriteLine(e1.ToString());
}
Console.ReadLine();
}
}
class Program
{
static void Main(string[] args)
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = Constants.MqHost;
factory.Port = Constants.MqPort;
factory.UserName = Constants.MqUserName;
factory.Password = Constants.MqPwd;
using (IConnection conn = factory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//在MQ上定義一個持久化隊列,如果名稱相同不會重複建立
channel.QueueDeclare("MyFirstQueue", true, false, false, null);
//輸入1,那如果接收一個消息,但是沒有應答,則用戶端不會收到下一個消息
channel.BasicQos(0, 1, false);
Console.WriteLine("Listening...");
//在隊列上定義一個消費者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
//消費隊列,并設定應答模式為程式主動應答
channel.BasicConsume("MyFirstQueue", false, consumer);
while (true)
{
//阻塞函數,擷取隊列中的消息
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
byte[] bytes = ea.Body;
string str = Encoding.UTF8.GetString(bytes);
RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
Console.WriteLine("HandleMsg:" + msg.ToString());
//回複确認
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
catch (Exception e1)
{