深入了解RabbitMQ工作原理及簡單使用
RabbitMQ系列文章
- RabbitMQ在Ubuntu上的環境搭建
- RabbitMQ交換器Exchange介紹與實踐
- RabbitMQ事務和Confirm發送方消息确認——深入解讀
- 使用Docker部署RabbitMQ叢集
- 你不知道的RabbitMQ叢集架構全解
RabbitMQ簡介
在介紹RabbitMQ之前實作要介紹一下MQ,MQ是什麼?
MQ全稱是Message Queue,可以了解為消息隊列的意思,簡單來說就是消息以管道的方式進行傳遞。
RabbitMQ是一個實作了AMQP(Advanced Message Queuing Protocol)進階消息隊列協定的消息隊列服務,用Erlang語言的。
使用場景
在我們秒殺搶購商品的時候,系統會提醒我們稍等排隊中,而不是像幾年前一樣頁面卡死或報錯給使用者。
像這種排隊結算就用到了消息隊列機制,放入通道裡面一個一個結算處理,而不是某個時間斷突然湧入大批量的查詢新增把資料庫給搞當機,是以RabbitMQ本質上起到的作用就是削峰填谷,為業務保駕護航。
為什麼選擇RabbitMQ
現在的市面上有很多MQ可以選擇,比如ActiveMQ、ZeroMQ、Appche Qpid,那問題來了為什麼要選擇RabbitMQ?
- 除了Qpid,RabbitMQ是唯一一個實作了AMQP标準的消息伺服器;
- 可靠性,RabbitMQ的持久化支援,保證了消息的穩定性;
- 高并發,RabbitMQ使用了Erlang開發語言,Erlang是為電話交換機開發的語言,天生自帶高并發光環,和高可用特性;
- 叢集部署簡單,正是應為Erlang使得RabbitMQ叢集部署變的超級簡單;
- 社群活躍度高,根據網上資料來看,RabbitMQ也是首選;
工作機制
生産者、消費者和代理
在了解消息通訊之前首先要了解3個概念:生産者、消費者和代理。
生産者:消息的建立者,負責建立和推送資料到消息伺服器;
消費者:消息的接收方,用于處理資料和确認消息;
代理:就是RabbitMQ本身,用于扮演“快遞”的角色,本身不生産消息,隻是扮演“快遞”的角色。
消息發送原理
首先你必須連接配接到Rabbit才能釋出和消費消息,那怎麼連接配接和發送消息的呢?
你的應用程式和Rabbit Server之間會建立一個TCP連接配接,一旦TCP打開,并通過了認證,認證就是你試圖連接配接Rabbit之前發送的Rabbit伺服器連接配接資訊和使用者名和密碼,有點像程式連接配接資料庫,使用Java有兩種連接配接認證的方式,後面代碼會詳細介紹,一旦認證通過你的應用程式和Rabbit就建立了一條AMQP信道(Channel)。
信道是建立在“真實”TCP上的虛拟連接配接,AMQP指令都是通過信道發送出去的,每個信道都會有一個唯一的ID,不論是釋出消息,訂閱隊列或者介紹消息都是通過信道完成的。
為什麼不通過TCP直接發送指令?
對于作業系統來說建立和銷毀TCP會話是非常昂貴的開銷,假設高峰期每秒有成千上萬條連接配接,每個連接配接都要建立一條TCP會話,這就造成了TCP連接配接的巨大浪費,而且作業系統每秒能建立的TCP也是有限的,是以很快就會遇到系統瓶頸。
如果我們每個請求都使用一條TCP連接配接,既滿足了性能的需要,又能確定每個連接配接的私密性,這就是引入信道概念的原因。

你必須知道的Rabbit
想要真正的了解Rabbit有些名詞是你必須知道的。
包括:ConnectionFactory(連接配接管理器)、Channel(信道)、Exchange(交換器)、Queue(隊列)、RoutingKey(路由鍵)、BindingKey(綁定鍵)。
ConnectionFactory(連接配接管理器):應用程式與Rabbit之間建立連接配接的管理器,程式代碼中使用;
Channel(信道):消息推送使用的通道;
Exchange(交換器):用于接受、配置設定消息;
Queue(隊列):用于存儲生産者的消息;
RoutingKey(路由鍵):用于把生成者的資料配置設定到交換器上;
BindingKey(綁定鍵):用于把交換器的消息綁定到隊列上;
看到上面的解釋,最難了解的路由鍵和綁定鍵了,那麼他們具體怎麼發揮作用的,請看下圖:
關于更多交換器的資訊,我們在後面再講。
消息持久化
Rabbit隊列和交換器有一個不可告人的秘密,就是預設情況下重新開機伺服器會導緻消息丢失,那麼怎麼保證Rabbit在重新開機的時候不丢失呢?答案就是消息持久化。
當你把消息發送到Rabbit伺服器的時候,你需要選擇你是否要進行持久化,但這并不能保證Rabbit能從崩潰中恢複,想要Rabbit消息能恢複必須滿足3個條件:
- 投遞消息的時候durable設定為true,消息持久化,代碼:channel.queueDeclare(x, true, false, false, null),參數2設定為true持久化;
- 設定投遞模式deliveryMode設定為2(持久),代碼:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),參數3設定為存儲純文字到磁盤;
- 消息已經到達持久化交換器上;
- 消息已經到達持久化的隊列;
持久化工作原理
Rabbit會将你的持久化消息寫入磁盤上的持久化日志檔案,等消息被消費之後,Rabbit會把這條消息辨別為等待垃圾回收。
持久化的缺點
消息持久化的優點顯而易見,但缺點也很明顯,那就是性能,因為要寫入硬碟要比寫入記憶體性能較低很多,進而降低了伺服器的吞吐量,盡管使用SSD硬碟可以使事情得到緩解,但他仍然吸幹了Rabbit的性能,當消息成千上萬條要寫入磁盤的時候,性能是很低的。
是以使用者要根據自己的情況,選擇适合自己的方式。
虛拟主機
每個Rabbit都能建立很多vhost,我們稱之為虛拟主機,每個虛拟主機其實都是mini版的RabbitMQ,擁有自己的隊列,交換器和綁定,擁有自己的權限機制。
vhost特性
- RabbitMQ預設的vhost是“/”開箱即用;
- 多個vhost是隔離的,多個vhost無法通訊,并且不用擔心命名沖突(隊列和交換器和綁定),實作了多層分離;
- 建立使用者的時候必須指定vhost;
vhost操作
可以通過rabbitmqctl工具指令建立:
rabbitmqctl add_vhost[vhost_name]
删除vhost:
rabbitmqctl delete_vhost[vhost_name]
檢視所有的vhost:
rabbitmqctl list_vhosts
環境搭建
前文我們已經介紹了Ubuntu搭建RabbitMQ的步驟:RabbitMQ在Ubuntu上的環境搭建
如果你是在Windows10上去安裝那就更簡單了,先放下載下傳位址:
Erlang/Rabbit Server百度網盤連結:https://pan.baidu.com/s/1TnKDV-ZuXLiIgyK8c8f9dg 密碼:wct9
當然也可去Erlang和Rabbit官網去下,就是速度比較慢。我的百度雲Rabbit最新版本:3.7.6,Erlang版本:20.2,注意:不要下載下傳最新的Erlang,在Windows10上打開擴充插件有問題,打不開。
- 安裝Erlang;
- 安裝Rabbit Server;
- 進入安裝目錄\sbin下,使用指令“rabbitmq-plugins enable rabbitmq_management”啟動網頁管理插件;
- 重新開機Rabbit服務;
使用:http://localhost:15672進行測試,預設的登陸賬号為:guest,密碼為:guest
重複安裝Rabbit Server的坑
如果不是第一次在Windows上安裝Rabbit Server一定要把Rabbit和Erlang解除安裝幹淨之後,找到系統資料庫:HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 删除其下的所有項。
不然會出現Rabbit安裝之後啟動不了的情況,理論上解除安裝的順序也是先Rabbit在Erlang。
代碼實作
java版實作,使用maven項目,建立可以檢視:MyEclipse2017破解設定與maven項目搭建
項目建立成功之後,添加Rabbit Client jar包,隻需要在pom.xml裡面配置,如下資訊:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>
java實作代碼分為兩個類,第一個是建立Rabbit連接配接,第二是應用類使用最簡單的方式釋出和消費消息。
Rabbit的連接配接,兩種方式:
方式一:
public static Connection GetRabbitConnection() {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(Config.UserName);
factory.setPassword(Config.Password);
factory.setVirtualHost(Config.VHost);
factory.setHost(Config.Host);
factory.setPort(Config.Port);
Connection conn = null;
try {
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
方式二:
public static Connection GetRabbitConnection2() {
ConnectionFactory factory = new ConnectionFactory();
// 連接配接格式:amqp://userName:password@hostName:portNumber/virtualHost
String uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port,
Config.VHost);
Connection conn = null;
try {
factory.setUri(uri);
factory.setVirtualHost(Config.VHost);
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
第二部分:應用類,使用最簡單的方式釋出和消費消息
public static void main(String[] args) {
Publisher(); // 推送消息
Consumer(); // 消費消息
}
/**
* 推送消息
*/
public static void Publisher() {
// 建立一個連接配接
Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
if (conn != null) {
try {
// 建立通道
Channel channel = conn.createChannel();
// 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨占模式;參數四:消費者斷開連接配接時是否删除隊列;參數五:消息其他參數】
channel.queueDeclare(Config.QueueName, false, false, false, null);
String content = String.format("目前時間:%s", new Date().getTime());
// 發送内容【參數說明:參數一:交換機名稱;參數二:隊列名稱,參數三:消息的其他屬性-routing headers,此屬性為MessageProperties.PERSISTENT_TEXT_PLAIN用于設定純文字消息存儲到硬碟;參數四:消息主體】
channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));
System.out.println("已發送消息:" + content);
// 關閉連接配接
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消費消息
*/
public static void Consumer() {
// 建立一個連接配接
Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
if (conn != null) {
try {
// 建立通道
Channel channel = conn.createChannel();
// 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨占模式;參數四:消費者斷開連接配接時是否删除隊列;參數五:消息其他參數】
channel.queueDeclare(Config.QueueName, false, false, false, null);
// 建立訂閱器,并接受消息
channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey(); // 隊列名稱
String contentType = properties.getContentType(); // 内容類型
String content = new String(body, "utf-8"); // 消息正文
System.out.println("消息正文:" + content);
channel.basicAck(envelope.getDeliveryTag(), false); // 手動确認消息【參數說明:參數一:該消息的index;參數二:是否批量應答,true批量确認小于index的消息】
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
代碼裡面已經寫了很詳細的注釋,在這裡也不過多的介紹了。
執行效果,如圖:
關注下面二維碼,訂閱更多精彩内容。
關注公衆号(加好友):
作者:
王磊的部落格
出處:
http://vipstone.cnblogs.com/