異步
原來的下單場景隻是使用者支付即可結束,現在需要發送成功短信,給使用者增加積分,訂閱物流資訊等等,這就使得使用者的 下單時間大大加長,這樣就可以使用消息隊列,把各個動作發到消息隊列,每個服務再去拉取消息隊列中的東西進行處理.大大減少時間
解耦:增加積分,發送短信這些可以單獨拆分出來,需要使用直接發送到知道的消息隊列就行,你隻需要關注你目前的業務
削峰: 如果使用線程池來解決,一個服務一個線程在高峰期你的mysql或者redis可能撐不住,使用mq就可以限制主機每次隻拉取多少條進行處理
可用性降低
引入了mq,一旦mq當機對業務有影響
複雜度提高
資料鍊路變得複雜,如何保證順序性,不重複消費
一緻性問題
使用者支付了,增加積分出錯該怎麼處理
nameserver 相當于注冊中心,連接配接從這裡取ip
broker 消息倉庫,裡面有topic與隊列
product,consumer生産者消費者
基本的環境<code>yum install java-1.8.0-openjdk-devel.x86_64 wget vim unzip -y</code>
下載下傳mq安裝包<code>wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip</code>
解壓縮<code>unzip rocketmq-all-4.7.1-bin-release.zip -d /usr/local/</code>
啟動nameserver服務
<code>vim bin/runserver.sh</code>
預設堆初始化最大都是4g,新生代2g,測試機沒這麼記憶體,不修改無法啟動,改為256m,新生代128m<code>JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"</code>
背景啟動<code>nohup bin/mqnamesrv > n1.out &</code>
啟動broker服務
<code>vim bin/runbroker.sh</code>
預設堆初始化最大都是8g,新生代4g,測試機沒這麼記憶體,不修改無法啟動,改為512m,新生代256m<code>JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"</code>
暴露namserver位址<code>echo 'export NAMESRV_ADDR=localhost:9876' >> ~/.bash_profile</code>
背景啟動<code>nohup bin/runbroker.sh >n2.out &</code>
日志驗證
n1.out <code>The Name Server boot success. serializeType=JSON</code>
n2.out <code>The broker[localhost.localdomain, 192.168.147.134:10911] boot success. serializeType=JSON and name server is localhost:9876</code>
發送接收測試
發送<code>bin/tools.sh org.apache.rocketmq.example.quickstart.Producer</code>
接收<code>bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer</code>
關閉
關閉nameserver服務<code>bin/mqshutdown namesrv</code>
關閉broker服務<code>bin/mqshutdown broker</code>
4種高可用叢集
多master模式
優點:配置簡單,性能最高
缺點:單個當機,這台機器上違背消費的消息不可訂閱
多master多salve 異步複制
優點:消息丢失少(異步複制),消息實時性不受到影響,master當機可以從slave上消費,性能與多master基本一緻
缺點:master當機下會丢失少量消息
多master多salve 同步雙寫
優點:master當機,消息無延遲,可用性高
缺點:性能有所丢失
dledger模式:4.5版本之前采用master-slave架構部署但是master挂掉都slave無法自動晉升為master,這種模式可以将多個master-slave組成一個組,當組内master挂了将選舉一個master繼續服務
修改vim conf/2m-2s-async/broker-a.properties配置檔案
将broker-a.properties寫入到broker-b-s.properties修改brokerName,brokerId,brokerRole和幾個檔案存儲路徑,同一台虛拟機注意端口号也需要修改
克隆目前虛拟機,修改broker-a-s.properties,broker-b.properties檔案
修改host檔案<code>vim /etc/hosts</code>
啟動兩台nameserver<code>nohup bin/mqnamesrv >n1.out &</code>
啟動broker,使用-c指定配置檔案<code>nohup bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties >nb.out &</code>
關閉防火牆或者開放9876,兩個broker服務的端口<code>firewall-cmd --zone=public --add-port=9876/tcp --add-port=10911/tcp --add-port=11011/tcp --permanent``firewall-cmd --reload</code>
四個broker服務都啟動後驗證叢集<code>bin/mqadmin clusterList -n work1:9876</code>
這裡可能會報錯<code>Caused by: java.security.NoSuchAlgorithmException: Algorithm HmacSHA1 not available</code>
解決方法:出處:一篇文章徹底解決RocketMq的疑難雜症之:org.apache.rocketmq.client.exception.MQClientException: No route info of thi<code>cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64/jre/lib/ext/sunjce_provider.jar /usr/local/rocketmq-all-4.7.1-bin-release/lib</code>具體以實際目錄為準
成功:兩主兩從

項目位址rocketmq-dashboard
項目克隆<code>git clone https://github.com/apache/rocketmq-dashboard.git</code>
打開rocketmq-console導入idea,修改application.properties檔案<code>rocketmq.config.namesrvAddr=work1:9876;work2:9876</code>以實際情況修改
打包項目上傳jar包,啟動<code>nohup java -jar rocketmq-console-ng-2.0.0.jar &</code>
打開浏覽器通路目前伺服器8080端口
快速示範
在<code>bin/dledger/fast-try.sh</code>快速示範的腳本,但腳本給一個broker的記憶體是1g,虛拟機沒有這麼大修改一下
這裡我修改為256m
啟動<code>bin/dledger/fast-try.sh start</code>
檢視叢集情況<code>bin/mqadmin clusterList -n 127.0.0.1:9876</code>
查詢master節點程序号并把它kill,看slave是否能轉為master
實際搭建
配置檔案增加一下幾條
叢集搭建成功
直接把135主機關機了
切換成功
producer生産消息,consumer消費消息,broker存儲消息,每個broker對于一台伺服器,每個broker可以存儲多個opic消息,每個topic消息也可以分片存儲于不同的broker上,message queue用于存儲多個消息的實體位址,每個topic消息存儲于多個message queue中
producer負責生産消費,将消費者消息發送到broker上,有多種發送方式:同步發送,異步發送,順序發送,單向發送,同步與異步需要broker傳回确認消息,單向發送不需要。同一類producer組成一個集合為生産組發送同一類消息且邏輯一緻,如果有異常,broker伺服器會聯系同一生産者組送出或復原
consumer消息者形式分為兩種:
拉取式:主動式消費,消費者調用拉取的方法
推動式消費:broker有資料就會推給消費者
消費者組必須訂閱同一個topic,消息模式兩種:
叢集消費模式:平攤消費
廣播消費模式:共享消費
每個topic若幹個消息,每個消息隻能有一個主題,同一個topic下的資料分片儲存到不同的broker,每個分片機關是messageQueue
幾個子產品
remoting module:處理來自clients的請求
client manager:負責管理用戶端和維護消費者的topic訂閱資訊
store service:處理消息的存儲查詢功能
ha service:高可用服務,負責master與slave的資料同步
index service:索引服務,以提高查詢
普通叢集
每個節點固定角色,master負責響應用戶端請求并存儲消息,slave負責同步資料并響應用戶端部分讀請求
dledger高可用叢集
dledger
接管broker的commitlog消息存儲
選舉leader節點
完成消息同步
多副本消息同步
leader收到消息會将消息标記為uncommited狀态,發給follower,follower收到消息後需要給leader傳回一個ack,如果有超過半數的follower傳回ack就會把消息改為commited狀态,發給follower
leader選舉機制
每個節點有三個狀态,leader,follower,candidate(候選人)
每個時間點叫做term
叢集啟動時,每個節點都是follower,叢集内部發送一個timeout信号,follower轉為候選人,發起投票後收到半數以上的投票晉升為leader,
選舉過程,叢集啟動,三個節點都是follower,三個節點都給自己投票,term都是1,三個節點随機休眠,a啟動term加一為2,第二個節點醒來,發現a的term比自己大,承認a是leader,c同理
充當路由消息的提供者,broker會在啟動時向nameserver注冊自己的服務資訊,後續通過心跳維護目前服務的可用性,生産者或消費者通過名字服務查找各主題消息相應的broker ip清單
每個消息都必須擁有一個topic,每個消息擁有唯一的message id,且可以攜帶業務辨別key, 可以為消息設定一個tag标簽
時間
mq收到消息标記為uncommit狀态發給follower,follower收到消息,發給leader一個ack,超過半數follower傳回ack,消息改為commit狀态,存儲,狀态同步給follower
mqpush消息給消費者,等待消費者ack響應,标記為已消費,沒有标記消息會重複推送
mq會定期删除一些過期的消息
存儲媒體:磁盤檔案(采用順序讀寫,保證存儲的速度,采用mmap的方式,省去上下文切換,提高速度)
commitlog:存儲消息中繼資料,每個檔案1個G
consumerQueue:消息隊列,儲存commitlog的索引
indexFile:提供通過key或時間來查詢消息的方法
同步刷盤:消息寫入機器的記憶體時,通知刷盤線程刷盤,等待刷盤線程寫入完成後喚醒線程,傳回寫入完成
優點:穩定安全
缺點:性能不如異步
異步刷盤:消息寫入記憶體後,傳回寫入完成,當記憶體累計到一定程度是統一觸發刷盤操作
優點:吞吐量大
缺點:一旦伺服器斷電丢失部分消息
同步複制:生産者發送消息,隻有master與slave(半數slave)寫入成功才回報生産者寫入成功
異步複制:生産者發送消息,隻要master寫入消息成功,就回報生産者寫入成功,再異步将消息同步到slave
生産者負載均衡:
生産者發送消息時,擷取目前topic下所有broker集合,采用取模遞增算法将消息往不同的broker上發送
消費者負載均衡
叢集模式:六種配置設定算法
AllocateMachineRoomNearby:同機房的消費者與broker配置設定一起
AllocateMessageQueueAveragely:平均配置設定,将所有消息隊列平均配置設定給消費者,先算數後配置設定
AllocateMessageQueueAveragelyByCircle:先輪流給消費者配置設定一個隊列,後面再增加
AllocateMessageQueueByConfig:直接指定所有隊列
AllocateMessageQueueByMachineRoom:按邏輯機房進行配置設定
AllocateMessageQueueConsistentHash:
廣播模式:每個消費者配置設定所有的隊列
廣播模式下不存在消息重試,會直接消費下一條
如何重試
消息監聽器中配置
傳回Action.ReconsumeLater
傳回null
抛出異常
不重試傳回Action.CommitMessage
重試處理
重試的消息會進入“%RETRY%”+ConsumeGroup隊列,最多16次,16次後會進入死信隊列,可配置例如20次,16次後酶促間隔2h
16次每次間隔10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h
messageId
老版本中,無論重試多少次messageId是相同的,4.7.1中每次重試messageId會重建
配置覆寫
最大重試次數對同一個消費組執行個體有效,最後啟動的消費者會覆寫之前的配置
一個死信隊列對于一個消費組,而不是一個消費者
一個消費組不需要死信隊列是不會建立死信隊列的
一個死信隊列包含這個消費組所有無法消費的消息,不區分主題
消息無法再被消費者正常消費
預設存儲3天,不管是否消費被删除
預設死信隊列中的消息無法讀取,需要将權限配置為6
當出現消費者對某條消息重複消費的情況時,重複消費的結果與消費一次的結果是相同的,并且多次消費并未對業務系統産生任何負面影響,那麼這整個過程就可實作消息幂等。支付時重複送出了多次但最後還是隻支付了一次的錢
三種實作語義
at most once:每條消息最多消費一次
at least once:每條消息至少消費一次
exactly one:确定消費一次
rocketmq支援at least once語義
消息重複情況
發送重複:消息發送到服務端并且持久化了,網絡斷開或者當機了,生産者判斷發送失敗了會造次發送
投遞重複:消費者收到消息并完成業務處理了,準備發送消息接收時當機了,服務端在恢複後會再次發送一遍這個消息
負載均衡時消息重複:broker服務重新開機,擴容,縮容會觸發rebalance造成消費者收到重複的消息
解決:
業務唯一辨別:例如訂單号
利用資料庫唯一索引或主鍵索引
利用redis判斷
dledger模式不支援批量發送/更新v4.8+
<code>consumer.setMessageModel(MessageModel.BROADCASTING);</code>
表達式過濾
consumer.subscribe("filter-topic", "TAG1 || TAG2");
sql過濾
需要配置<code>enablePropertyFilter=true</code>
<code>message1.putUserProperty("a", "1");</code>
<code>consumer.subscribe("filter-topic", MessageSelector.bySql("TAGS IN ('TAG1','TAG2') AND a between 0 and 1 "));</code>
基本文法<code>>,<,>=,between,in,and,or,not</code>等
代碼
流程
發送消息到服務端,這個消息暫存在服務端,不會被消費者讀取到
持久化成功後會傳回生産者一個ack,确認消息是否成功
成功回調執行executeLocalTransaction方法,執行本地事務,持久化到資料庫類的操作,這塊的復原自行處理,最終傳回本地事務的執行結果
根據傳回結果進行操作,commit的話會将目前消息移動到實際的topic下,復原就删除消息
如果本地事務傳回unknown,服務端會定時調用checkLocalTransaction方法進行查詢,最多15次
根據checkLocalTransaction方法進行執行復原或者送出
開啟權限控制
配置檔案
使用RocketMQTemplate進行發送消息,相關屬性都以rocketmq_開頭
topic:tags
源碼位址 源碼位址 使用4.7.1版本源碼
在項目根目錄下建立conf檔案夾,複制distribution下broker.conf,logback_broker.xml,logback_nameserv.xml三個檔案到conf下
在本機添加環境變量ROCKETMQ_HOME指向項目根目錄
啟動nameser
修改conf目錄下的broker.conf 添加namesrvAddr,storePathRootDir,storePathRootDir,storePathCommitLog,storePathConsumeQueue,storePathIndex,storeCheckpoint,abortFile等參數具體可參考上方配置
啟動broker 配置啟動參數-c broker.conf檔案位址
配置資訊:建立nameseverconfig與nettyserverconfig
初始化,啟動,監聽9876端口,提供給用戶端拉取路由資訊
建立處理請求的線程與定時掃描的線程(10s掃描一次,判斷最後最後更新時間+2分鐘,超出會删除這個broker并關閉連接配接)
啟動了很多元件
注冊到nameserver,每30s(可以配置修改但最長為60s)發送一次心跳
DefaultMQProducerImpl:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
判斷組名是否符合規定
啟動各種定時任務,緩存nameserver上所有的主題,與broker建立心跳
發送消息采用索引自增取模的方式進行
org.apache.rocketmq.store.DefaultMessageStore#putMessage
使用零拷貝追加到commitlog,同步或異步刷盤,主從同步
定時任務:每10s啟動啟動一次,
作者: JaminYe
出處:https://www.cnblogs.com/JaminYe/p/15559170.html/
版權聲明:本文原創文章,遵循 CC 4.0 BY-SA 版權協定,轉載請附上原文出處連結和本聲明。