消息隊列
消息(Message)是指在應用間傳送的資料,帶有某種資訊的信号。消息機制的三大要點:消息隊列、消息循環(分發)、消息處理。
消息隊列(Message Queue,MQ)是一種應用間的通信方式,消息發送後可以立即傳回,由消息系統來確定消息的可靠傳遞。消息釋出者隻管把消息釋出到MQ中,消息消費者隻管從MQ中取消息。釋出者和消費者沒有同步接口調用邏輯,即所謂解耦。
作用 & 優勢:
業務解耦、異步(将消息寫入消息隊列,非必要的業務邏輯以異步的方式運作,加快響應速度)、削峰填谷(平滑消費消息)、流量控制、廣播消息、最終一緻性。
缺點:
- 降低系統可用性,增加系統元件;
- 增加系統複雜,比如:叢集部署時的一緻性問題、如何保證消息不被重複消費,如何保證保證消息可靠傳輸;
消息隊列的架構圖
消息隊列從邏輯角度而言,可以分為四大子產品:生産者子產品、存儲子產品、消費者子產品和配置管理子產品,每個子產品均可以叢集方式部署。
生産者子產品:主要負責接收上遊系統發送過來的資料。解決如何處理大量的連接配接請求,并且快速的處理發送過來的資料,将資料以适當的形式進行組裝,然後發送給存儲子產品。
存儲子產品:主要負責存儲生産者子產品發送過來的資料,然後将存儲的資料傳遞給消費者子產品。
該子產品需要解決如何序列化資料并存儲,以及如何讀取序列化的資料,該子產品的性能對消息隊列整體的性能影響較大。一方面,如果序列化及存儲性能很高,那麼接收資料的性能也會很高。另一方面,如果讀取序列化資料的性能很高,那麼發送的性能也會很高,是以,該子產品決定消息隊列的服務能力。
消費者子產品:主要負責消費存儲子產品的資料。該子產品主要解決的是消費邏輯,比如:是廣播消費還是叢集消費等,也可以簡單了解是廣播還是單點傳播。
Config Server:主要作為注冊中心,存儲元件的中繼資料,以及維護發送和消費關系等。
如果進一步将各個子產品分解的話,可以發現每一個子產品都包含若幹子子產品。現分解如下:
生産者子產品:主要有兩個部分構成。一個是接收上遊的資料;另一個是協定處理,負責将接收到的資料根據約定的協定進行解析,然後傳遞給存儲子產品。
存儲子產品:主要包括,資料接收處理、資料辨別生成、資料寫入和資料讀取。該部分主要解決資料如何序列化及如何存儲。
消費者子產品:主要解決消費的邏輯處理,如:廣播模式、叢集模式和分組管理等。
消息中間件
消息中間件,Message Oriented Middleware,MOM,已知開源的有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ等,基本上大同小異,但是各自有着不同的應用場景和特點;Kafka注重的是消息的吞吐量,不保證消息存儲的可靠性以及一緻性,是以多用于日志系統資料的上報; RabbitMQ能保證消息可靠存儲投遞。
RabbitMQ
RabbitMQ,Erlang 語言開發的 AMQP 的開源實作。
AMQP :Advanced Message Queue,進階消息隊列協定。它是應用層協定的一個開放标準,為面向消息的中間件設計,基于此協定的用戶端與消息中間件可傳遞消息,不受産品、開發語言等條件的限制。
RabbitMQ 最初起源于金融系統,用于在分布式系統中存儲轉發消息。特點:
- 可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸确認、發送應答、釋出确認。
- 靈活路由(Flexible Routing) 在消息進入隊列之前,通過 Exchange 來路由/轉發消息的。對于典型的路由功能,RabbitMQ 已經提供一些内置的 Exchange 來實作。針對更複雜的路由功能,可以将多個 Exchange 綁定/組合在一起,也通過插件機制實作自己的 Exchange 。
- 消息叢集(Clustering) 多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。
- 高可用(Highly Available Queues) 支援跨機器叢集,支援隊列安全鏡像備份,即隊列可以在叢集中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
- 多種協定(Multi-protocol) 支援多種消息隊列協定,比如 STOMP、MQTT 等。
- 多語言用戶端(Many Clients)
- 管理界面(Management UI) 提供一個易用的使用者界面,使得使用者可以監控和管理消息 Broker 的許多方面。
- 擴充性:支援負載均衡,動态增減伺服器簡單友善。
- 跟蹤機制(Tracing) 如果消息異常,RabbitMQ 提供消息跟蹤機制。
- 權限管理:靈活的使用者角色權限管理,Virtual Host 是權限控制的最小粒度。
- 插件機制(Plugin System) 提供許多插件,來從多方面進行擴充,也可以編寫自己的插件。
安裝
Windows安裝
需要先安裝erlang開發環境,下載下傳exe程式otp_win64_19.2.exe,一直next到死。
添加環境變量:
ERLANG_HOME=D:\Program Files\erlx.x.x
(erlang安裝路徑);
更新PATH:
%ERLANG_HOME%\bin;
驗證ErLang安裝情況,輸入指令:
erl
然後下載下傳Rabbit-server-3.6.12.exe,一路Next即可完成安裝,自帶安裝成service的腳本。
目錄介紹:
- ebin:一些調用的erlang程式
- etc:配置檔案
- include:依賴檔案
- sbin:一些bat腳本,用來運作、控制、管理rabbitmq
- rabbitmq-server.bat 以應用方式啟動rabbitmq;
- rabbitmq-service.bat 以服務方式啟動rabbitmq;
- rabbitmqctl.bat,rabbitmq管理工具;
- rabbitmq-plugins.bat 擴充插件管理工具
-
plugins:插件
執行指令:
rabbitmq-plugins enable rabbitmq_management
安裝管理插件。
安裝完成之後,登入管理頁面http://127.0.0.1:15672/,預設賬号和密碼:guest/guest;
檢視 RabbitMQ 的日志資訊。C:/Users/<user>/AppData/Roaming/RabbitMQ/log/[email protected]
端口:
- 4369 (epmd)
- 5672, 5671 (AMQP 0-9-1 and 1.0 without and with TLS)
- 25672,這個端口用于Erlang分布節點間和CLI工具溝通,是在動态範圍内配置設定的(預設情況下限制在一個單獨的端口,計算方法:AMQP端口+20000)。
- 15672 (rabbitmq web管理工具插件)
- 61613, 61614 (STOMP 插件)
- 1883, 8883 (if MQTT 插件)
Linux安裝
Mac 安裝
brew install rabbitmq
Max 系統的 brew 自動其所依賴的 Erlang。
常用指令
rabbitmq-server
運作
rabbitmq-server -detached
指令來重新開機服務并背景運作。
rabbitmqctl
rabbitmqctl
,提供幾乎 RabbitMQ 管理所需的一站式解決方案:
rabbitmqctl status
:查詢 RabbitMQ 伺服器的狀态資訊,指令輸出伺服器資訊:如 RabbitMQ 和 Erlang 的版本、OS 名稱、記憶體等;
rabbitmqctl reset
:重置 RabbitMQ 節點;
rabbitmqctl start_app
:啟動 RabbitMQ 應用程式;
rabbitmqctl list_queues
:檢視已聲明的隊列;
rabbitmqctl list_exchanges name type durable auto_delete
:檢視交換器,附加參數,比如列出交換器的名稱、類型、是否持久化、是否自動删除;
rabbitmqctl list_bindings
:檢視綁定;
背景知識
在Erlang 中有兩個概念:節點和應用程式。節點就是 Erlang 虛拟機的每個執行個體,而多個 Erlang 應用程式可以運作在同一個節點之上。節點之間可以進行本地通信(不管是不是運作在同一台伺服器之上)。比如一個運作在節點A上的應用程式可以調用節點B上應用程式的方法,就好像調用本地函數一樣。如果應用程式由于某些原因奔潰,Erlang 節點會自動嘗試重新開機應用程式。
rabbitmqctl -n [email protected] stop
:關閉整個 RabbitMQ 節點可以用參數 stop。預設情況下,不指定
-n
選項時,它會和本地節點通信并訓示其幹淨的關閉;指定
-n(node)
選項後,可以指定關閉遠端節點,node 預設名稱是 rabbit@server;
rabbitmqctl stopapp
:隻想關閉應用程式,同時保持 Erlang 節點運作,該指令将清除所有的隊列。
概念
ConnectionFactory連接配接管理器:使用者與MQ建立的連接配接管理器;
Channel信道:消息推送使用的通道;
Exchange交換機:用于接收、配置設定消息;
Queue隊列:用于存儲生産者的消息;
RoutingKey路由鍵:用于把生産者的資料配置設定(路由)到交換機上;
BingDingKey綁定建:用于把交換器的消息綁定到隊列中。
三種廣播模式
fanout:凡事綁定到此交換機和隊列都可以接受消息;
direct:通過路由鍵和交換機決定唯一的隊列可以接受消息;
topic:所有符合路由鍵所綁定的隊列都可以接受消息。
TTL
TTL是MQ中一個消息或者隊列的屬性,表明一條消息或者隊列中所有消息或者隊列的最大存活時間,機關是毫秒。如果一條消息設定了TTL屬性,或者進入了設定TTL的隊列,如果這條消息在TTL内的時間未被消費則該條消息則變成死信,如果配置了消息的TTL和隊列的TTL則較小的那個值會被使用。
設定TTL的2種方式:
- 建立隊列時設定隊列的
屬性:x-message-ttl
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
- 針對每條消息設定TTL:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
channel.basicPublish(exchangeName, routingKey, mandatory, builder.build(), "msg body".getBytes());
差別:
如果設定隊列的TTL,如果消息過期則被隊列丢棄;而第二種即使消息過期也不會馬上被丢棄, 因為消息是否過期是在即将投遞到消費者之前被判定的。此外,如果不設定TTL則表示消息永遠不會過期,消息過期則變成死信。
延遲隊列
用來存放需要在指定時間被處理的元素的隊列,延遲隊列中的元素都是帶時間屬性(TTL)的
釋出模式
- 單對單:單發送、單接收;
- 單對多:一個發送端,多個接收端,如分布式的任務派發;
- 釋出訂閱模式:
- 按路由規則發送接收:
- 主題:Exchange Type 為 topic,發送消息時需要指定交換機及 Routing Key,消費者的消息隊列綁定到該交換機并比對到 Routing Key 實作消息的訂閱,訂閱後則可接收消息。隻有消費者将隊列綁定到該交換機且指定的 Routing Key 符合比對規則,才能收到消息。Routing Key 可以設定成通配符,如:
(*表示比對 Routing Key 中的某個單詞,# 表示任意的 Routing Key 的消息都能被收到)。如果 Routing Key 由多個單詞組成,則單詞之間用 *或 #
來分隔。.
- RPC(即遠端存儲調用)
命名規範:
交換機名的命名建議:Ex{AppID}.{自定義 ExchangeName},隊列名的命名建議:MQ{AppID}.{自定義 QueueName} 。
用戶端
Java 用戶端
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
spring 內建 RabbitMQ
<beans>
<!-- 連接配接工廠,提供channel,單點模式下隻需配置rabbitmq server的host和port -->
<rabbit:connection-factory
id="connectionFactory"
addresses="${rabbitmq.addresses}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
publisher-confirms="true"
connection-timeout="1000"
cache-mode="CHANNEL"/>
<!-- 管理元件 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 存放消費失敗消息的queue -->
<rabbit:queue name="alert.queue.dead"/>
<!-- 同步消息發送元件,使用jsonMessageConverter -->
<bean id="rabbitTemplate" class="com.demo.RabbitTemplate">
<constructor-arg ref="connectionFactory"/>
<property name="messageConverter" ref="jsonConverter"/>
<property name="retryTemplate" ref="retryTemplate"/>
<property name="recoveryCallback" ref="recoveryCallback"/>
</bean>
<!-- 異步消息發送元件,使用jsonMessageConverter -->
<bean id="asyncRabbitTemplate" class="com.demo.RabbitTemplate" lazy-init="true">
<constructor-arg ref="connectionFactory"/>
<property name="messageConverter" ref="jsonConverter"/>
<property name="retryTemplate" ref="retryTemplate"/>
<property name="recoveryCallback" ref="recoveryCallback"/>
<property name="sync" value="false"/>
<property name="taskExecutor" ref="rabbitProduceExecutor"/>
</bean>
<!-- 消息所有重發失敗後的回調方法 -->
<bean id="recoveryCallback"
class="com.demo.RabbitRecoveryCallback"/>
<!-- 消費重試Advice -->
<bean id="retryAdvice" class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
<property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
<property name="retryOperations" ref="retryTemplate"/>
</bean>
<bean id="rejectAndDontRequeueRecoverer" class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer"/>
<!-- 使用json進行消息序列化和反序列化 -->
<bean id="jsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!-- 重試政策 -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="50"/>
<property name="multiplier" value="10.0"/>
<property name="maxInterval" value="5000"/>
</bean>
</property>
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="4"/>
</bean>
</property>
</bean>
<!--為了獲得盡可能好的性能,此處參數配置需要經過反複測試得到-->
<!-- 異步消息發送線程池 -->
<bean id="rabbitProduceExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
lazy-init="true">
<property name="corePoolSize" value="3"/>
<property name="maxPoolSize" value="50"/>
<property name="queueCapacity" value="1000"/>
<property name="keepAliveSeconds" value="300"/>
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/>
</property>
</bean>
</beans>