天天看點

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

消息隊列及常見消息隊列介紹

一、消息隊列(MQ)概述

消息隊列(Message Queue),是分布式系統中重要的元件,其通用的使用場景可以簡單地描述為:

當不需要立即獲得結果,但是并發量又需要進行控制的時候,差不多就是需要使用消息隊列的時候。

消息隊列主要解決了應用耦合、異步處理、流量削鋒等問題。

目前使用較多的消息隊列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分資料庫如Redis、Mysql以及phxsql也可實作消息隊列的功能。

二、消息隊列使用場景

消息隊列在實際應用中包括如下四個場景:

  • 應用耦合:多應用間通過消息隊列對同一消息進行處理,避免調用接口失敗導緻整個過程失敗;
  • 異步處理:多應用對消息隊列中同一消息進行處理,應用間并發處理消息,相比串行處理,減少處理時間;
  • 限流削峰:廣泛應用于秒殺或搶購活動中,避免流量過大導緻應用系統挂掉的情況;
  • 消息驅動的系統:系統分為消息隊列、消息生産者、消息消費者,生産者負責産生消息,消費者(可能有多個)負責對消息進行處理;

下面詳細介紹上述四個場景以及消息隊列如何在上述四個場景中使用:

2.1 異步處理

具體場景:使用者為了使用某個應用,進行注冊,系統需要發送注冊郵件并驗證短信。對這兩個操作的處理方式有兩種:串行及并行。

(1)串行方式:新注冊資訊生成後,先發送注冊郵件,再發送驗證短信;

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

在這種方式下,需要最終發送驗證短信後再傳回給用戶端。

(2)并行處理:新注冊資訊寫入後,由發短信和發郵件并行處理;

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

在這種方式下,發短信和發郵件 需處理完成後再傳回給用戶端。

假設以上三個子系統處理的時間均為50ms,且不考慮網絡延遲,則總的處理時間:

串行:50+50+50=150ms

并行:50+50 = 100ms

若使用消息隊列:

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

并在寫入消息隊列後立即傳回成功給用戶端,則總的響應時間依賴于寫入消息隊列的時間,而寫入消息隊列的時間本身是可以很快的,基本可以忽略不計,是以總的處理時間相比串行提高了2倍,相比并行提高了一倍;

2.2 應用耦合

具體場景:使用者使用QQ相冊上傳一張圖檔,人臉識别系統會對該圖檔進行人臉識别,一般的做法是,伺服器接收到圖檔後,圖檔上傳系統立即調用人臉識别系統,調用完成後再傳回成功,如下圖所示:

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

該方法有如下缺點:

  • 人臉識别系統被調失敗,導緻圖檔上傳失敗;
  • 延遲高,需要人臉識别系統處理完成後,再傳回給用戶端,即使使用者并不需要立即知道結果;
  • 圖檔上傳系統與人臉識别系統之間互相調用,需要做耦合;
RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

用戶端上傳圖檔後,圖檔上傳系統将圖檔資訊如uin、批次寫入消息隊列,直接傳回成功;而人臉識别系統則定時從消息隊列中取資料,完成對新增圖檔的識别。

此時圖檔上傳系統并不需要關心人臉識别系統是否對這些圖檔資訊的處理、以及何時對這些圖檔資訊進行處理。事實上,由于使用者并不需要立即知道人臉識别結果,人臉識别系統可以選擇不同的排程政策,按照閑時、忙時、正常時間,對隊列中的圖檔資訊進行處理。

2.3 限流削峰

具體場景:購物網站開展秒殺活動,一般由于瞬時通路量過大,伺服器接收過大,會導緻流量暴增,相關系統無法處理請求甚至崩潰。而加入消息隊列後,系統可以從消息隊列中取資料,相當于消息隊列做了一次緩沖。

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

該方法有如下優點:

  1. 請求先入消息隊列,而不是由業務處理系統直接處理,做了一次緩沖,極大地減少了業務處理系統的壓力;
  2. 隊列長度可以做限制,事實上,秒殺時,後入隊列的使用者無法秒殺到商品,這些請求可以直接被抛棄,傳回活動已結束或商品已售完資訊;

2.4 消息驅動的系統

具體場景:使用者新上傳了一批照片, 人臉識别系統需要對這個使用者的所有照片進行聚類,聚類完成後由對賬系統重新生成使用者的人臉索引(加快查詢)。這三個子系統間由消息隊列連接配接起來,前一個階段的處理結果放入隊列中,後一個階段從隊列中擷取消息繼續處理。

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙
  • 避免了直接調用下一個系統導緻目前系統失敗;
  • 每個子系統對于消息的處理方式可以更為靈活,可以選擇收到消息時就處理,可以選擇定時處理,也可以劃分時間段按不同處理速度處理;

三、消息隊列的兩種模式

消息隊列包括兩種模式,點對點模式(point to point, queue)和釋出/訂閱模式(publish/subscribe,topic)。

3.1 點對點模式

點對點模式下包括三個角色:

  • 消息隊列
  • 發送者 (生産者)
  • 接收者(消費者)
RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

消息發送者生産消息發送到queue中,然後消息接收者從queue中取出并且消費消息。消息被消費以後,queue中不再有存儲,是以消息接收者不可能消費到已經被消費的消息。

點對點模式特點:

  • 每個消息隻有一個接收者(Consumer)(即一旦被消費,消息就不再在消息隊列中);
  • 發送者和接收者間沒有依賴性,發送者發送消息之後,不管有沒有接收者在運作,都不會影響到發送者下次發送消息;
  • 接收者在成功接收消息之後需向隊列應答成功,以便消息隊列删除目前接收的消息;

3.2 釋出/訂閱模式

釋出/訂閱模式下包括三個角色:

  • 角色主題(Topic)
  • 釋出者(Publisher)
  • 訂閱者(Subscriber)
RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

釋出者将消息發送到Topic,系統将這些消息傳遞給多個訂閱者。

釋出/訂閱模式特點:

  • 每個消息可以有多個訂閱者;
  • 釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的消息。
  • 為了消費消息,訂閱者需要提前訂閱該角色主題,并保持線上運作;

四、常用消息隊列介紹

本部分主要介紹四種常用的消息隊列(RabbitMQ/ActiveMQ/RocketMQ/Kafka)的主要特性、優點、缺點。

4.1 RabbitMQ

RabbitMQ 2007年釋出,是一個在AMQP(進階消息隊列協定)基礎上完成的,可複用的企業消息系統,是目前最主流的消息中間件之一。

主要特性:

  1. 可靠性: 提供了多種技術可以讓你在性能和可靠性之間進行權衡。這些技術包括持久性機制、投遞确認、釋出者證明和高可用性機制;
  2. 靈活的路由: 消息在到達隊列前是通過交換機進行路由的。RabbitMQ為典型的路由邏輯提供了多種内置交換機類型。如果你有更複雜的路由需求,可以将這些交換機組合起來使用,你甚至可以實作自己的交換機類型,并且當做RabbitMQ的插件來使用;
  3. 消息叢集:在相同區域網路中的多個RabbitMQ伺服器可以聚合在一起,作為一個獨立的邏輯代理來使用;
  4. 隊列高可用:隊列可以在叢集中的機器上進行鏡像,以確定在硬體問題下還保證消息安全;
  5. 多種協定的支援:支援多種消息隊列協定;
  6. 伺服器端用Erlang語言編寫,支援隻要是你能想到的所有程式設計語言;
  7. 管理界面: RabbitMQ有一個易用的使用者界面,使得使用者可以監控和管理消息Broker的許多方面;
  8. 跟蹤機制:如果消息異常,RabbitMQ提供消息跟蹤機制,使用者可以找出發生了什麼;
  9. 插件機制:提供了許多插件,來從多方面進行擴充,也可以編寫自己的插件;

使用RabbitMQ需要:

  • ErLang語言包
  • RabbitMQ安裝包

RabbitMQ可以運作在Erlang語言所支援的平台之上:

Solaris

BSD

Linux

MacOSX

TRU64

Windows NT/2000/XP/Vista/Windows 7/Windows 8

Windows Server 2003/2008/2012

Windows 95, 98

VxWorks

優點:

  1. 由于erlang語言的特性,mq 性能較好,高并發;
  2. 健壯、穩定、易用、跨平台、支援多種語言、文檔齊全;
  3. 有消息确認機制和持久化機制,可靠性高;
  4. 高度可定制的路由;
  5. 管理界面較豐富,在網際網路公司也有較大規模的應用;
  6. 社群活躍度高;

缺點:

  1. 盡管結合erlang語言本身的并發優勢,性能較好,但是不利于做二次開發和維護;
  2. 實作了代理架構,意味着消息在發送到用戶端之前可以在中央節點上排隊。此特性使得RabbitMQ易于使用和部署,但是使得其運作速度較慢,因為中央節點增加了延遲,消息封裝後也比較大;
  3. 需要學習比較複雜的接口和協定,學習和維護成本較高;

4.2 ActiveMQ

ActiveMQ是由Apache出品,ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實作。它非常快速,支援多種語言的用戶端和協定,而且可以非常容易的嵌入到企業的應用環境中,并有許多進階功能。

  1. 服從 JMS 規範:JMS 規範提供了良好的标準和保證,包括:同步或異步的消息分發,一次和僅一次的消息分發,消息接收和訂閱等等。遵從 JMS 規範的好處在于,不論使用什麼 JMS 實作提供者,這些基礎特性都是可用的;
  2. 連接配接性:ActiveMQ 提供了廣泛的連接配接選項,支援的協定有:HTTP/S,IP 多點傳播,SSL,STOMP,TCP,UDP,XMPP等等。對衆多協定的支援讓 ActiveMQ 擁有了很好的靈活性。
  3. 支援的協定種類多:OpenWire、STOMP、REST、XMPP、AMQP ;
  4. 持久化插件和安全插件:ActiveMQ 提供了多種持久化選擇。而且,ActiveMQ 的安全性也可以完全依據使用者需求進行自定義鑒權和授權;
  5. 支援的用戶端語言種類多:除了 Java 之外,還有:C/C++,.NET,Perl,PHP,Python,Ruby;
  6. 代理叢集:多個 ActiveMQ 代理可以組成一個叢集來提供服務;
  7. 異常簡單的管理:ActiveMQ 是以開發者思維被設計的。是以,它并不需要專門的管理者,因為它提供了簡單又使用的管理特性。有很多中方法可以監控 ActiveMQ 不同層面的資料,包括使用在 JConsole 或者 ActiveMQ 的Web Console 中使用 JMX,通過處理 JMX 的告警消息,通過使用指令行腳本,甚至可以通過監控各種類型的日志。

使用ActiveMQ需要:

  • Java JDK
  • ActiveMQ安裝包

ActiveMQ可以運作在Java語言所支援的平台之上。

  1. 跨平台(JAVA編寫與平台無關有,ActiveMQ幾乎可以運作在任何的JVM上)
  2. 可以用JDBC:可以将資料持久化到資料庫。雖然使用JDBC會降低ActiveMQ的性能,但是資料庫一直都是開發人員最熟悉的存儲媒體。将消息存到資料庫,看得見摸得着。而且公司有專門的DBA去對資料庫進行調優,主從分離;
  3. 支援JMS :支援JMS的統一接口;
  4. 支援自動重連;
  5. 有安全機制:支援基于shiro,jaas等多種安全配置機制,可以對Queue/Topic進行認證和授權。
  6. 監控完善:擁有完善的監控,包括Web Console,JMX,Shell指令行,Jolokia的REST API;
  7. 界面友善:提供的Web Console可以滿足大部分情況,還有很多第三方的元件可以使用,如hawtio;
  8. 社群活躍度不及RabbitMQ高;
  9. 根據其他使用者回報,會出莫名其妙的問題,會丢失消息;
  10. 目前重心放到activemq6.0産品-apollo,對5.x的維護較少;
  11. 不适合用于上千個隊列的應用場景;

4.3 RocketMQ

RocketMQ出自 阿裡公司的開源産品,用 Java 語言實作,在設計時參考了 Kafka,并做出了自己的一些改進,消息可靠性上比 Kafka 更好。RocketMQ在阿裡集團被廣泛應用在訂單,交易,充值,流計算,消息推送,日志流式處理,binglog分發等場景。

  1. 是一個隊列模型的消息中間件,具有高性能、高可靠、高實時、分布式特點;
  2. Producer、Consumer、隊列都可以分布式;
  3. Producer向一些隊列輪流發送消息,隊列集合稱為Topic,Consumer如果做廣播消費,則一個consumer執行個體消費這個Topic對應的所有隊列,如果做叢集消費,則多個Consumer執行個體平均消費這個topic對應的隊列集合;
  4. 能夠保證嚴格的消息順序;
  5. 提供豐富的消息拉取模式;
  6. 高效的訂閱者水準擴充能力;
  7. 實時的消息訂閱機制;
  8. 億級消息堆積能力;
  9. 較少的依賴;

使用RocketMQ需要:

  • 安裝git、Maven
  • RocketMQ安裝包

RocketMQ可以運作在Java語言所支援的平台之上。

  1. 單機支援 1 萬以上持久化隊列
  2. RocketMQ 的所有消息都是持久化的,先寫入系統 PAGECACHE,然後刷盤,可以保證記憶體與磁盤都有一份資料,

    通路時,直接從記憶體讀取。

  3. 模型簡單,接口易用(JMS 的接口很多場合并不太實用);
  4. 性能非常好,可以大量堆積消息在broker中;
  5. 支援多種消費,包括叢集消費、廣播消費等。
  6. 各個環節分布式擴充設計,主從HA;
  7. 開發度較活躍,版本更新很快。

支援的用戶端語言不多,目前是java及c++,其中c++不成熟;

RocketMQ社群關注度及成熟度也不及前兩者;

沒有web管理界面,提供了一個CLI(指令行界面)管理工具帶來查詢、管理和診斷各種問題;

沒有在 mq 核心中去實作JMS等接口;

4.4 Kafka

Apache Kafka是一個分布式消息釋出訂閱系統。它最初由LinkedIn公司基于獨特的設計實作為一個分布式的送出日志系統( a distributed commit log),,之後成為Apache項目的一部分。Kafka系統快速、可擴充并且可持久化。它的分區特性,可複制和可容錯都是其不錯的特性。

  1. 快速持久化,可以在O(1)的系統開銷下進行消息持久化;
  2. 高吞吐,在一台普通的伺服器上既可以達到10W/s的吞吐速率;
  3. .完全的分布式系統,Broker、Producer、Consumer都原生自動支援分布式,自動實作負載均衡;
  4. 支援同步和異步複制兩種HA;
  5. 支援資料批量發送和拉取;
  6. zero-copy:減少IO操作步驟;
  7. 資料遷移、擴容對使用者透明;
  8. 無需停機即可擴充機器;
  9. 其他特性:嚴格的消息順序、豐富的消息拉取模型、高效訂閱者水準擴充、實時的消息訂閱、億級的消息堆積能力、定期删除機制;

使用Kafka需要:

  • Kafka安裝包
  1. 用戶端語言豐富,支援java、.net、php、ruby、python、go等多種語言;
  2. 性能卓越,單機寫入TPS約在百萬條/秒,消息大小10個位元組;
  3. 提供完全分布式架構, 并有replica機制, 擁有較高的可用性和可靠性, 理論上支援消息無限堆積;
  4. 支援批量操作;
  5. 消費者采用Pull方式擷取消息, 消息有序, 通過控制能夠保證所有消息被消費且僅被消費一次;
  6. 有優秀的第三方Kafka Web管理界面Kafka-Manager;
  7. 在日志領域比較成熟,被多家公司和多個開源項目使用;
  1. Kafka單機超過64個隊列/分區,Load會發生明顯的飙高現象,隊列越多,load越高,發送消息響應時間變長
  2. 使用短輪詢方式,實時性取決于輪詢間隔時間;
  3. 消費失敗不支援重試;
  4. 支援消息順序,但是一台代理當機後,就會産生消息亂序;
  5. 社群更新較慢;

4.5 RabbitMQ/ActiveMQ/RocketMQ/Kafka對比

這裡列舉了上述四種消息隊列的差異對比:

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

結論:

Kafka在于分布式架構,RabbitMQ基于AMQP協定來實作,RocketMQ/思路來源于kafka,改成了主從結構,在事務性可靠性方面做了優化。廣泛來說,電商、金融等對事務性要求很高的,可以考慮RabbitMQ和RocketMQ,對性能要求高的可考慮Kafka。

五、參考資料: 5.1 消息隊列:

  1. 大型網站架構之分布式消息隊列
  2. 消息隊列的使用場景
  3. 淺談異步消息隊列模型
  4. 消息隊列的兩種模式

5.2 RabbitMQ

  1. RabbitMQ首頁
  2. RabbitMQ學習教程
  3. 專欄:RabbitMQ從入門到精通
  4. RabbitMQ能為你做些什麼
  5. RabbitMQ指南(1)-特性及功能

5.3 ActiveMQ

  1. ActiveMQ首頁
  2. Apache ActiveMQ介紹
  3. ActiveMQ的簡介與安裝
  4. ActiveMQ 和消息簡介

總結:

消息隊列利用高效可靠的消息傳遞機制進行平台無關的資料交流,并基于資料通信來進行分布式系統的內建。目前業界有很多的MQ産品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用資料庫redis充當消息隊列的案例。而這些消息隊列産品,各有側重,在實際選型時,需要結合自身需求及MQ産品特征,綜合考慮。

RabbitMQ的應用場景以及基本原理介紹

1.背景

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實作。

2.應用場景

2.1異步處理

場景說明:使用者注冊後,需要發注冊郵件和注冊短信,傳統的做法有兩種1.串行的方式;2.并行的方式 

(1)串行方式:将注冊資訊寫入資料庫後,發送注冊郵件,再發送注冊短信,以上三個任務全部完成後才傳回給用戶端。 這有一個問題是,郵件,短信并不是必須的,它隻是一個通知,而這種做法讓用戶端等待沒有必要等待的東西. 

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

(2)并行方式:将注冊資訊寫入資料庫後,發送郵件的同時,發送短信,以上三個任務完成後,傳回給用戶端,并行的方式能提高處理的時間。 

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

假設三個業務節點分别使用50ms,串行方式使用時間150ms,并行使用時間100ms。雖然并性已經提高的處理時間,但是,前面說過,郵件和短信對我正常的使用網站沒有任何影響,用戶端沒有必要等着其發送完成才顯示注冊成功,英愛是寫入資料庫後就傳回. 

(3)消息隊列 

引入消息隊列後,把發送郵件,短信不是必須的業務邏輯異步處理 

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

由此可以看出,引入消息隊列後,使用者的響應時間就等于寫入資料庫的時間+寫入消息隊列的時間(可以忽略不計),引入消息隊列後處理後,響應時間是串行的3倍,是并行的2倍。

2.2 應用解耦

場景:雙11是購物狂節,使用者下單後,訂單系統需要通知庫存系統,傳統的做法就是訂單系統調用庫存系統的接口. 

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

這種做法有一個缺點:

  • 當庫存系統出現故障時,訂單就會失敗。(這樣馬雲将少賺好多好多錢^ ^)
  • 訂單系統和庫存系統高耦合. 

    引入消息隊列 

    RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙
  • 訂單系統:使用者下單後,訂單系統完成持久化處理,将消息寫入消息隊列,傳回使用者訂單下單成功。
  • 庫存系統:訂閱下單的消息,擷取下單消息,進行庫操作。 

    就算庫存系統出現故障,消息隊列也能保證消息的可靠投遞,不會導緻消息丢失(馬雲這下高興了).

流量削峰

流量削峰一般在秒殺活動中應用廣泛 

場景:秒殺活動,一般會因為流量過大,導緻應用挂掉,為了解決這個問題,一般在應用前端加入消息隊列。 

作用: 

1.可以控制活動人數,超過此一定閥值的訂單直接丢棄(我為什麼秒殺一次都沒有成功過呢^^) 

2.可以緩解短時間的高流量壓垮應用(應用程式按自己的最大處理能力擷取訂單) 

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

1.使用者的請求,伺服器收到之後,首先寫入消息隊列,加入消息隊列長度超過最大值,則直接抛棄使用者請求或跳轉到錯誤頁面. 

2.秒殺業務根據消息隊列中的請求資訊,再做後續處理.

3.系統架構

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

幾個概念說明: 

Broker:它提供一種傳輸服務,它的角色就是維護一條從生産者到消費者的路線,保證資料能按照指定的方式進行傳輸, 

Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。 

Queue:消息的載體,每個消息都會被投到一個或多個隊列。 

Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來. 

Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。 

vhost:虛拟主機,一個broker裡可以有多個vhost,用作不同使用者的權限分離。 

Producer:消息生産者,就是投遞消息的程式. 

Consumer:消息消費者,就是接受消息的程式. 

Channel:消息通道,在用戶端的每個連接配接裡,可建立多個channel.

4.任務分發機制

4.1Round-robin dispathching循環分發

RabbbitMQ的分發機制非常适合擴充,而且它是專門為并發程式設計的,如果現在load加重,那麼隻需要建立更多的Consumer來進行任務處理。

4.2Message acknowledgment消息确認

為了保證資料不被丢失,RabbitMQ支援消息确認機制,為了保證資料能被正确處理而不僅僅是被Consumer收到,那麼我們不能采用no-ack,而應該是在處理完資料之後發送ack. 

在處理完資料之後發送ack,就是告訴RabbitMQ資料已經被接收,處理完成,RabbitMQ可以安全的删除它了. 

如果Consumer退出了但是沒有發送ack,那麼RabbitMQ就會把這個Message發送到下一個Consumer,這樣就保證在Consumer異常退出情況下資料也不會丢失. 

RabbitMQ它沒有用到逾時機制.RabbitMQ僅僅通過Consumer的連接配接中斷來确認該Message并沒有正确處理,也就是說RabbitMQ給了Consumer足夠長的時間做資料處理。 

如果忘記ack,那麼當Consumer退出時,Mesage會重新分發,然後RabbitMQ會占用越來越多的記憶體.

5.Message durability消息持久化

要持久化隊列queue的持久化需要在聲明時指定durable=True; 

這裡要注意,隊列的名字一定要是Broker中不存在的,不然不能改變此隊列的任何屬性. 

隊列和交換機有一個建立時候指定的标志durable,durable的唯一含義就是具有這個标志的隊列和交換機會在重新開機之後重建立立,它不表示說在隊列中的消息會在重新開機後恢複 

消息持久化包括3部分 

1. exchange持久化,在聲明時指定durable => true

hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的      
  • 1

2.queue持久化,在聲明時指定durable => true

channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久化的      

3.消息持久化,在投遞時指定delivery_mode => 2(1是非持久化).

channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());        

如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的,如果exchange和queue兩者之間有一個持久化,一個非持久化,則不允許建立綁定. 

注意:一旦建立了隊列和交換機,就不能修改其标志了,例如,建立了一個non-durable的隊列,然後想把它改變成durable的,唯一的辦法就是删除這個隊列然後重制建立。

6.Fair dispath 公平分發

你可能也注意到了,分發機制不是那麼優雅,預設狀态下,RabbitMQ将第n個Message分發給第n個Consumer。n是取餘後的,它不管Consumer是否還有unacked Message,隻是按照這個預設的機制進行分發. 

那麼如果有個Consumer工作比較重,那麼就會導緻有的Consumer基本沒事可做,有的Consumer卻毫無休息的機會,那麼,Rabbit是如何處理這種問題呢? 

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

通過basic.qos方法設定prefetch_count=1,這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message,換句話說,在接收到該Consumer的ack前,它不會将新的Message分發給它

channel.basic_qos(prefetch_count=1)       

注意,這種方法可能會導緻queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者建立更多的virtualHost來細化你的設計。

7.分發到多個Consumer

7.1Exchange

先來溫習以下交換機路由的幾種類型: 

Direct Exchange:直接比對,通過Exchange名稱+RountingKey來發送與接收消息. 

Fanout Exchange:廣播訂閱,向所有的消費者釋出消息,但是隻有消費者将隊列綁定到該路由器才能收到消息,忽略Routing Key. 

Topic Exchange:主題比對訂閱,這裡的主題指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.來分隔多個詞,隻有消息這将隊列綁定到該路由器且指定RoutingKey符合比對規則時才能收到消息; 

Headers Exchange:消息頭訂閱,消息釋出前,為消息定義一個或多個鍵值對的消息頭,然後消費者接收消息同時需要定義類似的鍵值對請求頭:(如:x-mactch=all或者x_match=any),隻有請求頭與消息頭比對,才能接收消息,忽略RoutingKey. 

預設的exchange:如果用空字元串去聲明一個exchange,那麼系統就會使用”amq.direct”這個exchange,我們建立一個queue時,預設的都會有一個和建立queue同名的routingKey綁定到這個預設的exchange上去

channel.BasicPublish("", "TaskQueue", properties, bytes);      

因為在第一個參數選擇了預設的exchange,而我們申明的隊列叫TaskQueue,是以預設的,它在建立一個也叫TaskQueue的routingKey,并綁定在預設的exchange上,導緻了我們可以在第二個參數routingKey中寫TaskQueue,這樣它就會找到定義的同名的queue,并把消息放進去。 

如果有兩個接收程式都是用了同一個的queue和相同的routingKey去綁定direct exchange的話,分發的行為是負載均衡的,也就是說第一個是程式1收到,第二個是程式2收到,以此類推。 

如果有兩個接收程式用了各自的queue,但使用相同的routingKey去綁定direct exchange的話,分發的行為是複制的,也就是說每個程式都會收到這個消息的副本。行為相當于fanout類型的exchange。 

下面詳細來說:

7.2 Bindings 綁定

綁定其實就是關聯了exchange和queue,或者這麼說:queue對exchange的内容感興趣,exchange要把它的Message deliver到queue。

7.3Direct exchange

Driect exchange的路由算法非常簡單:通過bindingkey的完全比對,可以用下圖來說明. 

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

Exchange和兩個隊列綁定在一起,Q1的bindingkey是orange,Q2的binding key是black和green. 

當Producer publish key是orange時,exchange會把它放到Q1上,如果是black或green就會到Q2上,其餘的Message被丢棄.

7.4 Multiple bindings

多個queue綁定同一個key也是可以的,對于下圖的例子,Q1和Q2都綁定了black,對于routing key是black的Message,會被deliver到Q1和Q2,其餘的Message都會被丢棄. 

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

7.5 Topic exchange

對于Message的routing_key是有限制的,不能使任意的。格式是以點号“.”分割的字元表。比如:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。你可以放任意的key在routing_key中,當然最長不能超過255 bytes。 

對于routing_key,有兩個特殊字元

  • *(星号)代表任意一個單詞
  • #(hash)0個或多個單詞 Producer發送消息時需要設定routing_key,routing_key包含三個單詞和連個點号o,第一個key描述了celerity(靈巧),第二個是color(色彩),第三個是物種: 在這裡我們建立了兩個綁定: Q1 的binding key 是”.orange.“; Q2 是 “..rabbit” 和 “lazy.#”:
  • Q1感興趣所有orange顔色的動物
  • Q2感興趣所有rabbits和所有的lazy的. 

    例子:rounting_key 為 “quick.orange.rabbit”将會發送到Q1和Q2中 

    rounting_key 為”lazy.orange.rabbit.hujj.ddd”會被投遞到Q2中,#比對0個或多個單詞。

8.消息序列化

RabbitMQ使用ProtoBuf序列化消息,它可作為RabbitMQ的Message的資料格式進行傳輸,由于是結構化的資料,這樣就極大的友善了Consumer的資料高效處理,當然也可以使用XML,與XML相比,ProtoBuf有以下優勢: 

1.簡單 

2.size小了3-10倍 

3.速度快了20-100倍 

4.易于程式設計 

6.減少了語義的歧義. 

,ProtoBuf具有速度和空間的優勢,使得它現在應用非常廣泛

RabbitMQ基礎知識詳解

什麼是MQ?

       MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程式對應用程式的通信方法。MQ是消費-生産者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取隊列中的消息。

      RabbitMQ是MQ的一種。下面詳細介紹一下RabbitMQ的基本概念。

1、隊列、生産者、消費者

      隊列是RabbitMQ的内部對象,用于存儲消息。生産者(下圖中的P)生産消息并投遞到隊列中,消費者(下圖中的C)可以從隊列中擷取消息并消費。

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

      多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

2、Exchange、Binding

      剛才我們看到生産者将消息投遞到隊列中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生産者将消息發送到Exchange(交換器,下圖中的X),再通過Binding将Exchange與Queue關聯起來。

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

3、Exchange Type、Bingding key、routing key

      在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key。在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。

      生産者在将消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,生産者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪裡。

      RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。

      fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。

      direct:把消息投遞到那些binding key與routing key完全比對的隊列中。

      topic:将消息路由到binding key與routing key模式比對的隊列中。

      附上一張RabbitMQ的結構圖:

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

最後來具體解析一下幾個問題:

1、可以自動建立隊列,也可以手動建立隊列,如果自動建立隊列,那麼是誰負責建立隊列呢?是生産者?還是消費者? 

      如果隊列不存在,當然消費者不會收到任何的消息。但是如果隊列不存在,那麼生産者發送的消息就會丢失。是以,為了資料不丢失,消費者和生産者都可以建立隊列。那麼如果建立一個已經存在的隊列呢?那麼不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次建立如果參數和第一次不一樣,那麼該操作雖然成功,但是隊列屬性并不會改變。

      隊列對于負載均衡的處理是完美的。對于多個消費者來說,RabbitMQ使用輪詢的方式均衡的發送給不同的消費者。

2、RabbitMQ的消息确認機制

      預設情況下,如果消息已經被某個消費者正确的接收到了,那麼該消息就會被從隊列中移除。當然也可以讓同一個消息發送到很多的消費者。

      如果一個隊列沒有消費者,那麼,如果這個隊列有資料到達,那麼這個資料會被緩存,不會被丢棄。當有消費者時,這個資料會被立即發送到這個消費者,這個資料被消費者正确收到時,這個資料就被從隊列中删除。

     那麼什麼是正确收到呢?通過ack。每個消息都要被acknowledged(确認,ack)。我們可以顯示的在程式中去ack,也可以自動的ack。如果有資料沒有被ack,那麼:

     RabbitMQ Server會把這個資訊發送到下一個消費者。

     如果這個app有bug,忘記了ack,那麼RabbitMQServer不會再發送資料給它,因為Server認為這個消費者處理能力有限。

    而且ack的機制可以起到限流的作用(Benefitto throttling):在消費者處理完成資料後發送ack,甚至在額外的延時後發送ack,将有效的均衡消費者的負載。

RabbitMQ布曙

2017.05.06 16:03* 字數 4884 閱讀 34889評論 13喜歡 160

關于消息隊列,從前年開始斷斷續續看了些資料,想寫很久了,但一直沒騰出空,近來分别碰到幾個朋友聊這塊的技術選型,是時候把這塊的知識整理記錄一下了。

市面上的消息隊列産品有很多,比如老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,還有 ZeroMQ ,去年底阿裡巴巴捐贈給 Apache 的 RocketMQ ,連 redis 這樣的 NoSQL 資料庫也支援 MQ 功能。總之這塊知名的産品就有十幾種,就我自己的使用經驗和興趣隻打算談談 RabbitMQ、Kafka 和 ActiveMQ ,本文先講 RabbitMQ ,在此之前先看下消息隊列的相關概念。

什麼叫消息隊列

消息(Message)是指在應用間傳送的資料。消息可以非常簡單,比如隻包含文本字元串,也可以更複雜,可能包含嵌入對象。

消息隊列(Message Queue)是一種應用間的通信方式,消息發送後可以立即傳回,由消息系統來確定消息的可靠傳遞。消息釋出者隻管把消息釋出到 MQ 中而不用管誰來取,消息使用者隻管從 MQ 中取消息而不管是誰釋出的。這樣釋出者和使用者都不用知道對方的存在。

為何用消息隊列

從上面的描述中可以看出消息隊列是一種應用間的異步協作機制,那什麼時候需要使用 MQ 呢?

以常見的訂單系統為例,使用者點選【下單】按鈕之後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一起同步執行,随着業務的發展訂單量增長,需要提升系統服務的性能,這時可以将一些不需要立即生效的操作拆分出來異步執行,比如發放紅包、發短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之後發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。

以上是用于業務解耦的情況,其它常見場景包括最終一緻性、廣播、錯峰流控等等。

RabbitMQ 特點

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實作。

AMQP :Advanced Message Queue,進階消息隊列協定。它是應用層協定的一個開放标準,為面向消息的中間件設計,基于此協定的用戶端與消息中間件可傳遞消息,并不受産品、開發語言等條件的限制。

RabbitMQ 最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴充性、高可用性等方面表現不俗。具體特點包括:

  1. 可靠性(Reliability)

    RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸确認、釋出确認。

  2. 靈活的路由(Flexible Routing)

    在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經提供了一些内置的 Exchange 來實作。針對更複雜的路由功能,可以将多個 Exchange 綁定在一起,也通過插件機制實作自己的 Exchange 。

  3. 消息叢集(Clustering)

    多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。

  4. 高可用(Highly Available Queues)

    隊列可以在叢集中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。

  5. 多種協定(Multi-protocol)

    RabbitMQ 支援多種消息隊列協定,比如 STOMP、MQTT 等等。

  6. 多語言用戶端(Many Clients)

    RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby 等等。

  7. 管理界面(Management UI)

    RabbitMQ 提供了一個易用的使用者界面,使得使用者可以監控和管理消息 Broker 的許多方面。

  8. 跟蹤機制(Tracing)

    如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什麼。

  9. 插件機制(Plugin System)

    RabbitMQ 提供了許多插件,來從多方面進行擴充,也可以編寫自己的插件。

RabbitMQ 中的概念模型

消息模型

所有 MQ 産品從模型抽象上來說都是一樣的過程:

消費者(consumer)訂閱某個隊列。生産者(producer)建立消息,然後釋出到隊列(queue)中,最後将消息發送到監聽的消費者。

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

消息流

RabbitMQ 基本概念

上面隻是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念需要解釋。上面介紹過 RabbitMQ 是 AMQP 協定的一個開源實作,是以其内部實際上也是 AMQP 中的基本概念:

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

RabbitMQ 内部結構

  1. Message

    消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。

  2. Publisher

    消息的生産者,也是一個向交換器釋出消息的用戶端應用程式。

  3. Exchange

    交換器,用來接收生産者發送的消息并将這些消息路由給伺服器中的隊列。

  4. Binding

    綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵将交換器和消息隊列連接配接起來的路由規則,是以可以将交換器了解成一個由綁定構成的路由表。

  5. Queue

    消息隊列,用來儲存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裡面,等待消費者連接配接到這個隊列将其取走。

  6. Connection

    網絡連接配接,比如一個TCP連接配接。

  7. Channel

    信道,多路複用連接配接中的一條獨立的雙向資料流通道。信道是建立在真實的TCP連接配接内地虛拟連接配接,AMQP 指令都是通過信道發出去的,不管是釋出消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于作業系統來說建立和銷毀 TCP 都是非常昂貴的開銷,是以引入了信道的概念,以複用一條 TCP 連接配接。

  8. Consumer

    消息的消費者,表示一個從消息隊列中取得消息的用戶端應用程式。

  9. Virtual Host

    虛拟主機,表示一批交換器、消息隊列和相關對象。虛拟主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接配接時指定,RabbitMQ 預設的 vhost 是 / 。

  10. Broker

    表示消息隊列伺服器實體。

AMQP 中的消息路由

AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生産者把消息釋出到 Exchange 上,消息最終到達隊列并被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

AMQP 的消息路由過程

Exchange 類型

Exchange分發消息時根據類型的不同分發政策有差別,目前共四種類型:direct、fanout、topic、headers 。headers 比對 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一緻,但性能差很多,目前幾乎用不到了,是以直接看另外三種類型:

  1. direct
    RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

    direct 交換器

    消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一緻, 交換器就将消息發到對應的隊列中。路由鍵與隊列名完全比對,如果一個隊列綁定到交換機要求路由鍵為“dog”,則隻轉發 routing key 标記為“dog”的消息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全比對、單點傳播的模式。

  2. fanout
    RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

    fanout 交換器

    每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,隻是簡單的将隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每台子網内的主機都獲得了一份複制的消息。fanout 類型轉發消息是最快的。

  3. topic
    RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

    topic 交換器

    topic 交換器通過模式比對配置設定消息的路由鍵屬性,将路由鍵和某個模式進行比對,此時隊列需要綁定到一個模式上。它将路由鍵和綁定鍵的字元串切分成單詞,這些單詞之間用點隔開。它同樣也會識别兩個通配符:符号“#”和符号“”。#比對0個或多個單詞,比對不多不少一個單詞。

RabbitMQ 安裝

一般來說安裝 RabbitMQ 之前要安裝 Erlang ,可以去​​Erlang官網​​下載下傳。接着去​​RabbitMQ官網​​下載下傳安裝包,之後解壓縮即可。根據作業系統不同官網提供了相應的安裝說明:​​Windows​​、​​Debian / Ubuntu​​、​​RPM-based Linux​​、​​Mac​​

如果是Mac 使用者,個人推薦使用 HomeBrew 來安裝,安裝前要先更新 brew:

brew update
      

接着安裝 rabbitmq 伺服器:

brew install rabbitmq
      

這樣 RabbitMQ 就安裝好了,安裝過程中會自動其所依賴的 Erlang 。

RabbitMQ 運作和管理

  1. 啟動

    啟動很簡單,找到安裝後的 RabbitMQ 所在目錄下的 sbin 目錄,可以看到該目錄下有6個以 rabbitmq 開頭的可執行檔案,直接執行 rabbitmq-server 即可,下面将 RabbitMQ 的安裝位置以 . 代替,啟動指令就是:

./sbin/rabbitmq-server
      

啟動正常的話會看到一些啟動過程資訊和最後的 completed with 7 plugins,這也說明啟動的時候預設加載了7個插件。

RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙

正常啟動

  1. 背景啟動

    如果想讓 RabbitMQ 以守護程式的方式在背景運作,可以在啟動的時候加上 -detached 參數:

./sbin/rabbitmq-server -detached
      
  1. 查詢伺服器狀态

    sbin 目錄下有個特别重要的檔案叫 rabbitmqctl ,它提供了 RabbitMQ 管理需要的幾乎一站式解決方案,絕大部分的運維指令它都可以提供。

    查詢 RabbitMQ 伺服器的狀态資訊可以用參數 status :

./sbin/rabbitmqctl status
      

該指令将輸出伺服器的很多資訊,比如 RabbitMQ 和 Erlang 的版本、OS 名稱、記憶體等等

  1. 關閉 RabbitMQ 節點

    我們知道 RabbitMQ 是用 Erlang 語言寫的,在Erlang 中有兩個概念:節點和應用程式。節點就是 Erlang 虛拟機的每個執行個體,而多個 Erlang 應用程式可以運作在同一個節點之上。節點之間可以進行本地通信(不管他們是不是運作在同一台伺服器之上)。比如一個運作在節點A上的應用程式可以調用節點B上應用程式的方法,就好像調用本地函數一樣。如果應用程式由于某些原因奔潰,Erlang 節點會自動嘗試重新開機應用程式。

    如果要關閉整個 RabbitMQ 節點可以用參數 stop :

./sbin/rabbitmqctl stop
      

它會和本地節點通信并訓示其幹淨的關閉,也可以指定關閉不同的節點,包括遠端節點,隻需要傳入參數 -n :

./sbin/rabbitmqctl -n [email protected] stop 
      

-n node 預設 node 名稱是 rabbit@server ,如果你的主機名是 ​​server.example.com​​ ,那麼 node 名稱就是 ​​[email protected]​​ 。

  1. 關閉 RabbitMQ 應用程式

    如果隻想關閉應用程式,同時保持 Erlang 節點運作則可以用 stop_app:

./sbin/rabbitmqctl stop_app
      

這個指令在後面要講的叢集模式中将會很有用。

  1. 啟動 RabbitMQ 應用程式
./sbin/rabbitmqctl start_app
      
  1. 重置 RabbitMQ 節點
./sbin/rabbitmqctl reset
      

該指令将清除所有的隊列。

  1. 檢視已聲明的隊列
./sbin/rabbitmqctl list_queues
      
  1. 檢視交換器
./sbin/rabbitmqctl list_exchanges
      

該指令還可以附加參數,比如列出交換器的名稱、類型、是否持久化、是否自動删除:

./sbin/rabbitmqctl list_exchanges name type durable auto_delete
      
  1. 檢視綁定
./sbin/rabbitmqctl list_bindings
      

Java 用戶端通路

RabbitMQ 支援多種語言通路,以 Java 為例看下一般使用 RabbitMQ 的步驟。

  1. maven工程的pom檔案中添加依賴
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>
      
  1. 消息生産者
package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        //設定 RabbitMQ 位址
        factory.setHost("localhost");
        //建立到代理伺服器到連接配接
        Connection conn = factory.newConnection();
        //獲得信道
        Channel channel = conn.createChannel();
        //聲明交換器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);

        String routingKey = "hola";
        //釋出消息
        byte[] messageBodyBytes = "quit".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();
        conn.close();
    }
}
      
  1. 消息消費者
package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        //建立到代理伺服器到連接配接
        Connection conn = factory.newConnection();
        //獲得信道
        final Channel channel = conn.createChannel();
        //聲明交換器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        //聲明隊列
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "hola";
        //綁定隊列,通過鍵 hola 将隊列和交換器綁定起來
        channel.queueBind(queueName, exchangeName, routingKey);

        while(true) {
            //消費消息
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    System.out.println("消費的路由鍵:" + routingKey);
                    System.out.println("消費的内容類型:" + contentType);
                    long deliveryTag = envelope.getDeliveryTag();
                    //确認消息
                    channel.basicAck(deliveryTag, false);
                    System.out.println("消費的消息體内容:");
                    String bodyStr = new String(body, "UTF-8");
                    System.out.println(bodyStr);

                }
            });
        }
    }
}
      
  1. 啟動 RabbitMQ 伺服器
./sbin/rabbitmq-server
      
  1. 運作 Consumer

    先運作 Consumer ,這樣當生産者發送消息的時候能在消費者後端看到消息記錄。

  2. 運作 Producer

    接着運作 Producer ,釋出一條消息,在 Consumer 的控制台能看到接收的消息:

    RabbitMQ,Apache的ActiveMQ,阿裡RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可實作消息隊列,RabbitMQ的應用場景以及基本原理介紹,RabbitMQ基礎知識詳解,RabbitMQ布曙
    Consumer 控制台

RabbitMQ 叢集

RabbitMQ 最優秀的功能之一就是内建叢集,這個功能設計的目的是允許消費者和生産者在節點崩潰的情況下繼續運作,以及通過添加更多的節點來線性擴充消息通信吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通信架構 OTP 來滿足上述需求,使用戶端在失去一個 RabbitMQ 節點連接配接的情況下,還是能夠重新連接配接到叢集中的任何其他節點繼續生産、消費消息。

RabbitMQ 叢集中的一些概念

RabbitMQ 會始終記錄以下四種類型的内部中繼資料:

  1. 隊列中繼資料

    包括隊列名稱和它們的屬性,比如是否可持久化,是否自動删除

  2. 交換器中繼資料

    交換器名稱、類型、屬性

  3. 綁定中繼資料

    内部是一張表格記錄如何将消息路由到隊列

  4. vhost 中繼資料

    為 vhost 内部的隊列、交換器、綁定提供命名空間和安全屬性

在單一節點中,RabbitMQ 會将所有這些資訊存儲在記憶體中,同時将标記為可持久化的隊列、交換器、綁定存儲到硬碟上。存到硬碟上可以確定隊列和交換器在節點重新開機後能夠重建。而在叢集模式下同樣也提供兩種選擇:存到硬碟上(獨立節點的預設設定),存在記憶體中。

如果在叢集中建立隊列,叢集隻會在單個節點而不是所有節點上建立完整的隊列資訊(中繼資料、狀态、内容)。結果是隻有隊列的所有者節點知道有關隊列的所有資訊,是以當叢集節點崩潰時,該節點的隊列和綁定就消失了,并且任何比對該隊列的綁定的新消息也丢失了。還好RabbitMQ 2.6.0之後提供了鏡像隊列以避免叢集節點故障導緻的隊列内容不可用。

RabbitMQ 叢集中可以共享 user、vhost、exchange等,所有的資料和狀态都是必須在所有節點上複制的,例外就是上面所說的消息隊列。RabbitMQ 節點可以動态的加入到叢集中。

當在叢集中聲明隊列、交換器、綁定的時候,這些操作會直到所有叢集節點都成功送出中繼資料變更後才傳回。叢集中有記憶體節點和磁盤節點兩種類型,記憶體節點雖然不寫入磁盤,但是它的執行比磁盤節點要好。記憶體節點可以提供出色的性能,磁盤節點能保障配置資訊在節點重新開機後仍然可用,那叢集中如何平衡這兩者呢?

RabbitMQ 隻要求叢集中至少有一個磁盤節點,所有其他節點可以是記憶體節點,當節點加入火離開叢集時,它們必須要将該變更通知到至少一個磁盤節點。如果隻有一個磁盤節點,剛好又是該節點崩潰了,那麼叢集可以繼續路由消息,但不能建立隊列、建立交換器、建立綁定、添加使用者、更改權限、添加或删除叢集節點。換句話說叢集中的唯一磁盤節點崩潰的話,叢集仍然可以運作,但知道該節點恢複,否則無法更改任何東西。

RabbitMQ 叢集配置和啟動

如果是在一台機器上同時啟動多個 RabbitMQ 節點來組建叢集的話,隻用上面介紹的方式啟動第二、第三個節點将會因為節點名稱和端口沖突導緻啟動失敗。是以在每次調用 rabbitmq-server 指令前,設定環境變量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 來明确指定唯一的節點名稱和端口。下面的例子端口号從5672開始,每個新啟動的節點都加1,節點也分别命名為test_rabbit_1、test_rabbit_2、test_rabbit_3。

啟動第1個節點:

RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
      

啟動第2個節點:

RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached
      

啟動第2個節點前建議将 RabbitMQ 預設激活的插件關掉,否則會存在使用了某個插件的端口号沖突,導緻節點啟動不成功。

現在第2個節點和第1個節點都是獨立節點,它們并不知道其他節點的存在。叢集中除第一個節點外後加入的節點需要擷取叢集中的中繼資料,是以要先停止 Erlang 節點上運作的 RabbitMQ 應用程式,并重置該節點中繼資料,再加入并且擷取叢集的中繼資料,最後重新啟動 RabbitMQ 應用程式。

停止第2個節點的應用程式:

./sbin/rabbitmqctl -n test_rabbit_2 stop_app
      

重置第2個節點中繼資料:

./sbin/rabbitmqctl -n test_rabbit_2 reset
      

第2節點加入第1個節點組成的叢集:

./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
      

啟動第2個節點的應用程式

./sbin/rabbitmqctl -n test_rabbit_2 start_app
      

第3個節點的配置過程和第2個節點類似:

RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached

./sbin/rabbitmqctl -n test_rabbit_3 stop_app

./sbin/rabbitmqctl -n test_rabbit_3 reset

./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost

./sbin/rabbitmqctl -n test_rabbit_3 start_app
      
RabbitMQ 叢集運維

停止某個指定的節點,比如停止第2個節點:

RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop
      

檢視節點3的叢集狀态:

./sbin/rabbitmqctl -n test_rabbit_3 cluster_status
      

繼續閱讀