天天看點

CousumeQueue中tag的作用

問題的提出

存在就是有意義的,那麼ConsumeQueue中存消息tag的hashcode是什麼目的呢?

查到的資料是用于消息的過濾,因為Consumer可以根據主題和tag消費消息

.subscribe("TopicTest", "TagA");      

那麼在消息過濾是在broker還是Conumser呢?按照常理是在broker,因為在broker可以減少流量,實際情況是在broker過濾大部分,Consumer過濾一小部分

ConsumeQueue的結構

ConsumeQueue存的是主題的邏輯資訊,如下圖所示,代表一條記錄。其中記錄的資訊存儲在commitLog中,位置是CommitLog Offset。

CousumeQueue中tag的作用

Offset用于标記消息在CommitLog中的位置

Size用于标記消息的大小

HashCode用于過濾消息

源碼跟蹤

SubscriptionData的建構(Consumer啟動)

Consumer一般會有訂閱的主題和tag

consumer.subscribe("TopicTest", "TagA");      

跟進去會跟到FilterAPI的buildSubscriptionData方法

public static SubscriptionData buildSubscriptionData(String topic, String subString) throws Exception {
        SubscriptionData subscriptionData = new SubscriptionData();
        subscriptionData.setTopic(topic);
        subscriptionData.setSubString(subString);

        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
        } else {
            String[] tags = subString.split("\\|\\|");
            if (tags.length > 0) {
                for (String tag : tags) {
                    if (tag.length() > 0) {
                        String trimString = tag.trim();
                        if (trimString.length() > 0) {
                            //添加tag的set
                            //添加tag的set
                            //添加tag的set
                            subscriptionData.getTagsSet().add(trimString);
                            
                            //添加tag的hashcode的set
                            //添加tag的hashcode的set
                            //添加tag的hashcode的set
                            subscriptionData.getCodeSet().add(trimString.hashCode());
                        }
                    }
                }
            } else {
                throw new Exception("subString split error");
            }
        }

        return subscriptionData;
    }      

總結:SubscriptionData包含了tag清單和tag的hashcode清單

broker過濾消息

首先Consumer給broker發送消息,請求code是 RequestCode.PULL_MESSAGE ,是以我們可以跟borker裡對這個請求碼的處理的processor,最後定位到

PullMessageProcessor###processRequest方法,方法裡有如下的代碼

final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);      

跟DefaultMessageStore###getMessage方法

public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        //省略        
        
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            //擷取消息的偏移量
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                             //擷取消息的大小
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            //擷取消息的tag的hashcode
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                            maxPhyOffsetPulling = offsetPy;

                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }


                            //省略
                            //省略
                            //省略
                            

                             //檢視消息tag是否比對,此時在broker實作過濾
                             //檢視消息tag是否比對,此時在broker實作過濾
                             //檢視消息tag是否比對,此時在broker實作過濾
                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                continue;
                            }
                              

                          //省略
                          //省略
                          //省略
        return getResult;
    }      

跟進比對方法,此時能發現過濾方法是看subscriptionData裡是否有包含tagsCode

//ExpressionMessageFilter###

```java
 @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
        
            //省略
            //省略
            //省略
        
            //訂閱主題裡是否包含這個hashcode
            return subscriptionData.getCodeSet().contains(tagsCode.intValue());
        } else {
           //省略
    }      

總結:broker是根據subscriptionData裡的tag的hashcode清單去過濾消息,判斷從ConsumeQueue中讀取的tag的hashcode是否在subscriptionData裡的tag的hashcode清單中。

Consumer過濾消息

Consumer開始跟的地方在DefaultMQPushConsumerImpl###pullMessage方法裡有一個PullCallback,此方法是一個給broker發送拉取消息後的一個回調方法

PullCallback pullCallback = new PullCallback() {
@Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

         //省略
}      

跟一下PullAPIWrapper###processPullResult方法

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
       
              //省略


          
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                    //Consumer端過濾消息
                    //Consumer端過濾消息
                    //Consumer端過濾消息
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            
           
           //省略

        return pullResult;
    }      

總結:broker端的消息過濾是通過看subscriptionData裡的tag清單是否含有目前消息的tag

總結:broker和Consuemr都會過濾

參考