天天看點

RabbitMQ 安裝與使用

前言

吃多了拉就是隊列,吃飽了吐就是棧

  • 使用場景
    • 對操作的實時性要求不高,而需要執行的任務極為耗時;(發送短信,郵件提醒,更新文章閱讀計數,記錄使用者記錄檔)
    • 存在異構系統間的整合;

安裝

  • 下載下傳 Erlang
    • 安裝完确定ERLANG_HOME環境變量是否添加,否則:

      Setx ERLANG_HOME “D:\Program Files\erl8.2″

  • 下載下傳安裝包
    • 安裝完通過

      rabbitmqctl status

      确定rabbitmq狀态
  • 管理服務
    • 預設安裝成功會自動啟動服務
    • 通過開始菜單可以啟動,停止,解除安裝服務
  • 占用端口
    • 4369(叢集、Erlang)
    • 5671,5672(應用層标準進階消息隊列協定)
    • 25672(Erlang分發,CLI通信)
    • 15672(如果管理插件啟用)
    • 61613,61614(如果消息文本協定STOMP已啟用)
    • 1883,8883(如果erl實時通信已啟用)
  • 支援的平台
    • 基于Ubuntu和Debian的Linux發行版
    • 基于Fedora,CentOS和RPM的Linux發行版
    • Mac OS X
    • Windows XP及更高版本

概念

  • Connections:用戶端連接配接,建立該資源非常耗時,應盡量避免多次建立。
  • Channel:消息通道,在用戶端的每個連接配接裡,可建立多個channel,每個channel代表一個會話任務。
  • Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。
  • Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
  • Broker:簡單來說就是消息隊列伺服器實體。
  • Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
  • Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  • vhost:虛拟主機,一個broker裡可以開設多個vhost,用作不同使用者的權限分離。
  • producer:消息生産者,就是投遞消息的程式。
  • consumer:消息消費者,就是接收消息的程式。

消息隊列的發送過程大概如下:

  1. 用戶端建立Connection,連接配接到消息隊列伺服器,打開一個channel。
  2. 用戶端聲明一個Exchange,并設定相關屬性。
  3. 用戶端聲明一個Queue,并設定相關屬性。
  4. 用戶端使用routing key,在exchange和queue之間建立好綁定關系。
  5. 用戶端發送消息首先到exchange
  6. exchange根據type路由到對應的隊列(可以是多個隊列)中.

Exchange Type

  • direct(直連)
    • routing key 與 binding key相同
  • fanout
    • 給所有綁定隊列發送消息
  • topic
    • routing key:audit.irs.corporate => binding key:audit.#
    • routing key:audit.irs => binding key:audit.*
  • default
    • direct
    • binding key為queue名稱

常用指令

  • 管理插件
    • rabbitmq-plugins enable rabbitmq_management

       // 啟用
    • rabbitmq-plugins disable rabbitmq_management

       // 禁用
  • 管理隊列
    • rabbitmqctl list_queues

       // 檢視隊列
  • 管理使用者及權限
    • rabbitmqctl list_users

       // 檢視所有使用者
    • rabbitmqctl add_user user_admin passwd_admin

       // 添加使用者
    • rabbitmqctl set_user_tags user_admin administrator

       // 添權重限
    • rabbitmqctl delete_user guest

       // 删除使用者
    • rabbitmqctl change_password {username} {newpassowrd}

       // 修改密碼
  • 管理虛拟主機vhost
    • rabbitmqctl add_vhost vhostpath

       // 建立虛拟主機
    • rabbitmqctl delete_vhost vhostpath

       // 删除虛拟主機
    • rabbitmqctl list_vhosts

       // 列出所有虛拟主機

使用

  • 發送消息(以持久化代碼為例)
var factory = new ConnectionFactory
{
    HostName = hostName,                // rabbit server
    UserName = "admin",
    Password = "admin",
    Port = 5672,                        // Broker端口
    VirtualHost = "/"                   // 虛拟Host,需提前配置
};

using (var connection = factory.CreateConnection()) // 建立與RabbitMQ伺服器的連接配接
{
    using (var channel = connection.CreateModel())  // 建立1個Channel(大部分API在該Channel中)
    {
        // 定義1個隊列,自動會和預設的exchange 做direct類型綁定
        channel.QueueDeclare(
            queue: "hello",                     // 隊列名稱
            durable: true,                      // 隊列是否持久化
            exclusive: false,                   // 排他隊列:如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接配接可見,并在連接配接斷開時自動删除。(活動在一次連接配接内)
            autoDelete: false,                  // 自動删除:當最後一個消費者取消訂閱時,隊列自動删除。如果您需要僅由一個使用者使用的臨時隊列,請将自動删除與排除。當消費者斷​​開連接配接時,隊列将被删除。(至少消費者能連一次)
            arguments: null);                   // 配置參數

        var randomQueue = channel.QueueDeclare();                   // 定義随機的隊列 該隊列為臨時隊列(排他隊列 + 自動删除)


        // 定義Exchange(一般而言,不需要定義exchange,rabbitmq預設建立了所有類型的exchange)
        //channel.ExchangeDeclare("direct-demo", ExchangeType.Direct);    // 定義direct exchange
        //channel.ExchangeDeclare("fannout-demo", ExchangeType.Fanout);   // 定義fanout exchange
        //channel.ExchangeDeclare("topic-demo", ExchangeType.Topic);      // 定義fanout exchange


        // 定義queue exchange key 關系(在某些業務場景下,會使用該關系做路由功能)
        //channel.QueueBind(queue: "hello", exchange: "amq.direct", routingKey: "hello"); // 預設綁定的關系和該行代碼效果一樣
        //channel.QueueBind("hello", "amq.fanout", "hello");                              // 該類型下的routingKey 實際不需要

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        while (true)
        {
            string message = "Hello World!" + DateTime.Now;
            var body = Encoding.UTF8.GetBytes(message);

            // 發送消息到隊列中
            channel.BasicPublish(
                exchange: string.Empty,         // 傳遞為Empty的時候,通過   `(AMQP default)`傳遞
                routingKey: "hello",            // routing key 與 queuebind中的binding key對應
                basicProperties: properties,    // 消息header
                body: body);                    // 消息body:發送的是bytes 可以任意編碼

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}           
  • 接收消息(以消息響應為例)
    var factory = new ConnectionFactory
    {
    HostName = hostName,                // rabbit server
    UserName = "admin",
    Password = "admin",
    Port = 5672,                        // Broker端口
    VirtualHost = "/"                   // 虛拟Host,需提前配置
    };
    using (var connection = factory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
        var consumer = new EventingBasicConsumer(channel);  // 建立Consumer
        consumer.Received += (model, ea) =>         // 通過回調函數異步推送我們的消息
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            Thread.Sleep(1000);
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); // 消息響應
            Console.WriteLine(" [x] Received {0}", message);
        };
    
        channel.BasicQos(0, 1, false);  // 設定perfetchCount=1 。這樣就告訴RabbitMQ 不要在同一時間給一個工作者發送多于1個的消息
        channel.BasicConsume(queue: "hello",
                                noAck: false,      // 需要消息響應(Acknowledgments)機制
                                consumer: consumer);
    
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
    }           
  • 消息響應(acknowledgments)
    • 為了防止消息丢失,RabbitMQ提供了消息響應(acknowledgments)機制。消費者會通過一個ack(響應),告訴RabbitMQ已經收到并處理了某條消息,然後RabbitMQ才會釋放并删除這條消息。
  • 持久化
    • 新隊列(無法修改隊列)配置為可持久化
    • 發送消息配置為持久化
    • 消息什麼時候刷到磁盤?
      • 寫入檔案前會有一個Buffer,大小為1M,資料在寫入檔案時,首先會寫入到這個Buffer,如果Buffer已滿,則會将Buffer寫入到檔案(未必刷到磁盤)。
      • 固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每個25ms,Buffer裡的資料及未重新整理到磁盤的檔案内容必定會刷到磁盤。
      • 每次消息寫入後,如果沒有後續寫入請求,則會直接将已寫入的消息刷到磁盤:使用Erlang的receive x after 0實作,隻要程序的信箱裡沒有消息,則産生一個timeout消息,而timeout會觸發刷盤操作。

常見問題

  • RabbitMQ 管理插件啟動報錯
    • 确認RabbitMQ服務是否啟動
    • C:\Windows目錄下,将.erlang.cookie檔案,拷貝到使用者目錄下 C:\Users{使用者名},這是Erlang的Cookie檔案,允許與Erlang進行互動
    • 重新安裝erl 和 rabbit,盡量不要帶空格的路徑
  • 修改配置檔案
    • http://www.rabbitmq.com/configure.html