1.環境參考
benchmark環境搭建:參考單機快速搭建單broker環境
被壓測環境:rocketmq的dledger叢集
2.源碼位置
https://github.com/apache/rocketmq/tree/master/example/src/main/java/org/apache/rocketmq/example/benchmark
3.工具清單
consumer.sh:消息消費的benchmark工具
producer.sh: 消息生産benchmark工具(同步非批處理模式)
3.1 producer.sh
3.1.1 幫助
sh producer.sh -h
usage: benchmarkProducer [-h] [-k <arg>] [-n <arg>] [-s <arg>] [-t <arg>] [-w <arg>]
-h,--help Print help
-k,--keyEnable <arg> Message Key Enable, Default: false //是否使用message key,true則使用timestamp作為message的key
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 //指定nameserver位址
-s,--messageSize <arg> Message Size, Default: 128 //指定消息大小,預設128位元組
-t,--topic <arg> Topic name, Default: BenchmarkTest //指定topic,預設使用BenchmarkTest,如果指定其他記得先建立對應的topic
-w,--threadCount <arg> Thread count, Default: 64 //開啟的發送生産消息的線程數
3.1.2 源碼重要片段
//預設生産組為:benchmark_producer
final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer");
//如果keyEnable為True,則會以時間戳作為message的key
if (keyEnable) {
msg.setKeys(String.valueOf(beginTimestamp / 1000));
}
//設定producer用于發送消息的線程池大小,-w的值
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
3.1.3 例子
指定nameserver進行生産消息壓測
sh producer.sh -n xxx.xxx.xxx.xxx:9876

SendTPS:生産消息的TPS
Max RT:最大響應時間(毫秒)
Average RT:平均響應時間(毫秒)
Send Failed:發送失敗的總請求數
Response Failed:傳回失敗的總響應數
這裡剛開始有發生失敗的原因是由于producer剛啟動,短期内對broker造成了壓力。在實際使用producer的時候,應該對發送失敗的情況進行重新消息重發。
可以看到控制台裡Produce Message TPS為3000多,其中slave是從master同步消息的TPS(備份master的消息資料),master才是實際接收的生産消息TPS。
3.2 consumer.sh
3.2.1 幫助
sh consumer.sh -h
usage: benchmarkConsumer [-e <arg>] [-f <arg>] [-g <arg>] [-h] [-n <arg>] [-p <arg>] [-r <arg>] [-t <arg>]
-e,--expression <arg> filter expression content file path.ie: ./test/expr //配合filter參數使用,過濾的條件表達式
-f,--filterType <arg> TAG, SQL92 //過濾方式
-g,--group <arg> Consumer group name, Default: benchmark_consumer //指定消費組,預設為benchmark_consumer
-h,--help Print help
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 //指定nameserver位址
-p,--group prefix enable <arg> Consumer group name, Default: false //是否給消費組添加字尾,預設會給指定的消費組後添加字尾,預設應該是true(提示有問題)
-r,--fail rate <arg> consumer fail rate, default 0 //指定消費失敗率,隻要沒有超過消費失敗率,消費失敗都會重試
-t,--topic <arg> Topic name, Default: BenchmarkTest //指定topic,預設使用BenchmarkTest,如果指定其他記得先建立對應的topic
3.2.2 源碼重要片段
#根據指定的消費group生成消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
#指定nameserver位址
if (commandLine.hasOption('n')) {
String ns = commandLine.getOptionValue('n');
consumer.setNamesrvAddr(ns);
}
#如果沒有指定isSuffixEnable,即-p指定的數值,則會給消費組加上字尾
if (Boolean.parseBoolean(isSuffixEnable)) {
group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
}
#指定topic的消息過濾器,隻消費符合條件的消息
#SQL92文法
#TAG語言
if (filterType == null || expression == null) {
consumer.subscribe(topic, "*");
} else {
if (ExpressionType.TAG.equals(filterType)) {
String expr = MixAll.file2String(expression);
System.out.printf("Expression: %s%n", expr);
consumer.subscribe(topic, MessageSelector.byTag(expr));
} else if (ExpressionType.SQL92.equals(filterType)) {
String expr = MixAll.file2String(expression);
System.out.printf("Expression: %s%n", expr);
consumer.subscribe(topic, MessageSelector.bySql(expr));
} else {
throw new IllegalArgumentException("Not support filter type! " + filterType);
}
}
#如果目前消費比例小于failRate,會稍後進行重試消費,否則直接跳過
if (ThreadLocalRandom.current().nextDouble() < failRate) {
statsBenchmarkConsumer.getFailCount().incrementAndGet();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} else {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
3.2.3 例子
#從BenchmarkTest消費消息,這裡會自動給消費組:test2加上一個字尾
sh consumer.sh -t BenchmarkTest -n nameserver:9876 -g test2
TPS: 消費的TPS
FAIL:消費失敗的總數
AVG(B2C):broker到Consumer的平均響應時間(毫秒)
AVG(S2C):nameserver到Consumer的平均響應時間(毫秒)
MAX(B2C): broker到Consumer的最大響應時間(毫秒)
MAX(S2C): nameserver到Consumer的最大響應時間(毫秒)
可以看到控制台裡Consumer Message TPS為6w多,遠大于producer的tps,且消費隻從Master請求消息。
傳送門:2021最新測試資料與大廠招聘合集
部落客:測試生财(一個不為996而996的測開碼農)
座右銘:專注測試開發與自動化運維,努力讀書思考寫作,為内卷的人生奠定财務自由。
内容範疇:技術提升,職場雜談,事業發展,閱讀寫作,投資理财,健康人生。
csdn:https://blog.csdn.net/ccgshigao
部落格園:https://www.cnblogs.com/qa-freeroad/
51cto:https://blog.51cto.com/14900374
微信公衆号:測試生财(定期分享獨家内容和資源)
![]()
RocketMQ系列:rocketmq的benchmark工具1.環境參考2.源碼位置3.工具清單