問題的提出
存在就是有意義的,那麼ConsumeQueue中存消息tag的hashcode是什麼目的呢?
查到的資料是用于消息的過濾,因為Consumer可以根據主題和tag消費消息
.subscribe("TopicTest", "TagA");
那麼在消息過濾是在broker還是Conumser呢?按照常理是在broker,因為在broker可以減少流量,實際情況是在broker過濾大部分,Consumer過濾一小部分
ConsumeQueue的結構
ConsumeQueue存的是主題的邏輯資訊,如下圖所示,代表一條記錄。其中記錄的資訊存儲在commitLog中,位置是CommitLog Offset。
Offset用于标記消息在CommitLog中的位置
Size用于标記消息的大小
HashCode用于過濾消息
源碼跟蹤
SubscriptionData的建構(Consumer啟動)
Consumer一般會有訂閱的主題和tag
consumer.subscribe("TopicTest", "TagA");
跟進去會跟到FilterAPI的buildSubscriptionData方法
public static SubscriptionData buildSubscriptionData(String topic, String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
//添加tag的set
//添加tag的set
//添加tag的set
subscriptionData.getTagsSet().add(trimString);
//添加tag的hashcode的set
//添加tag的hashcode的set
//添加tag的hashcode的set
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
總結:SubscriptionData包含了tag清單和tag的hashcode清單
broker過濾消息
首先Consumer給broker發送消息,請求code是 RequestCode.PULL_MESSAGE ,是以我們可以跟borker裡對這個請求碼的處理的processor,最後定位到
PullMessageProcessor###processRequest方法,方法裡有如下的代碼
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
跟DefaultMessageStore###getMessage方法
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
//省略
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//擷取消息的偏移量
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
//擷取消息的大小
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
//擷取消息的tag的hashcode
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
//省略
//省略
//省略
//檢視消息tag是否比對,此時在broker實作過濾
//檢視消息tag是否比對,此時在broker實作過濾
//檢視消息tag是否比對,此時在broker實作過濾
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
//省略
//省略
//省略
return getResult;
}
跟進比對方法,此時能發現過濾方法是看subscriptionData裡是否有包含tagsCode
//ExpressionMessageFilter###
```java
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
//省略
//省略
//省略
//訂閱主題裡是否包含這個hashcode
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
} else {
//省略
}
總結:broker是根據subscriptionData裡的tag的hashcode清單去過濾消息,判斷從ConsumeQueue中讀取的tag的hashcode是否在subscriptionData裡的tag的hashcode清單中。
Consumer過濾消息
Consumer開始跟的地方在DefaultMQPushConsumerImpl###pullMessage方法裡有一個PullCallback,此方法是一個給broker發送拉取消息後的一個回調方法
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
//省略
}
跟一下PullAPIWrapper###processPullResult方法
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
//省略
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
//Consumer端過濾消息
//Consumer端過濾消息
//Consumer端過濾消息
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
//省略
return pullResult;
}
總結:broker端的消息過濾是通過看subscriptionData裡的tag清單是否含有目前消息的tag