前言
吃多了拉就是隊列,吃飽了吐就是棧
- 使用場景
- 對操作的實時性要求不高,而需要執行的任務極為耗時;(發送短信,郵件提醒,更新文章閱讀計數,記錄使用者記錄檔)
- 存在異構系統間的整合;
安裝
- 下載下傳 Erlang
- 安裝完确定ERLANG_HOME環境變量是否添加,否則:
Setx ERLANG_HOME “D:\Program Files\erl8.2″
- 安裝完确定ERLANG_HOME環境變量是否添加,否則:
- 下載下傳安裝包
- 安裝完通過
确定rabbitmq狀态rabbitmqctl status
- 安裝完通過
- 管理服務
- 預設安裝成功會自動啟動服務
- 通過開始菜單可以啟動,停止,解除安裝服務
- 占用端口
- 4369(叢集、Erlang)
- 5671,5672(應用層标準進階消息隊列協定)
- 25672(Erlang分發,CLI通信)
- 15672(如果管理插件啟用)
- 61613,61614(如果消息文本協定STOMP已啟用)
- 1883,8883(如果erl實時通信已啟用)
- 支援的平台
- 基于Ubuntu和Debian的Linux發行版
- 基于Fedora,CentOS和RPM的Linux發行版
- Mac OS X
- Windows XP及更高版本
概念
- Connections:用戶端連接配接,建立該資源非常耗時,應盡量避免多次建立。
- Channel:消息通道,在用戶端的每個連接配接裡,可建立多個channel,每個channel代表一個會話任務。
- Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。
- Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
- Broker:簡單來說就是消息隊列伺服器實體。
- Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
- Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
- vhost:虛拟主機,一個broker裡可以開設多個vhost,用作不同使用者的權限分離。
- producer:消息生産者,就是投遞消息的程式。
- consumer:消息消費者,就是接收消息的程式。
消息隊列的發送過程大概如下:
- 用戶端建立Connection,連接配接到消息隊列伺服器,打開一個channel。
- 用戶端聲明一個Exchange,并設定相關屬性。
- 用戶端聲明一個Queue,并設定相關屬性。
- 用戶端使用routing key,在exchange和queue之間建立好綁定關系。
- 用戶端發送消息首先到exchange
- exchange根據type路由到對應的隊列(可以是多個隊列)中.
Exchange Type
- direct(直連)
- routing key 與 binding key相同
- fanout
- 給所有綁定隊列發送消息
- topic
- routing key:audit.irs.corporate => binding key:audit.#
- routing key:audit.irs => binding key:audit.*
- default
- direct
- binding key為queue名稱
常用指令
- 管理插件
-
// 啟用rabbitmq-plugins enable rabbitmq_management
-
// 禁用rabbitmq-plugins disable rabbitmq_management
-
- 管理隊列
-
// 檢視隊列rabbitmqctl list_queues
-
- 管理使用者及權限
-
// 檢視所有使用者rabbitmqctl list_users
-
// 添加使用者rabbitmqctl add_user user_admin passwd_admin
-
// 添權重限rabbitmqctl set_user_tags user_admin administrator
-
// 删除使用者rabbitmqctl delete_user guest
-
// 修改密碼rabbitmqctl change_password {username} {newpassowrd}
-
- 管理虛拟主機vhost
-
// 建立虛拟主機rabbitmqctl add_vhost vhostpath
-
// 删除虛拟主機rabbitmqctl delete_vhost vhostpath
-
// 列出所有虛拟主機rabbitmqctl list_vhosts
-
使用
- 發送消息(以持久化代碼為例)
var factory = new ConnectionFactory
{
HostName = hostName, // rabbit server
UserName = "admin",
Password = "admin",
Port = 5672, // Broker端口
VirtualHost = "/" // 虛拟Host,需提前配置
};
using (var connection = factory.CreateConnection()) // 建立與RabbitMQ伺服器的連接配接
{
using (var channel = connection.CreateModel()) // 建立1個Channel(大部分API在該Channel中)
{
// 定義1個隊列,自動會和預設的exchange 做direct類型綁定
channel.QueueDeclare(
queue: "hello", // 隊列名稱
durable: true, // 隊列是否持久化
exclusive: false, // 排他隊列:如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接配接可見,并在連接配接斷開時自動删除。(活動在一次連接配接内)
autoDelete: false, // 自動删除:當最後一個消費者取消訂閱時,隊列自動删除。如果您需要僅由一個使用者使用的臨時隊列,請将自動删除與排除。當消費者斷開連接配接時,隊列将被删除。(至少消費者能連一次)
arguments: null); // 配置參數
var randomQueue = channel.QueueDeclare(); // 定義随機的隊列 該隊列為臨時隊列(排他隊列 + 自動删除)
// 定義Exchange(一般而言,不需要定義exchange,rabbitmq預設建立了所有類型的exchange)
//channel.ExchangeDeclare("direct-demo", ExchangeType.Direct); // 定義direct exchange
//channel.ExchangeDeclare("fannout-demo", ExchangeType.Fanout); // 定義fanout exchange
//channel.ExchangeDeclare("topic-demo", ExchangeType.Topic); // 定義fanout exchange
// 定義queue exchange key 關系(在某些業務場景下,會使用該關系做路由功能)
//channel.QueueBind(queue: "hello", exchange: "amq.direct", routingKey: "hello"); // 預設綁定的關系和該行代碼效果一樣
//channel.QueueBind("hello", "amq.fanout", "hello"); // 該類型下的routingKey 實際不需要
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
while (true)
{
string message = "Hello World!" + DateTime.Now;
var body = Encoding.UTF8.GetBytes(message);
// 發送消息到隊列中
channel.BasicPublish(
exchange: string.Empty, // 傳遞為Empty的時候,通過 `(AMQP default)`傳遞
routingKey: "hello", // routing key 與 queuebind中的binding key對應
basicProperties: properties, // 消息header
body: body); // 消息body:發送的是bytes 可以任意編碼
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
- 接收消息(以消息響應為例)
var factory = new ConnectionFactory { HostName = hostName, // rabbit server UserName = "admin", Password = "admin", Port = 5672, // Broker端口 VirtualHost = "/" // 虛拟Host,需提前配置 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { var consumer = new EventingBasicConsumer(channel); // 建立Consumer consumer.Received += (model, ea) => // 通過回調函數異步推送我們的消息 { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Thread.Sleep(1000); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); // 消息響應 Console.WriteLine(" [x] Received {0}", message); }; channel.BasicQos(0, 1, false); // 設定perfetchCount=1 。這樣就告訴RabbitMQ 不要在同一時間給一個工作者發送多于1個的消息 channel.BasicConsume(queue: "hello", noAck: false, // 需要消息響應(Acknowledgments)機制 consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
- 消息響應(acknowledgments)
- 為了防止消息丢失,RabbitMQ提供了消息響應(acknowledgments)機制。消費者會通過一個ack(響應),告訴RabbitMQ已經收到并處理了某條消息,然後RabbitMQ才會釋放并删除這條消息。
- 持久化
- 新隊列(無法修改隊列)配置為可持久化
- 發送消息配置為持久化
- 消息什麼時候刷到磁盤?
- 寫入檔案前會有一個Buffer,大小為1M,資料在寫入檔案時,首先會寫入到這個Buffer,如果Buffer已滿,則會将Buffer寫入到檔案(未必刷到磁盤)。
- 固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每個25ms,Buffer裡的資料及未重新整理到磁盤的檔案内容必定會刷到磁盤。
- 每次消息寫入後,如果沒有後續寫入請求,則會直接将已寫入的消息刷到磁盤:使用Erlang的receive x after 0實作,隻要程序的信箱裡沒有消息,則産生一個timeout消息,而timeout會觸發刷盤操作。
常見問題
- RabbitMQ 管理插件啟動報錯
- 确認RabbitMQ服務是否啟動
- C:\Windows目錄下,将.erlang.cookie檔案,拷貝到使用者目錄下 C:\Users{使用者名},這是Erlang的Cookie檔案,允許與Erlang進行互動
- 重新安裝erl 和 rabbit,盡量不要帶空格的路徑
- 修改配置檔案
- http://www.rabbitmq.com/configure.html