天天看點

rocketmq入門筆記

異步

原來的下單場景隻是使用者支付即可結束,現在需要發送成功短信,給使用者增加積分,訂閱物流資訊等等,這就使得使用者的 下單時間大大加長,這樣就可以使用消息隊列,把各個動作發到消息隊列,每個服務再去拉取消息隊列中的東西進行處理.大大減少時間

解耦:增加積分,發送短信這些可以單獨拆分出來,需要使用直接發送到知道的消息隊列就行,你隻需要關注你目前的業務

削峰: 如果使用線程池來解決,一個服務一個線程在高峰期你的mysql或者redis可能撐不住,使用mq就可以限制主機每次隻拉取多少條進行處理

可用性降低

引入了mq,一旦mq當機對業務有影響

複雜度提高

資料鍊路變得複雜,如何保證順序性,不重複消費

一緻性問題

使用者支付了,增加積分出錯該怎麼處理

nameserver 相當于注冊中心,連接配接從這裡取ip

broker 消息倉庫,裡面有topic與隊列

product,consumer生産者消費者

基本的環境<code>yum install java-1.8.0-openjdk-devel.x86_64 wget vim unzip -y</code>

下載下傳mq安裝包<code>wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip</code>

解壓縮<code>unzip rocketmq-all-4.7.1-bin-release.zip -d /usr/local/</code>

啟動nameserver服務

<code>vim bin/runserver.sh</code>

預設堆初始化最大都是4g,新生代2g,測試機沒這麼記憶體,不修改無法啟動,改為256m,新生代128m<code>JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"</code>

背景啟動<code>nohup bin/mqnamesrv &gt; n1.out &amp;</code>

啟動broker服務

<code>vim bin/runbroker.sh</code>

預設堆初始化最大都是8g,新生代4g,測試機沒這麼記憶體,不修改無法啟動,改為512m,新生代256m<code>JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"</code>

暴露namserver位址<code>echo 'export NAMESRV_ADDR=localhost:9876' &gt;&gt; ~/.bash_profile</code>

背景啟動<code>nohup bin/runbroker.sh &gt;n2.out &amp;</code>

日志驗證

n1.out <code>The Name Server boot success. serializeType=JSON</code>

n2.out <code>The broker[localhost.localdomain, 192.168.147.134:10911] boot success. serializeType=JSON and name server is localhost:9876</code>

發送接收測試

發送<code>bin/tools.sh org.apache.rocketmq.example.quickstart.Producer</code>

接收<code>bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer</code>

關閉

關閉nameserver服務<code>bin/mqshutdown namesrv</code>

關閉broker服務<code>bin/mqshutdown broker</code>

4種高可用叢集

多master模式

優點:配置簡單,性能最高

缺點:單個當機,這台機器上違背消費的消息不可訂閱

多master多salve 異步複制

優點:消息丢失少(異步複制),消息實時性不受到影響,master當機可以從slave上消費,性能與多master基本一緻

缺點:master當機下會丢失少量消息

多master多salve 同步雙寫

優點:master當機,消息無延遲,可用性高

缺點:性能有所丢失

dledger模式:4.5版本之前采用master-slave架構部署但是master挂掉都slave無法自動晉升為master,這種模式可以将多個master-slave組成一個組,當組内master挂了将選舉一個master繼續服務

修改vim conf/2m-2s-async/broker-a.properties配置檔案

将broker-a.properties寫入到broker-b-s.properties修改brokerName,brokerId,brokerRole和幾個檔案存儲路徑,同一台虛拟機注意端口号也需要修改

克隆目前虛拟機,修改broker-a-s.properties,broker-b.properties檔案

修改host檔案<code>vim /etc/hosts</code>

啟動兩台nameserver<code>nohup bin/mqnamesrv &gt;n1.out &amp;</code>

啟動broker,使用-c指定配置檔案<code>nohup bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &gt;nb.out &amp;</code>

關閉防火牆或者開放9876,兩個broker服務的端口<code>firewall-cmd --zone=public --add-port=9876/tcp --add-port=10911/tcp --add-port=11011/tcp --permanent``firewall-cmd --reload</code>

四個broker服務都啟動後驗證叢集<code>bin/mqadmin clusterList -n work1:9876</code>

這裡可能會報錯<code>Caused by: java.security.NoSuchAlgorithmException: Algorithm HmacSHA1 not available</code>

解決方法:出處:一篇文章徹底解決RocketMq的疑難雜症之:org.apache.rocketmq.client.exception.MQClientException: No route info of thi<code>cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/jre/lib/ext/sunjce_provider.jar /usr/local/rocketmq-all-4.7.1-bin-release/lib</code>具體以實際目錄為準

成功:兩主兩從

rocketmq入門筆記

項目位址rocketmq-dashboard

項目克隆<code>git clone https://github.com/apache/rocketmq-dashboard.git</code>

打開rocketmq-console導入idea,修改application.properties檔案<code>rocketmq.config.namesrvAddr=work1:9876;work2:9876</code>以實際情況修改

打包項目上傳jar包,啟動<code>nohup java -jar rocketmq-console-ng-2.0.0.jar &amp;</code>

打開浏覽器通路目前伺服器8080端口

rocketmq入門筆記

快速示範

在<code>bin/dledger/fast-try.sh</code>快速示範的腳本,但腳本給一個broker的記憶體是1g,虛拟機沒有這麼大修改一下

這裡我修改為256m

啟動<code>bin/dledger/fast-try.sh start</code>

檢視叢集情況<code>bin/mqadmin clusterList -n 127.0.0.1:9876</code>

rocketmq入門筆記

查詢master節點程序号并把它kill,看slave是否能轉為master

rocketmq入門筆記

實際搭建

配置檔案增加一下幾條

叢集搭建成功

rocketmq入門筆記

直接把135主機關機了

切換成功

rocketmq入門筆記

producer生産消息,consumer消費消息,broker存儲消息,每個broker對于一台伺服器,每個broker可以存儲多個opic消息,每個topic消息也可以分片存儲于不同的broker上,message queue用于存儲多個消息的實體位址,每個topic消息存儲于多個message queue中

producer負責生産消費,将消費者消息發送到broker上,有多種發送方式:同步發送,異步發送,順序發送,單向發送,同步與異步需要broker傳回确認消息,單向發送不需要。同一類producer組成一個集合為生産組發送同一類消息且邏輯一緻,如果有異常,broker伺服器會聯系同一生産者組送出或復原

consumer消息者形式分為兩種:

拉取式:主動式消費,消費者調用拉取的方法

推動式消費:broker有資料就會推給消費者

消費者組必須訂閱同一個topic,消息模式兩種:

叢集消費模式:平攤消費

廣播消費模式:共享消費

每個topic若幹個消息,每個消息隻能有一個主題,同一個topic下的資料分片儲存到不同的broker,每個分片機關是messageQueue

幾個子產品

rocketmq入門筆記

remoting module:處理來自clients的請求

client manager:負責管理用戶端和維護消費者的topic訂閱資訊

store service:處理消息的存儲查詢功能

ha service:高可用服務,負責master與slave的資料同步

index service:索引服務,以提高查詢

普通叢集

每個節點固定角色,master負責響應用戶端請求并存儲消息,slave負責同步資料并響應用戶端部分讀請求

dledger高可用叢集

dledger

接管broker的commitlog消息存儲

選舉leader節點

完成消息同步

多副本消息同步

leader收到消息會将消息标記為uncommited狀态,發給follower,follower收到消息後需要給leader傳回一個ack,如果有超過半數的follower傳回ack就會把消息改為commited狀态,發給follower

leader選舉機制

每個節點有三個狀态,leader,follower,candidate(候選人)

每個時間點叫做term

叢集啟動時,每個節點都是follower,叢集内部發送一個timeout信号,follower轉為候選人,發起投票後收到半數以上的投票晉升為leader,

選舉過程,叢集啟動,三個節點都是follower,三個節點都給自己投票,term都是1,三個節點随機休眠,a啟動term加一為2,第二個節點醒來,發現a的term比自己大,承認a是leader,c同理

充當路由消息的提供者,broker會在啟動時向nameserver注冊自己的服務資訊,後續通過心跳維護目前服務的可用性,生産者或消費者通過名字服務查找各主題消息相應的broker ip清單

每個消息都必須擁有一個topic,每個消息擁有唯一的message id,且可以攜帶業務辨別key, 可以為消息設定一個tag标簽

時間

mq收到消息标記為uncommit狀态發給follower,follower收到消息,發給leader一個ack,超過半數follower傳回ack,消息改為commit狀态,存儲,狀态同步給follower

mqpush消息給消費者,等待消費者ack響應,标記為已消費,沒有标記消息會重複推送

mq會定期删除一些過期的消息

存儲媒體:磁盤檔案(采用順序讀寫,保證存儲的速度,采用mmap的方式,省去上下文切換,提高速度)

commitlog:存儲消息中繼資料,每個檔案1個G

consumerQueue:消息隊列,儲存commitlog的索引

indexFile:提供通過key或時間來查詢消息的方法

同步刷盤:消息寫入機器的記憶體時,通知刷盤線程刷盤,等待刷盤線程寫入完成後喚醒線程,傳回寫入完成

優點:穩定安全

缺點:性能不如異步

異步刷盤:消息寫入記憶體後,傳回寫入完成,當記憶體累計到一定程度是統一觸發刷盤操作

優點:吞吐量大

缺點:一旦伺服器斷電丢失部分消息

同步複制:生産者發送消息,隻有master與slave(半數slave)寫入成功才回報生産者寫入成功

異步複制:生産者發送消息,隻要master寫入消息成功,就回報生産者寫入成功,再異步将消息同步到slave

生産者負載均衡:

生産者發送消息時,擷取目前topic下所有broker集合,采用取模遞增算法将消息往不同的broker上發送

消費者負載均衡

叢集模式:六種配置設定算法

AllocateMachineRoomNearby:同機房的消費者與broker配置設定一起

AllocateMessageQueueAveragely:平均配置設定,将所有消息隊列平均配置設定給消費者,先算數後配置設定

AllocateMessageQueueAveragelyByCircle:先輪流給消費者配置設定一個隊列,後面再增加

AllocateMessageQueueByConfig:直接指定所有隊列

AllocateMessageQueueByMachineRoom:按邏輯機房進行配置設定

AllocateMessageQueueConsistentHash:

廣播模式:每個消費者配置設定所有的隊列

廣播模式下不存在消息重試,會直接消費下一條

如何重試

消息監聽器中配置

傳回Action.ReconsumeLater

傳回null

抛出異常

不重試傳回Action.CommitMessage

重試處理

重試的消息會進入“%RETRY%”+ConsumeGroup隊列,最多16次,16次後會進入死信隊列,可配置例如20次,16次後酶促間隔2h

16次每次間隔10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h

messageId

老版本中,無論重試多少次messageId是相同的,4.7.1中每次重試messageId會重建

配置覆寫

最大重試次數對同一個消費組執行個體有效,最後啟動的消費者會覆寫之前的配置

一個死信隊列對于一個消費組,而不是一個消費者

一個消費組不需要死信隊列是不會建立死信隊列的

一個死信隊列包含這個消費組所有無法消費的消息,不區分主題

消息無法再被消費者正常消費

預設存儲3天,不管是否消費被删除

預設死信隊列中的消息無法讀取,需要将權限配置為6

當出現消費者對某條消息重複消費的情況時,重複消費的結果與消費一次的結果是相同的,并且多次消費并未對業務系統産生任何負面影響,那麼這整個過程就可實作消息幂等。支付時重複送出了多次但最後還是隻支付了一次的錢

三種實作語義

at most once:每條消息最多消費一次

at least once:每條消息至少消費一次

exactly one:确定消費一次

rocketmq支援at least once語義

消息重複情況

發送重複:消息發送到服務端并且持久化了,網絡斷開或者當機了,生産者判斷發送失敗了會造次發送

投遞重複:消費者收到消息并完成業務處理了,準備發送消息接收時當機了,服務端在恢複後會再次發送一遍這個消息

負載均衡時消息重複:broker服務重新開機,擴容,縮容會觸發rebalance造成消費者收到重複的消息

解決:

業務唯一辨別:例如訂單号

利用資料庫唯一索引或主鍵索引

利用redis判斷

dledger模式不支援批量發送/更新v4.8+

<code>consumer.setMessageModel(MessageModel.BROADCASTING);</code>

表達式過濾

consumer.subscribe("filter-topic", "TAG1 || TAG2");

sql過濾

需要配置<code>enablePropertyFilter=true</code>

<code>message1.putUserProperty("a", "1");</code>

<code>consumer.subscribe("filter-topic", MessageSelector.bySql("TAGS IN ('TAG1','TAG2') AND a between 0 and 1 "));</code>

基本文法<code>&gt;,&lt;,&gt;=,between,in,and,or,not</code>等

代碼

流程

發送消息到服務端,這個消息暫存在服務端,不會被消費者讀取到

持久化成功後會傳回生産者一個ack,确認消息是否成功

成功回調執行executeLocalTransaction方法,執行本地事務,持久化到資料庫類的操作,這塊的復原自行處理,最終傳回本地事務的執行結果

根據傳回結果進行操作,commit的話會将目前消息移動到實際的topic下,復原就删除消息

如果本地事務傳回unknown,服務端會定時調用checkLocalTransaction方法進行查詢,最多15次

根據checkLocalTransaction方法進行執行復原或者送出

開啟權限控制

配置檔案

使用RocketMQTemplate進行發送消息,相關屬性都以rocketmq_開頭

topic:tags

源碼位址 源碼位址 使用4.7.1版本源碼

在項目根目錄下建立conf檔案夾,複制distribution下broker.conf,logback_broker.xml,logback_nameserv.xml三個檔案到conf下

在本機添加環境變量ROCKETMQ_HOME指向項目根目錄

啟動nameser

修改conf目錄下的broker.conf 添加namesrvAddr,storePathRootDir,storePathRootDir,storePathCommitLog,storePathConsumeQueue,storePathIndex,storeCheckpoint,abortFile等參數具體可參考上方配置

啟動broker 配置啟動參數-c broker.conf檔案位址

配置資訊:建立nameseverconfig與nettyserverconfig

初始化,啟動,監聽9876端口,提供給用戶端拉取路由資訊

建立處理請求的線程與定時掃描的線程(10s掃描一次,判斷最後最後更新時間+2分鐘,超出會删除這個broker并關閉連接配接)

啟動了很多元件

注冊到nameserver,每30s(可以配置修改但最長為60s)發送一次心跳

DefaultMQProducerImpl:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl

判斷組名是否符合規定

啟動各種定時任務,緩存nameserver上所有的主題,與broker建立心跳

發送消息采用索引自增取模的方式進行

org.apache.rocketmq.store.DefaultMessageStore#putMessage

使用零拷貝追加到commitlog,同步或異步刷盤,主從同步

定時任務:每10s啟動啟動一次,

作者: JaminYe

出處:https://www.cnblogs.com/JaminYe/p/15559170.html/

版權聲明:本文原創文章,遵循 CC 4.0 BY-SA 版權協定,轉載請附上原文出處連結和本聲明。