天天看點

消息中間件RabbitMQ簡介

消息隊列

消息(Message)是指在應用間傳送的資料,帶有某種資訊的信号。消息機制的三大要點:消息隊列、消息循環(分發)、消息處理。

消息隊列(Message Queue,MQ)是一種應用間的通信方式,消息發送後可以立即傳回,由消息系統來確定消息的可靠傳遞。消息釋出者隻管把消息釋出到MQ中,消息消費者隻管從MQ中取消息。釋出者和消費者沒有同步接口調用邏輯,即所謂解耦。

作用 & 優勢:

業務解耦、異步(将消息寫入消息隊列,非必要的業務邏輯以異步的方式運作,加快響應速度)、削峰填谷(平滑消費消息)、流量控制、廣播消息、最終一緻性。

缺點:

  • 降低系統可用性,增加系統元件;
  • 增加系統複雜,比如:叢集部署時的一緻性問題、如何保證消息不被重複消費,如何保證保證消息可靠傳輸;

消息隊列的架構圖

消息隊列從邏輯角度而言,可以分為四大子產品:生産者子產品、存儲子產品、消費者子產品和配置管理子產品,每個子產品均可以叢集方式部署。

生産者子產品:主要負責接收上遊系統發送過來的資料。解決如何處理大量的連接配接請求,并且快速的處理發送過來的資料,将資料以适當的形式進行組裝,然後發送給存儲子產品。

存儲子產品:主要負責存儲生産者子產品發送過來的資料,然後将存儲的資料傳遞給消費者子產品。

該子產品需要解決如何序列化資料并存儲,以及如何讀取序列化的資料,該子產品的性能對消息隊列整體的性能影響較大。一方面,如果序列化及存儲性能很高,那麼接收資料的性能也會很高。另一方面,如果讀取序列化資料的性能很高,那麼發送的性能也會很高,是以,該子產品決定消息隊列的服務能力。

消費者子產品:主要負責消費存儲子產品的資料。該子產品主要解決的是消費邏輯,比如:是廣播消費還是叢集消費等,也可以簡單了解是廣播還是單點傳播。

Config Server:主要作為注冊中心,存儲元件的中繼資料,以及維護發送和消費關系等。

如果進一步将各個子產品分解的話,可以發現每一個子產品都包含若幹子子產品。現分解如下:

生産者子產品:主要有兩個部分構成。一個是接收上遊的資料;另一個是協定處理,負責将接收到的資料根據約定的協定進行解析,然後傳遞給存儲子產品。

存儲子產品:主要包括,資料接收處理、資料辨別生成、資料寫入和資料讀取。該部分主要解決資料如何序列化及如何存儲。

消費者子產品:主要解決消費的邏輯處理,如:廣播模式、叢集模式和分組管理等。

消息中間件

消息中間件,Message Oriented Middleware,MOM,已知開源的有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ等,基本上大同小異,但是各自有着不同的應用場景和特點;Kafka注重的是消息的吞吐量,不保證消息存儲的可靠性以及一緻性,是以多用于日志系統資料的上報; RabbitMQ能保證消息可靠存儲投遞。

RabbitMQ

RabbitMQ,Erlang 語言開發的 AMQP 的開源實作。

AMQP :Advanced Message Queue,進階消息隊列協定。它是應用層協定的一個開放标準,為面向消息的中間件設計,基于此協定的用戶端與消息中間件可傳遞消息,不受産品、開發語言等條件的限制。

RabbitMQ 最初起源于金融系統,用于在分布式系統中存儲轉發消息。特點:

  • 可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸确認、發送應答、釋出确認。
  • 靈活路由(Flexible Routing) 在消息進入隊列之前,通過 Exchange 來路由/轉發消息的。對于典型的路由功能,RabbitMQ 已經提供一些内置的 Exchange 來實作。針對更複雜的路由功能,可以将多個 Exchange 綁定/組合在一起,也通過插件機制實作自己的 Exchange 。
  • 消息叢集(Clustering) 多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。
  • 高可用(Highly Available Queues) 支援跨機器叢集,支援隊列安全鏡像備份,即隊列可以在叢集中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
  • 多種協定(Multi-protocol) 支援多種消息隊列協定,比如 STOMP、MQTT 等。
  • 多語言用戶端(Many Clients)
  • 管理界面(Management UI) 提供一個易用的使用者界面,使得使用者可以監控和管理消息 Broker 的許多方面。
  • 擴充性:支援負載均衡,動态增減伺服器簡單友善。
  • 跟蹤機制(Tracing) 如果消息異常,RabbitMQ 提供消息跟蹤機制。
  • 權限管理:靈活的使用者角色權限管理,Virtual Host 是權限控制的最小粒度。
  • 插件機制(Plugin System) 提供許多插件,來從多方面進行擴充,也可以編寫自己的插件。

安裝

Windows安裝

需要先安裝erlang開發環境,下載下傳exe程式otp_win64_19.2.exe,一直next到死。

添加環境變量:​​

​ERLANG_HOME=D:\Program Files\erlx.x.x​

​​(erlang安裝路徑);

更新PATH:​​

​%ERLANG_HOME%\bin;​

​​ 驗證ErLang安裝情況,輸入指令:​

​erl​

​ 然後下載下傳Rabbit-server-3.6.12.exe,一路Next即可完成安裝,自帶安裝成service的腳本。

目錄介紹:

  • ebin:一些調用的erlang程式
  • etc:配置檔案
  • include:依賴檔案
  • sbin:一些bat腳本,用來運作、控制、管理rabbitmq
  • rabbitmq-server.bat 以應用方式啟動rabbitmq;
  • rabbitmq-service.bat 以服務方式啟動rabbitmq;
  • rabbitmqctl.bat,rabbitmq管理工具;
  • rabbitmq-plugins.bat 擴充插件管理工具
  • plugins:插件

    執行指令:​​

    ​rabbitmq-plugins enable rabbitmq_management​

    ​​安裝管理插件。

    安裝完成之後,登入管理頁面http://127.0.0.1:15672/,預設賬号和密碼:guest/guest;

    ​​

    ​C:/Users/<user>/AppData/Roaming/RabbitMQ/log/[email protected]

    ​ 檢視 RabbitMQ 的日志資訊。

端口:

  • 4369 (epmd)
  • 5672, 5671 (AMQP 0-9-1 and 1.0 without and with TLS)
  • 25672,這個端口用于Erlang分布節點間和CLI工具溝通,是在動态範圍内配置設定的(預設情況下限制在一個單獨的端口,計算方法:AMQP端口+20000)。
  • 15672 (rabbitmq web管理工具插件)
  • 61613, 61614 (STOMP 插件)
  • 1883, 8883 (if MQTT 插件)

Linux安裝

Mac 安裝

​brew install rabbitmq​

​ Max 系統的 brew 自動其所依賴的 Erlang。

常用指令

rabbitmq-server

運作​

​rabbitmq-server -detached​

​指令來重新開機服務并背景運作。

rabbitmqctl

​rabbitmqctl​

​​,提供幾乎 RabbitMQ 管理所需的一站式解決方案:

​​

​rabbitmqctl status​

​​:查詢 RabbitMQ 伺服器的狀态資訊,指令輸出伺服器資訊:如 RabbitMQ 和 Erlang 的版本、OS 名稱、記憶體等;

​​

​rabbitmqctl reset​

​​:重置 RabbitMQ 節點;

​​

​rabbitmqctl start_app​

​​:啟動 RabbitMQ 應用程式;

​​

​rabbitmqctl list_queues​

​​:檢視已聲明的隊列;

​​

​rabbitmqctl list_exchanges name type durable auto_delete​

​​:檢視交換器,附加參數,比如列出交換器的名稱、類型、是否持久化、是否自動删除;

​​

​rabbitmqctl list_bindings​

​:檢視綁定;

背景知識

在Erlang 中有兩個概念:節點和應用程式。節點就是 Erlang 虛拟機的每個執行個體,而多個 Erlang 應用程式可以運作在同一個節點之上。節點之間可以進行本地通信(不管是不是運作在同一台伺服器之上)。比如一個運作在節點A上的應用程式可以調用節點B上應用程式的方法,就好像調用本地函數一樣。如果應用程式由于某些原因奔潰,Erlang 節點會自動嘗試重新開機應用程式。

​​

​rabbitmqctl -n [email protected] stop​

​​:關閉整個 RabbitMQ 節點可以用參數 stop。預設情況下,不指定 ​

​-n​

​​ 選項時,它會和本地節點通信并訓示其幹淨的關閉;指定 ​

​-n(node)​

​​ 選項後,可以指定關閉遠端節點,node 預設名稱是 rabbit@server;

​​

​rabbitmqctl stopapp​

​:隻想關閉應用程式,同時保持 Erlang 節點運作,該指令将清除所有的隊列。

概念

ConnectionFactory連接配接管理器:使用者與MQ建立的連接配接管理器;

Channel信道:消息推送使用的通道;

Exchange交換機:用于接收、配置設定消息;

Queue隊列:用于存儲生産者的消息;

RoutingKey路由鍵:用于把生産者的資料配置設定(路由)到交換機上;

BingDingKey綁定建:用于把交換器的消息綁定到隊列中。

三種廣播模式

fanout:凡事綁定到此交換機和隊列都可以接受消息;

direct:通過路由鍵和交換機決定唯一的隊列可以接受消息;

topic:所有符合路由鍵所綁定的隊列都可以接受消息。

TTL

TTL是MQ中一個消息或者隊列的屬性,表明一條消息或者隊列中所有消息或者隊列的最大存活時間,機關是毫秒。如果一條消息設定了TTL屬性,或者進入了設定TTL的隊列,如果這條消息在TTL内的時間未被消費則該條消息則變成死信,如果配置了消息的TTL和隊列的TTL則較小的那個值會被使用。

設定TTL的2種方式:

  1. 建立隊列時設定隊列的​

    ​x-message-ttl​

    ​屬性:
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);      
  1. 針對每條消息設定TTL:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
channel.basicPublish(exchangeName, routingKey, mandatory, builder.build(), "msg body".getBytes());      

差別:

如果設定隊列的TTL,如果消息過期則被隊列丢棄;而第二種即使消息過期也不會馬上被丢棄, 因為消息是否過期是在即将投遞到消費者之前被判定的。此外,如果不設定TTL則表示消息永遠不會過期,消息過期則變成死信。

延遲隊列

用來存放需要在指定時間被處理的元素的隊列,延遲隊列中的元素都是帶時間屬性(TTL)的

釋出模式

  • 單對單:單發送、單接收;
  • 單對多:一個發送端,多個接收端,如分布式的任務派發;
  • 釋出訂閱模式:
  • 按路由規則發送接收:
  • 主題:Exchange Type 為 topic,發送消息時需要指定交換機及 Routing Key,消費者的消息隊列綁定到該交換機并比對到 Routing Key 實作消息的訂閱,訂閱後則可接收消息。隻有消費者将隊列綁定到該交換機且指定的 Routing Key 符合比對規則,才能收到消息。Routing Key 可以設定成通配符,如:​

    ​*或 #​

    ​​(*表示比對 Routing Key 中的某個單詞,# 表示任意的 Routing Key 的消息都能被收到)。如果 Routing Key 由多個單詞組成,則單詞之間用 ​

    ​.​

    ​ 來分隔。
  • RPC(即遠端存儲調用)

命名規範:

交換機名的命名建議:Ex{AppID}.{自定義 ExchangeName},隊列名的命名建議:MQ{AppID}.{自定義 QueueName} 。

用戶端

Java 用戶端

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>      

spring 內建 RabbitMQ

<beans>
    <!-- 連接配接工廠,提供channel,單點模式下隻需配置rabbitmq server的host和port -->
    <rabbit:connection-factory
            id="connectionFactory"
            addresses="${rabbitmq.addresses}"
            username="${rabbitmq.username}"
            password="${rabbitmq.password}"
            publisher-confirms="true"
            connection-timeout="1000"
            cache-mode="CHANNEL"/>
    <!-- 管理元件 -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- 存放消費失敗消息的queue -->
    <rabbit:queue name="alert.queue.dead"/>

    <!-- 同步消息發送元件,使用jsonMessageConverter -->
    <bean id="rabbitTemplate" class="com.demo.RabbitTemplate">
        <constructor-arg ref="connectionFactory"/>
        <property name="messageConverter" ref="jsonConverter"/>
        <property name="retryTemplate" ref="retryTemplate"/>
        <property name="recoveryCallback" ref="recoveryCallback"/>
    </bean>

    <!-- 異步消息發送元件,使用jsonMessageConverter -->
    <bean id="asyncRabbitTemplate" class="com.demo.RabbitTemplate" lazy-init="true">
        <constructor-arg ref="connectionFactory"/>
        <property name="messageConverter" ref="jsonConverter"/>
        <property name="retryTemplate" ref="retryTemplate"/>
        <property name="recoveryCallback" ref="recoveryCallback"/>
        <property name="sync" value="false"/>
        <property name="taskExecutor" ref="rabbitProduceExecutor"/>
    </bean>

    <!-- 消息所有重發失敗後的回調方法 -->
    <bean id="recoveryCallback"
        class="com.demo.RabbitRecoveryCallback"/>

    <!-- 消費重試Advice -->
    <bean id="retryAdvice" class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
        <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
        <property name="retryOperations" ref="retryTemplate"/>
    </bean>
    <bean id="rejectAndDontRequeueRecoverer" class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer"/>

    <!-- 使用json進行消息序列化和反序列化 -->
    <bean id="jsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- 重試政策 -->
    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="50"/>
                <property name="multiplier" value="10.0"/>
                <property name="maxInterval" value="5000"/>
            </bean>
        </property>
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="4"/>
            </bean>
        </property>
    </bean>

<!--為了獲得盡可能好的性能,此處參數配置需要經過反複測試得到-->
    <!-- 異步消息發送線程池 -->
    <bean id="rabbitProduceExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
          lazy-init="true">
        <property name="corePoolSize" value="3"/>
        <property name="maxPoolSize" value="50"/>
        <property name="queueCapacity" value="1000"/>
        <property name="keepAliveSeconds" value="300"/>
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/>
        </property>
    </bean>
</beans>      

spring boot 內建 RabbitMQ

可靠性

RabbitMQ叢集

參考