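This parameter-analysis series will not be long; I write things up as I run into them.
According to the RocketMQ 3.2.4 developer guide, compressMsgBodyOverHowmuch is "the message Body size above which compression starts
(the Consumer decompresses automatically on receipt), unit: bytes". The default is 1024*4, i.e. 4096.
The parameter is defined in both FiltersrvConfig.java and DefaultMQProducer.java.
The former is read by DefaultRequestProcessor.java through FiltersrvController; the latter is used by DefaultMQProducerImpl.java.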
Usage in DefaultRequestProcessor.java
// org.apache.rocketmq.filtersrv.processor.DefaultRequestProcessor.java
private ByteBuffer messageToByteBuffer(final MessageExt msg) throws IOException {
    int sysFlag = MessageSysFlag.clearCompressedFlag(msg.getSysFlag());
    if (msg.getBody() != null) {
        if (msg.getBody().length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch()) {
            byte[] data = UtilAll.compress(msg.getBody(), this.filtersrvController.getFiltersrvConfig().getZipCompressLevel());
            if (data != null) {
                msg.setBody(data);
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            }
        }
    }
    // the rest of this method is omitted ...
}
Usage in DefaultMQProducerImpl.java
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.java
private boolean tryToCompressMessage(final Message msg) {
    if (msg instanceof MessageBatch) {
        // batch does not support compressing right now
        return false;
    }
    byte[] body = msg.getBody();
    if (body != null) {
        if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
            try {
                byte[] data = UtilAll.compress(body, zipCompressLevel);
                if (data != null) {
                    msg.setBody(data);
                    return true;
                }
            } catch (IOException e) {
                log.error("tryToCompressMessage exception", e);
                log.warn(msg.toString());
            }
        }
    }
    return false;
}
As both snippets show, the method that actually compresses the message is
UtilAll.compress(final byte[] src, final int level).
Here is its definition:
// org.apache.rocketmq.common.UtilAll.java
public static byte[] compress(final byte[] src, final int level) throws IOException {
    byte[] result = src;
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
    java.util.zip.Deflater defeater = new java.util.zip.Deflater(level);
    DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream, defeater);
    try {
        deflaterOutputStream.write(src);
        deflaterOutputStream.finish();
        deflaterOutputStream.close();
        result = byteArrayOutputStream.toByteArray();
    } catch (IOException e) {
        defeater.end();
        throw e;
    } finally {
        try {
            byteArrayOutputStream.close();
        } catch (IOException ignored) {
        }
        defeater.end();
    }
    return result;
}
The default level here is 5 (it is defined in FiltersrvConfig.java).
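Since the manual says the Consumer decompresses automatically, it helps to see that the Deflater output is ordinary zlib data that the JDK can inflate again. Below is a minimal round-trip sketch using only java.util.zip. The class name DeflateRoundTripSketch and the uncompress helper are my own illustrative assumptions, not RocketMQ's code, and the compress method is a simplified stand-in for UtilAll.compress rather than a copy of it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical helper class for illustration only; not part of RocketMQ.
final class DeflateRoundTripSketch {

    // Compress src with the given Deflater level (0-9), zlib format.
    static byte[] compress(byte[] src, int level) throws IOException {
        Deflater deflater = new Deflater(level);
        ByteArrayOutputStream out = new ByteArrayOutputStream(src.length);
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
            dos.write(src);
        } finally {
            // A caller-supplied Deflater is not released by close(), so end it here.
            deflater.end();
        }
        return out.toByteArray();
    }

    // Inflate zlib-formatted data back into the original bytes.
    static byte[] uncompress(byte[] src) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream(src.length);
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(src))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Build an 8192-byte, highly repetitive body so compression has something to do.
        byte[] unit = "rocketmq".getBytes(StandardCharsets.UTF_8);
        byte[] body = new byte[8192];
        for (int i = 0; i < body.length; i++) {
            body[i] = unit[i % unit.length];
        }

        byte[] packed = compress(body, 5);
        byte[] restored = uncompress(packed);

        System.out.println("original bytes:   " + body.length);
        System.out.println("compressed bytes: " + packed.length);
        System.out.println("round trip ok:    " + Arrays.equals(body, restored));
    }
}
```

With a repetitive body like this one the compressed array is much smaller than the input; for short or already-compressed bodies the gain can be small or negative, which is presumably why a size threshold exists at all.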
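To sum up: once compressMsgBodyOverHowmuch is configured, RocketMQ compares the message body size against it, and when the body length is greater than or equal to that value it compresses the body with the JDK's built-in java.util.zip.Deflater.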
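The size rule can be sketched in isolation to make the boundary visible: the comparison in both snippets is >=, so a body of exactly 4096 bytes is compressed while one of 4095 bytes is not. This is a minimal sketch under my own assumptions: CompressThresholdSketch, Result and maybeCompress are hypothetical names, and the COMPRESSED_FLAG constant is only a stand-in for MessageSysFlag.COMPRESSED_FLAG with an illustrative value.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

// Hypothetical class for illustration only; not part of RocketMQ.
final class CompressThresholdSketch {

    // Stand-in for MessageSysFlag.COMPRESSED_FLAG; the value is illustrative.
    static final int COMPRESSED_FLAG = 0x1;

    // Outcome of the check: the body to send and the resulting flag bits.
    static final class Result {
        final byte[] body;
        final int sysFlag;

        Result(byte[] body, int sysFlag) {
            this.body = body;
            this.sysFlag = sysFlag;
        }

        boolean isCompressed() {
            return (sysFlag & COMPRESSED_FLAG) != 0;
        }
    }

    // Compress only when the body length reaches the threshold (>=, as in the quoted code).
    static Result maybeCompress(byte[] body, int threshold, int level) throws IOException {
        if (body == null || body.length < threshold) {
            return new Result(body, 0);
        }
        Deflater deflater = new Deflater(level);
        ByteArrayOutputStream out = new ByteArrayOutputStream(body.length);
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
            dos.write(body);
        } finally {
            deflater.end();
        }
        return new Result(out.toByteArray(), COMPRESSED_FLAG);
    }

    public static void main(String[] args) throws IOException {
        int threshold = 1024 * 4; // the documented default, 4096 bytes
        // One byte below the threshold: left as-is, prints false.
        System.out.println(maybeCompress(new byte[threshold - 1], threshold, 5).isCompressed());
        // Exactly at the threshold: compressed, prints true.
        System.out.println(maybeCompress(new byte[threshold], threshold, 5).isCompressed());
    }
}
```

Returning the flag together with the body mirrors what messageToByteBuffer does with sysFlag: the receiver needs that bit to know whether it has to inflate the body.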