下載下傳代碼:git clone https://github.com/tgou/RocketMQ 1.
打包:mvn -Dmaven.test.skip=trueclean package install assembly:assembly-U 2.
該配置首先從java屬性rocketmq.home.dir中擷取,如果沒有擷取到從系統環境變量ROCKETMQ_HOME中擷取 1)
雖然該配置是可選,但是前提是已經配置了rocketmq.home.dir或者ROCKETMQ_HOME 2)
rocketmqHome=/home/admin/XX 可選) i.
該配置首先從java屬性rocketmq.namesrv.addr中擷取,如果沒有擷取到從系統環境變量NAMESRV_ADDR中擷取 1)
該配置也可以從啟動broker的參數中配置:-n "127.0.0.1:9876" 2)
雖然該配置是可選,但是前提是已配置上述三種方法之一 3)
namesrvAddr=127.0.0.1:9876;192.168.0.3:9876(可選) ii.
該配置預設從系統可用位址中選擇一個 1)
在某些場景可以手動配置ip,例如程式運作在虛拟機中外部無法通路預設位址,或者用來解決docker本地ip外部無法通路的問題 2)
brokerIP1=127.0.0.1(可選) iii.
broker的ha位址,其它用途同上,不過一般不需要配置 1)
brokerIP2=127.0.0.1(可選) iv.
該選項雖然有預設值,但是線上環境最好填寫,要不然叢集注冊會有問題 1)
屬于同一個主備配置的broker的brokerName要一樣 2)
該選項預設取目前機器位址,如果取不到預設是"DEFAULT_BROKER" 3)
brokerName=broker_aaa_bb(必填) v.
該選項預設是"DEFAULT_CLUSTER" 1)
當需要通過同一個namesrv管理多個叢集的時候,不同叢集配置不同的值 2)
brokerClusterName=DEFAULT_CLUSTER(可選) vi.
該選項預設是0,代表主 1)
當配置主備的時候,備庫需要遞增,例如1,2等 2)
brokerId=0(必填) vii.
預設8,表示預設為每個topic建立的queue的數量 1)
defaultTopicQueueNums=8(可選) viii.
當topic不存在的時候自動建立topic,預設為true 1)
線上最好關閉,有利于管理topic 2)
autoCreateTopicEnable=true ix.
當訂閱組不存在的時候,自動建立,預設為true 1)
線上最好關閉,便于管理消息訂閱組 2)
autoCreateSubscriptionGroup=true x.
是否要拒絕事務消息,預設為false 1)
當broker不希望支援事務消息的時候,可以設定為true 2)
rejectTransactionMessage=false xi.
是否通過域名系統獲得namesrv的位址,預設為false 1)
當為了提高namesrv位址的靈活性,可以設定為true,當打開該選項的時候,上邊namesrvAddr配置中所述的都可以不配置 2)
fetchNamesrvAddrByAddressServer=false xii.
broker配置 a.
預設位址為java屬性的user.home + "store",也就是存儲在目前使用者目錄的store目錄下 1)
storePathRootDir=/home/admin/store(可選) i.
預設位址為目前使用者目錄user.home + "store" + "commitlog" 1)
storePathCommitLog=/home/admin/store/commitlog ii.
commitlog刷盤的間隔,預設為1000毫秒,即1秒 1)
flushIntervalCommitLog=1000(可選) iii.
是否是定時刷盤,預設為false,也就是實時刷盤,實時刷盤是指有資料寫入就會觸發刷盤邏輯,如果滿足刷頁條件就刷盤 1)
flushCommitLogTimed=false(可選) iv.
何時觸發删除檔案,預設是淩晨4點删除檔案 1)
該時間是伺服器時間,配置伺服器壓力最低的時間就可以 2)
deleteWhen="04"(可選) v.
檔案保留時間,機關小時 1)
fileReservedTime=72(可選) vi.
預設值是1024 * 512 = 524288,也就是512k 1)
maxMessageSize=524288(可選) vii.
最大被拉取的消息位元組數,消息在記憶體,預設256k 1)
maxTransferBytesOnMessageInMemory=(可選) viii.
最大被拉取的消息個數,消息在記憶體,預設32個 1)
maxTransferCountOnMessageInMemory=32(可選) ix.
最大被拉取的消息位元組數,消息在磁盤,預設64k 1)
maxTransferBytesOnMessageInDisk=(可選) x.
最大被拉取的消息個數,消息在磁盤,預設8個 1)
maxTransferCountOnMessageInDisk=8(可選) xi.
命中消息在記憶體的最大比例 1)
accessMessageInMemoryMaxRatio=40(可選) xii.
是否開啟消息索引功能 1)
messageIndexEnable=true(可選) xiii.
是否使用安全的消息索引功能,即可靠模式。可靠模式下,異常當機恢複慢,非可靠模式下,異常當機恢複快 1)
messageIndexSafe=false xiv.
store存儲配置 b.
普通配置 - 該配置都寫在properties檔案中 3.
打包與配置
分區 rocketmq 的第 1 頁
是否使用安全的消息索引功能,即可靠模式。可靠模式下,異常當機恢複慢,非可靠模式下,異常當機恢複快 1)
如果不設定,則從NameServer擷取Master HA服務位址 1)
haMasterAddress=127.0.0.1(可選) xv.
預設是ASYNC_MASTER,異步複制master 1)
還有SYNC_MASTER - 同步雙寫master;SLAVE - slave伺服器 2)
brokerRole=ASYNC_MASTER(可選) xvi.
預設是ASYNC_FLUSH,異步刷盤 1)
還有SYNC_FLUSH,同步刷盤 2)
flushDiskType=ASYNC_FLUSH(可選) xvii.
磁盤空間超過90%警戒水位,自動開始删除檔案 1)
cleanFileForciblyEnable=true xviii.
分區 rocketmq 的第 2 頁
export ROCKETMQ_HOME=/home/rocketmq a.
将bin目錄下的檔案設定為可執行,chmod +x mq* b.
設定環境變量 1.
配置jvm參數,在runserver.sh中 a.
運作 nohup mqnamesrv& b.
線上日志要配置監控 c.
啟動namesrv 2.
配置jvm參數,在runbroker.sh中 a.
調整配置檔案,根據需求是多m還是ms結構,其中nameSrv的位址可以配置在配置檔案
中,也可以寫到運作指令中
b.
運作 nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-noslave/broker-a.properties& c.
事務啟動運作nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-noslave/broker-
a.properties -tc ../conf/rocketmq.yaml &
d.
CREATE TABLE `t_transaction` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`broker_name` varchar(50) NOT NULL,
`offset` bigint(20) unsigned NOT NULL,
`producer_group` varchar(50) NOT NULL,
`gmt_create` datetime NOT NULL,
PRIMARY KEY (`id`),
KEY `bpo_idx` (`broker_name`,`producer_group`,`offset`),
KEY `bpg_idx` (`broker_name`,`producer_group`,`gmt_create`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
也需要修改rocketmq.yaml配置中的brokerName,跟對應的broker一樣
其中事務消息需要去資料庫建表 e.
啟動broker 3.
安裝
2015年10月22日 15:12
分區 rocketmq 的第 3 頁
sh ${ROCKETMQ_HOME}/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Producer a.
sh ${ROCKETMQ_HOME}/bin/tools.sh
com.alibaba.rocketmq.example.transaction.TransactionProducer
b.
sh ${ROCKETMQ_HOME}/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Consumer c.
運作例子 1.
測試
2015年10月22日 16:57
分區 rocketmq 的第 4 頁
The most commonly used mqadmin commands are: 1.
updateTopic Update or create topic
deleteTopic Delete topic from broker and NameServer.
updateSubGroup Update or create subscription group
deleteSubGroup Delete subscription group from broker.
updateBrokerConfig Update broker's config
topicRoute Examine topic route info
topicStatus Examine topic Status info
brokerStatus Fetch broker runtime status data
queryMsgById Query Message by Id
queryMsgByKey Query Message by Key
queryMsgByOffset Query Message by offset
printMsg Print Message Detail
producerConnection Query producer's socket connection and client version
consumerConnection Query consumer's socket connection, client version and subscription
consumerProgress Query consumers's progress, speed
consumerStatus Query consumer's internal data structure
cloneGroupOffset clone offset from other group.
clusterList List all of clusters
topicList Fetch all topic list from name server
updateKvConfig Create or update KV config.
deleteKvConfig Delete KV config.
wipeWritePerm Wipe write perm of broker in all name server
resetOffsetByTime Reset consumer offset by timestamp(without client restart).
updateOrderConf Create or update or delete order conf
cleanExpiredCQ Clean expired ConsumeQueue on broker.
startMonitoring Start Monitoring
checkMsg Check Message Store
statsAll Topic and Consumer tps stats
syncDocs Synchronize wiki and issue to github.com
See 'mqadmin help <command>' for more information on a specific command.
按照msgId查詢消息:mqadmin queryMsgById -i 0A00020F00002A9F000000000000D352-n
127.0.0.1:9876
2.
下載下傳代碼:git clone https://github.com/tgou/rocketmq-console a.
配置config.properties中的namesrv位址 b.
打包:mvn clean package -Dmaven.test.skip=true c.
放到web容器中運作 d.
安裝web管理界面 3.
運維
2015年10月22日 17:09
分區 rocketmq 的第 5 頁
事務日志表設計 1.
設計要點 2.
dispatch的時候如果是prepare的事務消息,插入db(可以批量提高性能,需要考慮db不可用如何處理) ○
位置在DefaultMessageStore的doDispatch方法中 ○
prepare事務日志插入 •
dispatch的時候如果是rollback事務消息,删除db事務日志 ○
位置在DefaultMessageStore的doDispatch方法中 ○
rollback事務日志删除 •
同上 ○
commit事務日志删除 •
周遊目前broker的XX秒之内的prepare事務日志,并通過offset拿到該消息的詳情,通過producer_group拿到機器ip進行回查 ○
每個broker單線程定時程式回查prepare事務日志 ○
check事務回查 •
由于requestDispatch中的消息消費順序跟commitLog的順序是一樣的,是以在每次事務日志成功寫入db之後更新目前事務日志的checkpoint(此處可以進行批量處
理之後再記錄checkpoint)
○
事務日志復原的時候,拿到所有checkpoint的最小值進行recovery,重新走dispatch流程 ○
事務日志redo •
○ 由于recover消息的代碼是在BrokerController的load期間進行的,是以初始化db的代碼要在load的時候加載,而不能放到start的時候加載
• 事務消息恢複
屬性名 類型 長度 備注
id long 表主鍵,應用中不使用
broker_name varchar 50 配置檔案裡的brokerName,一定要配置,而且不要改
offset bigint broker内commitLog的offset
producer_group varchar 50 生産者配置的group,不要輕易修改,如果更改也要進行版本相容,要不然事務回查會有問題
gmt_create datetime 日志建立時間
事務設計
2015年10月19日 16:12
分區 rocketmq 的第 6 頁