天天看點

消息隊列 RocketMQ

1 什麼是消息隊列(MQ)

消息隊列是一種應用程式對應用程式的通信方法,是分布式系統的重要元件,可以解決一些應用場景的高并發問題,當不需要立即獲得結果,但是并發量又需要進行控制的時候,就需要使用MQ來處理。

2 消息隊列的應用場景

多應用對消息隊列中同一消息進行處理,應用間并發處理消息,相比串行處理,減少處理時間

比如直播平台的送禮操作,一個操作會跟随産生許多其他操作,也就是一個操作的後續業務鍊很長。

消息隊列 RocketMQ

此時為了避免使用者送一次禮就要響應很久,影響送禮心情,就可以隻做主流業務——扣錢,而其他後續業務鍊都儲存在消息隊列裡,逐漸消化,隻保證最終一緻性;而前端實時顯示特效來響應使用者的操作,比如使用者送禮連擊就會産生連擊特效,以此增強給使用者的回報。

假設整個業務鍊需要10秒完成,但是扣錢隻需要200ms,此時通過使用消息隊列,就提升了整體業務的響應效率,是以消息隊列能用來處理這種需要異步處理的場景

具體哪個業務需要提取出來即時處理,哪些功能存到消息隊列裡異步處理,就得看産品經理的需求文檔以及技術主管與之互撕的結果,這裡面的小心思就不屬于後端技術領域了。

比如突然有10W個請求,但是後端隻能同時處理1W個請求,此時就可以把9W個請求放在消息隊列裡,後端逐漸消化這些請求。

也是以,MQ廣泛應用于秒殺或搶購活動中,避免流量過大導緻應用系統挂掉的情況。

多個應用要對同一消息進行處理,比如使用者送禮獲得經驗,送禮了完成任務也獲得經驗,這些操作都要調用處理使用者經驗的接口,此時就可以通過設立請求鍊,也就是使用消息隊列來處理,避免調用接口失敗導緻整個過程失敗。

如果某一操作調用接口失敗了,就把這個操作放回隊列裡重新發送請求,要是一直失敗,就需要人工介入,做人工資訊補償。

一個系統的各個子產品可分為消息隊列、消息生産者和消息消費者,生産者負責産生消息,生産的消息儲存在消息隊列中,消費者(可能有多個)負責對消息隊列中的消息進行處理

例如一個項目,有訂單服務、使用者服務、商品服務,每目前端發送請求到訂單服務,訂單服務收到後立馬傳回響應,并把後續任務放到一個地方,供其他子產品使用,這裡的訂單服務就是生産者,而使用者管理、商品管理去消息隊列中擷取資訊并處理,它們就是消費者。

這種系統中,供生産者儲存資訊的地方就是消息隊列

消息隊列 RocketMQ

3 常見消息隊列

目前使用較多的消息隊列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等,而部分資料庫如Redis、Mysql也可實作消息隊列的功能。

Kafka是LinkedIn開源的分布式釋出-訂閱消息系統,目前歸屬于Apache頂級項目。Kafka主要特點是基于Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用于日志收集和傳輸。0.8版本開始支援複制,不支援事務,對消息的重複、丢失、錯誤沒有嚴格要求,适合産生大量資料的網際網路服務的資料收集業務。

kafka之是以對消息的重複、丢失、錯誤有容忍度,是因為kafka應用于資料吞吐量高的場景,追求極緻的速度。

所有的軟體都不是完美的,要在某友善做到極緻,就要有所損失,追求極緻的速度,就要在功能上讓步,追求功能的完善,就要在速度上讓步。

RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基于AMQP協定來實作。AMQP的主要特征是面向消息、隊列、路由(包括點對點和釋出/訂閱)、可靠性、安全。AMQP協定更多用在企業系統内,對資料一緻性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。

ActiveMQ是由Apache出品,ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實作。它非常快速,支援多種語言的用戶端和協定,而且可以非常容易的嵌入到企業的應用環境中,并有許多進階功能。

RocketMQ是阿裡開源的消息中間件,它是純Java開發,具有高吞吐量、高可用性、适合大規模分布式系統應用的特點。RocketMQ思路起源于Kafka,但并不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優化,目前在阿裡集團被廣泛應用于交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景。

使用RocketMQ的一個好處是,可以很輕松的把項目移到阿裡雲上。

4 消息隊列的基本概念

rocketmq/concept.md at master · apache/rocketmq · GitHub

下面以RocketMQ官方GitHub的文檔說明為例,說明消息隊列的基本概念

RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生産消息,Consumer 負責消費消息,Broker 負責存儲消息。Broker 在實際部署過程中對應一台伺服器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于不同的 Broker。Message Queue 用于存儲消息的實體位址,每個Topic中的消息位址存儲于多個 Message Queue 中。ConsumerGroup 由多個Consumer 執行個體構成。

broker就是我們需要安裝和運作的東西

topic是一類消息的集合,比如贈送禮物、購買商品,是以topic相當于業務分類

“ConsumerGroup 由多個Consumer 執行個體構成”就是指可以有多個消費者,且消費者之間可以分組

負責生産消息,一般由業務系統負責生産消息。一個消息生産者會把業務應用系統裡産生的消息發送到broker伺服器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker傳回确認資訊,單向發送不需要。

生産者負責釋出消息,比如訂單服務負責釋出訂單,而釋出的消息會被broker伺服器接收,發送的方式很多,如果需要broker傳回确認資訊,那麼broker要告訴消息生産者它是否接收到資料。

這些發送方式沒有絕對的好和壞,都有各自的優點,比如單向發送不需要等待,是以快

負責消費消息,一般是背景系統負責異步消費。一個消息消費者會從Broker伺服器拉取消息、并将其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。

消費者負責訂閱消息以及消費消息,兩種消費形式就是主動和被動的差別。

表示一類消息的集合,每個主題包含若幹條消息,每條消息隻能屬于一個主題,是RocketMQ進行消息訂閱的基本機關。

我發送一條消息,這條消息肯定得有自己的主題,比如贈送禮物操作,相關消息就應該訂閱贈送禮物的topic,即每一條消息都要挂在一個topic下,用topic描述一條消息是幹嘛的

消息中轉角色,負責存儲消息、轉發消息。代理伺服器在RocketMQ系統中負責接收從生産者發送來的消息并存儲、同時為消費者的拉取請求作準備。代理伺服器也存儲消息相關的中繼資料,包括消費者組、消費進度偏移和主題和隊列消息等。

broker會記錄生産者存放在這裡的消息,有沒有被消費者拿去消費

名稱服務充當路由消息的提供者。生産者或消費者能夠通過名字服務查找各主題相應的Broker IP清單。多個Namesrv執行個體組成叢集,但互相獨立,沒有資訊交換。

注意點:

“Broker IP清單”,說明一個項目中可以設定多個broker

名字服務相當于Nginx,各個broker就相當于tomcat,由于broker要挂名字服務下,是以名字服務要先啟動

但是叢集之間不進行資料互通,不存在master、slaver的概念,就是普通的叢集

消息隊列 RocketMQ
Consumer消費的一種類型,應用通常主動調用Consumer的拉消息方法從Broker伺服器拉消息、主動權由應用控制。一旦擷取了批量消息,應用就會啟動消費過程。

就是消費者主動調用,通常需要做定時任務,每隔一段時間去查詢一次broker

Consumer消費的一種類型,該模式下Broker收到資料後會主動推送給消費端,該消費模式一般實時性較高。

就是broker主動推送給各個消費者,這種方式實時性自然最高

同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一緻。如果發送的是事務消息且原始生産者在發送之後崩潰,則Broker伺服器會聯系同一生産者組的其他生産者執行個體以送出或回溯消費。

就是說幾個生産者構成一個集合,要是某個生産者挂了,其他生産者頂上

消息隊列 RocketMQ
同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一緻。消費者組使得在消息消費方面,實作負載均衡和容錯的目标變得非常容易。要注意的是,消費者組的消費者執行個體必須訂閱完全相同的Topic。RocketMQ 支援兩種消息模式:叢集消費(Clustering)和廣播消費(Broadcasting)。

就是說幾個消費者構成一個集合,要是某個消費者挂了,其他消費者頂上

消息隊列 RocketMQ
叢集消費模式下,相同Consumer Group的每個Consumer執行個體平均分攤消息。
廣播消費模式下,相同Consumer Group的每個Consumer執行個體都接收全量的消息。
普通順序消費模式下,消費者通過同一個消息隊列( Topic 分區,稱作 Message Queue) 收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。
嚴格順序消息模式下,消費者收到的所有消息均是有順序的。
消息系統所傳輸資訊的實體載體,生産和消費資料的最小機關,每條消息必須屬于一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務辨別的Key。系統提供了通過Message ID和Key查詢消息的功能。

隻需要知道每條消息有唯一的Message ID即可

為消息設定的标志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設定不同标簽。标簽能夠有效地保持代碼的清晰度和連貫性,并優化RocketMQ提供的查詢系統。消費者可以根據Tag實作對不同子主題的不同消費邏輯,實作更好的擴充性。

就是在同一個topic下挂載的消息,繼續細分消息所屬業務的東西,标簽可有可無,僅僅是topic的一個擴充

消息隊列 RocketMQ

5 在linux中安裝RocketMQ

RocketMQ屬于java應用,需要jdk環境

如圖,準備好maven、rocketmq的源碼、rocketmq的web控制台

由于安裝rocketMQ需要編譯源碼,是以需要maven

消息隊列 RocketMQ

将壓縮包放在/opt目錄下,解壓縮包

移動檔案夾到/usr/local目錄下

修改linux系統的環境變量,目的是為了能像win系統那樣,在各處都能使用maven的指令

消息隊列 RocketMQ

來到環境變量的檔案中,在檔案末尾追加如下代碼,export就是linux的環境變量指令

消息隊列 RocketMQ

接着重新重新整理配置檔案

然後進入maven的配置檔案setting.xml

需要在配置檔案中配置鏡像源,如果在公司,公司有自己的遠端倉庫,通常設為自己公司的,如果不在公司或者公司沒有相關伺服器,就設為阿裡雲的鏡像源

将RocketMQ解壓

移動解壓出來的檔案夾到/user/local目錄下

進入rocketMQ的目錄

在rocketMQ目錄下編譯RocketMQ,執行指令後,maven會幫助我們把所有的項目都編譯,并且把依賴都準備好

6 啟動RocketMQ

RocketMQ的預設設定中需要占用虛拟機8G的記憶體,對于生成環境下隻是進行測試的虛拟機來說,有點過大,是以需要手動修改bin目錄下的兩個檔案runbroker.sh和runserver.sh

首先進入編譯後的路徑下

然後使用vim編輯器打開runbroker.sh

預設設定:

消息隊列 RocketMQ

修改後:

不過如圖的設定有點小,可能導緻運作時記憶體溢出,是以要結合個人硬體設施去修改

消息隊列 RocketMQ

接着使用vim編輯器打開runserver.sh

消息隊列 RocketMQ
消息隊列 RocketMQ

首先需要啟動Name Server

使用了nohup指令,簡單來說就是會把日志列印到目前目錄的nohup.out檔案中

消息隊列 RocketMQ

運作後的提示

消息隊列 RocketMQ

此時可以通過下面代碼檢視name server的日志

使用下面代碼就能啟動broker

消息隊列 RocketMQ

可以通過下面的代碼檢視broker的日志

從 Windows 上的開發環境連接配接到虛拟機中的 nameServer 時要經過 Linux 系統的防火牆,而防火牆一般都會有逾時的機制,在網絡連接配接長時間不傳輸資料時,會關閉這個 TCP 的會話,關閉後再讀寫,就有可能導緻異常(RemotingTooMuchRequestException: sendDefaultImpl call timeout)

對于配置差的電腦來說,linux的防火牆也是一個不小的負荷,更容易造成這種逾時情況,是以生産環境下隻是為了測試的話,可以把防火牆關閉

7 測試RocketMQ

安裝RocketMQ時跳過了測試案例,但不代表不需要測試,現在要手動測試RocketMQ

首先将localhost:9876臨時加入環境變量 便于下面的兩行代碼使用這個ip和端口号

然後測試發送消息

可以看出發送的消息是很簡單的單向消息,每一條消息都有自己的消息ID

消息隊列 RocketMQ

接着測試接收資訊

接收到的消息有一個body值,這就是正文,是位元組數組

消息隊列 RocketMQ

轉為字元串,可以看到就是類似hello world的資訊

消息隊列 RocketMQ

8 RocketMQ 監控平台

準備好壓縮包

消息隊列 RocketMQ

首先要解壓檔案

然後移動檔案夾到/user/local下

進入檔案夾,可以看到有許多版本,我們需要進入的是console

消息隊列 RocketMQ

由于後端用springboot項目,是以要修改配置檔案,首先進入檔案夾

消息隊列 RocketMQ

然後使用vim編譯器修改配置檔案

預設application.properties中設定為空,需要修改為目前name server所在端口和ip

消息隊列 RocketMQ

現在可以執行編譯安裝了,編譯完成後可以生成一個jar檔案

執行啟動指令

通過通路虛拟機ip+8085端口,就能進入監控平台

消息隊列 RocketMQ

通過cluster可以看到叢集情況,以及消息的生産和消費情況

消息隊列 RocketMQ

通過topic可以看到目前有多少topic

消息隊列 RocketMQ

通過message可以檢視目前有哪些資訊,可以通過topic進行搜尋

消息隊列 RocketMQ

9 簡單使用MQ

消息隊列 RocketMQ

根據官網文檔,編寫一個簡單執行個體

消息隊列 RocketMQ

常量類

執行個體代碼

注意點

“new DefaultMQProducer( )”時,傳入構造函數的是生産者組名字,這個名字可以随意起名,隻需要保證唯一,可以以系統命名或者組命名,像這種固定值通過寫在常量類中

“new Message( )”時,需要傳入Message的構造函數的是topic和body,其中body是一個位元組數組,是以需要将字元串改為數組,并指定編碼格式

消息隊列 RocketMQ

if判斷時要注意用已知值去比較未知值,避免空指針異常

下面提到的幾種問題的解決方案,已經有很成熟的代碼了,如果以後開發的項目需要使用消息隊列,直接照着網上找到的或者公司給出的代碼用就行了,下面隻是給出常見的解決方案的原理,能看懂就行了,不需要能自己寫出來——反之,真的能自己寫出來,那對于剛畢業的學生來說就很厲害了,随便拿個12~13K不是問題

沒有哪個中間件敢說自己100%沒問題,即便是阿裡雲提供的收費産品也不敢說

消息隊列 RocketMQ

一個産品做的好壞,有時候并不與産品有關,還和産品的戰鬥體系數值、貨币體系有關,要是崩壞了這個産品就沒了,是以一個産品不能出錯,是以必須具備處理可能發生的錯誤的途徑

雖然MQ說消息不會重複,但是我們要自己确定消息不會被重複消費——把消息存在一個地方,每次消費後删除,每次消費前先查找是否有這個消息,如果沒有就說明消息已經被消費,這就避免了重複消費

類似解決重複消費的問題,再建立一個表,把建立的資訊ID記錄下來,每次發送前先去這個表根據資訊id查一下待發送資訊是否已經發送,這就解決了重複發送的問題

上述簡單執行個體中隻有發送成功的操作,實際開發中如果發送失敗,需要寫一個循環重新發送,如果在規定的循環次數内都發送失敗,就要記錄這個資訊到資料庫或者redis中,并通知客服或者是技術人員,等人工檢查并重新開機服務後,手動發送資訊,確定資訊一定能送達——即人工介入,人工資訊補償

要記錄的資訊通常是該業務資訊的唯一進制素,比如訂單id

将發送失敗的資訊記錄到資料庫或reids後,還需要配套一個背景管理系統給客服以及技術人員使用

通過設定定時任務,實作定時掃描表中記錄的失敗資料并通知客服和技術人員,這就是自動糾錯手段

除了自動手段,通常還需要配置人工回報通道,也就是客服,當使用者找到客服時,客服通過這個背景管理系統查業務資訊,比如訂單id,發現确實存在業務資訊發送失敗的情況後,就會通知技術人員,等技術人員解決完畢,客服再手動點選背景管理系統的功能按鈕,重新建立訂單,解決問題。

如果隻配置了定時任務進行定時掃描和通知,那麼可以把資料放在redis表中,但如果還配備了人工管道,那麼還是把資料放在mysql中吧,處理完畢就把訂單狀态改掉

下面是為了解決上述三個問題的修正代碼

首先準備一個常量類,記錄要使用的reids的庫名,這個庫用于儲存生産的消息id

然後修改簡單執行個體的代碼

上面代碼中還差一點功能沒寫,有點麻煩,下面直接說思路

現在已經把生産的消息儲存到redis指定的庫中,以後要消費消息前,先來到這個redis庫中直接執行删除鍵值對的操作,如果删除成功則繼續執行後面的消息消費操作,如果删除失敗則說明消息已經被消費,這就避免了重複消費的問題

如果消息消費的過程中失敗復原,那就把這個消息的id重新存回redis中

消息隊列 RocketMQ

這種消息模式會擷取消息的回執,確定消息送達,具體代碼見上面的簡單執行個體,發送的方法如下

不等待傳回結果,不需要確定是否發送成功

常用于儲存不重要的資料,比如收集使用者經常點選的功能,一段時間後統計一下哪些是高頻功能哪些是很少用的功能,不好用的功能就删除或者折疊

發送的方法如下

具體代碼如下

異步發送消息的方法通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待broker的響應。

異步發送資訊的執行速度極快,上述代碼如果去掉CountDownLatch類,那麼運作結果如下圖,兩條記錄之間隻間隔了2毫秒,這說明for循環之間的10條消息被異步執行了,是以兩次logger輸出的時間間隔很近,但是一旦代碼執行完畢,主線程就會被釋放,那麼就會導緻由主線程建立的子線程——for循環也會被釋放,這就會導緻10條消息沒有被發送完就被中斷了

消息隊列 RocketMQ

為了解決主線程被提前釋放的問題,就要主線程執行等待操作,但是并不能人為寫死主線程的等待時間,是以就要使用CountDownLatch類,這個類允許一個或多個線程等待直到在其他線程中執行的一組操作完成,這個類的構造函數需要傳入一個值作為計數器依據,這個值要與消息數相同,這裡的for循環10次,那麼計數器的值就是10

“countDownLatch.await(5,TimeUnit.SECONDS)”代表所有資訊發送完畢後,再等待5秒鐘,然後再執行這條代碼之後的代碼

消息隊列 RocketMQ

消費者要去消息隊列中擷取資料,整個過程包括執行個體化消費者對象、設定nameserver位址、訂閱topic和tag、建立監聽、設定單次推送的資訊數量上限以及各種操作失敗時的復原代碼、防止重複消費的代碼

具體代碼如下,部分代碼簡單的用文字說明(反正實際開發也不寫這種底層代碼)

訂閱topic和tag時,使用 * 比對所有的tag,使用 tagA || tagB || tagC 監聽多個指定的tag消息,通過調用多個.subscribe()方法來訂閱多個topic

訂閱消息後,使用registerMessageListener方法建立監聽,并實作MessageListenerConcurrently接口,重寫consumeMessage方法

consumeMessage方法的傳回值是一個枚舉類,其實就是根據操作結果告訴MQ,是“操作成功”還是“稍後重試”

消息隊列 RocketMQ

從list數組中取值時,即便消息内容是對象也可以轉為byte數組,隻要對象的類實作了序列化接口

如果消費消息的過程中報錯,需要将消息id重新放回redis存放生成的消息的庫中,因為redis沒辦法跟着spring事務復原,隻能手動復原

使用setConsumeMessageBatchMaxSize方法可以設定MQ每次給這個消費者對象發送的資料的上限條數

繼續閱讀