本文将提供一個基于高可用配置的RabbitMQ叢集方案。通過介紹RabbitMQ的基本概念、主要作用和使用場景,并搭建RabbitMQ單節點環境、用程式示範消息發送接收過程,以及搭建RabbitMQ高可用叢集環境,來生動而完整地介紹整個叢集方案。并結合自己的實踐,分享一些在叢集環境下的用戶端開發優化經驗。
本文内容主要包括以下幾個方面:
1、RabbitMQ基礎介紹
2、RabbitMQ單節點環境的實踐
3、RabbitMQ高可用叢集環境的實踐
4、總結
一、 RabbitMQ基礎介紹
RabbitMQ是基于AMQP(Advanced Message Queuing Protocol,進階消息隊列協定)實作的消息中間件。消息中間件主要用于元件之間的解耦,消息的發送者無需知道消息接收者的存在,反之亦然。RabbitMQ有很多優點:
1、支援叢集環境和高可用隊列,魯棒性強
2、支援豐富的開發平台,如Java、.NET、Python、Perl、PHP、JavaScript等;
3、可運作在大多數作業系統;
4、開源、有活躍的社群、有豐富的插件。
RabbitMQ作為消息代理(message broker),給我們提供了發送、接收消息的平台,并能在消息被接收之前保持其持久性。通常用于應用程式之間、以及程式内各元件之間的消息傳遞等場景。比如,用于實作Java程式與Perl程式之間的消息傳遞;又如在Java Web程式中用于背景功能子產品與前台通知子產品之間的消息傳遞,等等。
在使用RabbitMQ進行用戶端開發之前,我們需要了解下它的基本元件和概念:
Broker: 消息隊列伺服器,是接受用戶端連接配接,實作AMQP消息隊列和路由功能的程序。
1、Virtual Host: 是一個邏輯概念,一個Virtual Host裡面可以有若幹個Exchange和Queue,它是權限控制的最小單元。
2、Queue: 消息隊列載體,用于存儲消息。
3、Exchange: 路由,用于接收消息并根據Binding規則将消息路由給伺服器中的隊列。
4、Binding: 綁定,用于把Exchange和Queue按照路由規則綁定起來。
5、RoutingKey: 路由關鍵字,Exchange根據這個關鍵字進行消息投遞。
6、Connection: 連接配接,一個位于用戶端和Broker之間的TCP連接配接。
7、Channel: 消息通道,在用戶端的每個Connection裡可建立多個Channel,每個channel代表一個會話。之是以需要Channel,是因為TCP連接配接的建立和釋放都是十分昂貴的。
二、 RabbitMQ單節點環境的實踐
上面簡單介紹了RabbitMQ的作用和優點,以及基本元件和概念。接下來,我們以Ubuntu 14.04 LTS 64bit的伺服器為例,動手搭建一個RabbitMQ Server的單節點環境(目前最新版是rabbitmq-server_3.6.5),并編寫java程式來測試RabbitMQ發送、接受消息的具體過程,具體步驟如下:
1. 通過APT方式安裝RabbitMQ服務
1.1 添加APT庫到本地軟體更新的伺服器位址清單中:
echo 'deb http://www.rabbitmq.com/debian/ testing main' |sudo tee /etc/apt/sources.list.d/rabbitmq.list
1.2 下載下傳RabbitMQ的公鑰并添加到本地trusted資料庫中:
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc |sudo apt-key add -
1.3 更新APT庫軟體包清單:
sudo apt-get update
1.4 安裝RabbitMQ軟體包(安裝之後預設啟動):
sudo apt-get install rabbitmq-server
2. 啟用常用插件,包括管理插件、web-stomp插件等
rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp
3. 添加管理使用者并授權
RabbitMQ安裝之後,會建立預設的Virtual Host(即“/”),以及預設的管理賬戶(guest/guest),但該guest賬戶隻能以localhost方式連接配接到Broker。如果需要遠端通路Broker, 我們可以通過配置新的管理者賬戶來實作。
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
4. 重新開機服務
service rabbitmq-server restart
▲RabbitMQ安裝過程
另外,通過service rabbitmq-server status 指令可以檢視該服務是否正常運作
5. 通路管理頁面
重新開機服務之後,在浏覽器中輸入http://{server_ip}:15672,并用上面建立的admin/admin賬戶來登入,即可監控RabbitMQ的運作情況。
通過管理頁面,我們可以管理Connection、Channel、Exchange、Queue等元件,并能夠通過Admin子產品來管理User、Virtual Host及Policy。
▲RabbitMQ管理頁面(需啟用管理插件)
6. 使用Java程式發送、接收消息
現在我們來通過java程式來實作一個簡單的生産者-消費者模型。生産者(P)發送消息到Broker的Queue中,消費者(C)從Broker的Queue中擷取消息。如下圖:
▲基于消息隊列的生産者消費者模型
6.1 編寫生産者java類:Send.java
▲Send.java
6.2 編寫消費者java類:Recv.java
▲Recv.java
6.3 運作程式Send.java及Recv.java,檢視消息發送、接收情況。
▲Send.java與Recv.java執行結果
三、 RabbitMQ高可用叢集環境的實踐
叢集是保證服務可靠性的一種方式,同時可以通過水準擴充以提升消息吞吐能力。RabbitMQ是用分布式程式設計語言erlang開發的,是以天生就支援叢集。接下來,将介紹RabbitMQ分布式消息處理方式、叢集模式、節點類型,并動手搭建一個高可用叢集環境,最後通過java程式來驗證叢集的高可用性。
1. 三種分布式消息處理方式
RabbitMQ分布式的消息處理方式有以下三種:
1、Clustering:不支援跨網段,各節點需運作同版本的Erlang和RabbitMQ, 應用于同網段區域網路。
2、Federation:允許單台伺服器上的Exchange或Queue接收釋出到另一台伺服器上Exchange或Queue的消息, 應用于廣域網,。
3、Shovel:與Federation類似,但工作在更低層次。
RabbitMQ對網絡延遲很敏感,在LAN環境建議使用clustering方式;在WAN環境中,則使用Federation或Shovel。我們平時說的RabbitMQ叢集,說的就是clustering方式,它是RabbitMQ内嵌的一種消息處理方式,而Federation或Shovel則是以plugin形式存在。
2. 兩種叢集模式
▲RabbitMQ叢集結構
2.1 普通模式(預設)
圖7是由3個節點(Node1,Node2,Node3)組成的RabbitMQ普通叢集環境,Exchange A的中繼資料資訊在所有節點上是一緻的;而Queue的完整資訊隻有在建立它的節點上,各個節點僅有相同的中繼資料,即隊列結構。
當producer發送消息到Node1節點的Queue1中後,consumer從Node3節點拉取時,RabbitMQ會臨時在Node1、Node3間進行消息傳輸,把Node1中的消息實體取出并經過Node3發送給consumer。
該模式存在一個問題:當Node1節點發生故障後,Node3節點無法取到Node1節點中還未被消費的消息實體。如果消息沒做持久化,那麼消息将永久性丢失;如果做了持久化,那麼隻有等Node1節點故障恢複後,消息才能被其他節點消費。
2.2 鏡像模式(基于鏡像隊列)
它是在普通模式的基礎上,把需要的隊列做成鏡像隊列,存在于多個節點來實作高可用(HA)。該模式解決了上述問題,Broker會主動地将消息實體在各鏡像節點間同步,在consumer取資料時無需臨時拉取。
該模式帶來的副作用也很明顯,除了降低系統性能外,如果鏡像隊列數量過多,加之大量的消息進入,叢集内部的網絡帶寬将會被大量消耗。通常地,對可靠性要求較高的場景建議采用鏡像模式。
3. 兩種叢集節點類型
1、RAM Node(記憶體節點):将所有的Virtual Host、Queue、Exchange、Binding、User等的中繼資料存儲在記憶體中,是以性能比較出色。
2、DISC Node(磁盤節點): 将中繼資料存儲在磁盤中。
注:RabbitMQ單節點環境隻允許是磁盤節點,防止重新開機RabbitMQ時丢失系統的配置資訊。RabbitMQ叢集環境至少要有一個磁盤節點,因為當節點加入或者離開叢集時,必須要将該變更通知到至少一個磁盤節點。
4. 搭建普通叢集環境
現在,将以圖7為例來搭建普通叢集環境。
4.1 首先根據上文“RabbitMQ單節點環境的搭建”章節相關内容,準備好以下3個節點:
▲各節點配置資訊(待設定)
4.2 設定各節點的hostname:
vim /etc/hostname
……
reboot
注:其他Linux發行版可能需要通過“vim /etc/sysconfig/network”來修改hostname
4.3 修改各節點的hosts檔案,并添加以下DNS資訊:
vim /etc/hosts
192.168.1.10 rabbit01 rabbit01
192.168.1.20 rabbit02 rabbit02
192.168.1.30 rabbit03 rabbit03
4.4 将各節點的erlang.cookie設定為相同值,比如都使用rabbit01節點的值:
#先備份原cookie檔案
rabbit02# cp /var/lib/rabbitmq/.erlang.cookie /var/lib/rabbitmq/.erlang.cookie.bak
rabbit03# cp /var/lib/rabbitmq/.erlang.cookie /var/lib/rabbitmq/.erlang.cookie.bak
#修改cookie的值
chmod 777 /var/lib/rabbitmq/.erlang.cookie
vim /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
4.5 停止所有節點上的RabbitMQ服務,然後以detached方式獨立運作:
rabbit01# rabbitmqctl stop
rabbit02# rabbitmqctl stop
rabbit03# rabbitmqctl stop
rabbit01# rabbitmq-server -detached
rabbit02# rabbitmq-server -detached
rabbit03# rabbitmq-server -detached
4.6 檢視各節點的叢集資訊:
rabbit01# rabbitmqctl cluster_status
rabbit02# rabbitmqctl cluster_status
rabbit03# rabbitmqctl cluster_status
可以看到,各節點均以單磁盤節點的叢集方式各自運作着。
▲各節點叢集資訊查詢(加入叢集前)
4.7 将rabbit02、rabbit03 作為記憶體節點,加入到rabbit01的叢集中
rabbit02# rabbitmqctl stop_app
rabbit02# rabbitmqctl join_cluster --ram rabbit@rabbit01
rabbit02# rabbitmqctl start_app
rabbit03# rabbitmqctl stop_app
rabbit03# rabbitmqctl join_cluster --ram rabbit@rabbit01
rabbit03# rabbitmqctl start_app
4.8 再次檢視各節點的叢集資訊:
可以看到,各節點處于一個由rabbit01(disc node)、rabbit02(ram node)、rabbit03(ram node)組成的叢集,名為“rabbit@rabbit01”
▲各節點叢集資訊查詢(加入叢集後)
▲從管理頁面看叢集資訊
注:
如果需要将某個節點從叢集中移除,使其變回獨立節點,可以使用以下指令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
如果需要啟停某個節點來進行維護,可以使用以下指令:
rabbitmqctl stop
#FormatImgID_24##FormatImgID_25#rabbitmq-server -detached
當新的節點加入到叢集之後,其使用者資訊也被重置了(之前新增的admin賬戶不見了),需要重新配置管理者使用者,以便通路RabbitMQ管理頁面(在任意節點添加使用者,會自動同步到各個叢集節點):
#添加管理者使用者并授權:
rabbit01# rabbitmqctl add_user admin admin
rabbit01# rabbitmqctl set_user_tags admin administrator
rabbit01# rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
▲重新添加管理者賬戶
5. 搭建高可用叢集環境
通過Policy(政策)設定鏡像隊列,來實作RabbitMQ的高可用方案。政策主要用來控制和修改叢集範圍的某個Virtual Host中Exchange和Queue的行為。具體有以下三種政策類型:
▲政策的3種類型
這裡,我們采用以下政策(在叢集中任意節點啟用政策,政策會自動同步到各個叢集節點):
#同步以"ha."開頭的隊列到叢集中各節點,應用于所有節點(包括新增節點)
#FormatImgID_27##FormatImgID_28#rabbitmqctl set_policy my-ha-all "^ha\." '{"ha-mode":"all",,"ha-sync-mode":"automatic"}'
rabbitmqctl set_policy my-ha-all "^ha\." '{"ha-mode":"all"}'
6. 驗證叢集的高可用性
接下來,基于圖7的叢集環境來驗證叢集中鏡像隊列的高可用性。
6.1 修改生産者程式(Send.java),連接配接到Node1(192.168.1.10)節點,發送消息到鏡像隊列ha.hello中(修改部分如下圖):
▲從Node1發送消息到鏡像隊列
6.2 修改消費者程式(Recv.java),連接配接到Node3(192.168.1.30)節點,接收ha.hello隊列中的消息(修改部分如下圖):
▲從Node3接收鏡像隊列的消息
6.3 首先執行生産者程式(Send.java)發送100條消息,然後通過指令“rabbitmqctl stop”停止Node1的RabbitMQ服務,再執行消費者程式(Recv.java)接收消息。
▲發送消息到ha.hello隊列
▲停止Node1節點
▲ha.hello隊列中消息被成功消費
7. 基于叢集環境的用戶端開發優化建議
上面介紹的Send.java與Recv.java代碼,存在以下兩點不足需要改進:
1、生産者、消費者都隻連接配接到了叢集中的某個節點。如果該節點故障之後,用戶端程式将無法正常發送或接收消息;
2、沒有設定自動重連機制,使得用戶端程式與Broker之間建立的TCP連接配接很脆弱。一旦由于網絡異常導緻Connection關閉,用戶端程式将程式将無法正常接收消息。
基于上述兩個實際用戶端開發的痛點,我們需要對程式進行叢集全節點連接配接、自動重連的改進。改進後的Recv.java完整代碼如下圖19~圖21所示:
首先讓Recv類實作Runnable、Consumer接口,讓Recv執行個體以Consomer線程的方式運作。
▲實作Runnable及Consumer接口
然後在重寫run方法中進行自動重連、連接配接到所有節點的設定
▲實作run方法,在其中設定
然後重寫handleDelivery方法,來設定接收到消息後的處理邏輯;并空實作Consumer接口中其他handleXXX的方法;最後通過main方法以線程的方式建立Consumer的執行個體來接收消息。
▲重寫handleDelivery方法實作消息處理
四、總結