天天看點

安裝 RabbitMQ C#使用-摘自網絡(包括RabbitMQ的配置)

1、什麼是RabbitMQ。詳見 http://www.rabbitmq.com/ 。

作用就是提高系統的并發性,将一些不需要及時響應用戶端且占用較多資源的操作,放入隊列,再由另外一個線程,去異步處理這些隊列,可極大的提高系統的并發能力。

2、安裝

RabbitMQ服務: http://www.rabbitmq.com/download.html 。

(安裝完RabbitMQ服務後,會在Windows服務中看到。如果沒有Erlang運作環境,在安裝過程中會提醒先安裝Erlang環境。)

.net用戶端類庫: http://www.rabbitmq.com/dotnet.html

3、插件

RabbitMQ提供了很多好用的插件,最常用的就是web管理工具,啟動此插件。

CMD中運作指令:rabbitmq-plugins enable rabbitmq_management

注:rabbitmq-plugins 所在路徑為:D:Program FilesRabbitMQ Server

abbitmq_server-3.4.0sbin

修改plugins db log路徑

需要建立rabbitmq-env.conf:

RABBITMQ_MNESIA_BASE=e:/rabbitmq/db

RABBITMQ_LOG_BASE=e:/rabbitmq/log

RABBITMQ_PLUGINS_DIR=e:/rabbitmq/plugins

4、配置

配置檔案位址為:C:Documents and SettingsAdministratorApplication DataRabbitMQ

abbitmq.config,預設沒有rabbit.config檔案,需要手工建立(預設會有rabbitmq.config.example 作為參考)。基于安全,做了兩個配置,如下:

[

{rabbit,

[

{loopback_users, [<<"guest">>]},

{tcp_listeners, [{"127.0.0.1", 1234},

{"172.31.26.24", 8009}]},

{vm_memory_high_watermark, 0.5},

{disk_free_limit,1000000000}

]}

].

  • The web UI is located at: http://server-name:15672/

loopback_users:設定隻能在與RabbitMq服務同一台機器上通路服務的使用者。

tcp_listeners:設定RabbitMQ監聽的IP位址與端口。隻監聽區域網路内網iP、修改預設端口,防止被入侵攻擊。

設定完後,别忘記了以下操作,否則配置不起作用。

  • 停止RabbitMQ服務;
  • 重新安裝服務使配置生效:rabbitmq-service.bat install

此指令要切換到路徑:D:Program FilesRabbitMQ Server

abbitmq_server-3.4.0sbin

  • 啟動RabbitMQ服務;

5、Demo練習。

消息生産者:

class Program
  {
    static void Main(string[] args)
    {
      try
      {
        ConnectionFactory factory = new ConnectionFactory();
        factory.HostName = Constants.MqHost;
        factory.Port = Constants.MqPort;
        factory.UserName = Constants.MqUserName;
        factory.Password = Constants.MqPwd;
        using (IConnection conn = factory.CreateConnection())
        {
          using (IModel channel = conn.CreateModel())
          {
            //在MQ上定義一個持久化隊列,如果名稱相同不會重複建立
            channel.QueueDeclare("MyFirstQueue", true, false, false, null);
            while (true)
            {
              string customStr = Console.ReadLine();
              RequestMsg requestMsg = new RequestMsg();
              requestMsg.Name = string.Format("Name_{0}", customStr);
              requestMsg.Code = string.Format("Code_{0}", customStr);
              string jsonStr = JsonConvert.SerializeObject(requestMsg);
              byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
              
              //設定消息持久化
              IBasicProperties properties = channel.CreateBasicProperties();
              properties.DeliveryMode = 2;
              channel.BasicPublish("", "MyFirstQueue", properties, bytes);

              //channel.BasicPublish("", "MyFirstQueue", null, bytes);

              Console.WriteLine("消息已發送:" + requestMsg.ToString());
            }
          }
        }
      }
      catch (Exception e1)
      {
        Console.WriteLine(e1.ToString());
      }
      Console.ReadLine();
    }
  }      
class Program
  {
    static void Main(string[] args)
    {
      try
      {
        ConnectionFactory factory = new ConnectionFactory();
        factory.HostName = Constants.MqHost;
        factory.Port = Constants.MqPort;
        factory.UserName = Constants.MqUserName;
        factory.Password = Constants.MqPwd;
        using (IConnection conn = factory.CreateConnection())
        {
          using (IModel channel = conn.CreateModel())
          {
            //在MQ上定義一個持久化隊列,如果名稱相同不會重複建立
            channel.QueueDeclare("MyFirstQueue", true, false, false, null);

            //輸入1,那如果接收一個消息,但是沒有應答,則用戶端不會收到下一個消息
            channel.BasicQos(0, 1, false);
            
            Console.WriteLine("Listening...");

            //在隊列上定義一個消費者
            QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
            //消費隊列,并設定應答模式為程式主動應答
            channel.BasicConsume("MyFirstQueue", false, consumer);

            while (true)
            {
              //阻塞函數,擷取隊列中的消息
              BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
              byte[] bytes = ea.Body;
              string str = Encoding.UTF8.GetString(bytes);
              RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
              Console.WriteLine("HandleMsg:" + msg.ToString());
              //回複确認
              channel.BasicAck(ea.DeliveryTag, false);
            }
          }
        }
      }
      catch (Exception e1)
      {