
【RocketMQ】Consumption Progress Management in Master-Slave Mode

Author: NEDHOME

Source: cnblogs.com/shanml/p/16989785.htm

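When a consumer starts, it creates PullAPIWrapper, the API object used for pulling messages, and calls its pullKernelImpl method to send pull requests to a Broker. In master-slave mode, how does the consumer decide which Broker to send a pull request to?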

Inside pullKernelImpl, the recalculatePullFromWhichNode method is called to choose a Broker:

public class PullAPIWrapper {
    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        
         // Call recalculatePullFromWhichNode to get a Broker ID, then findBrokerAddressInSubscribe to look up that Broker's information
         FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
         // ...

         if (findBrokerResult != null) {
            // ...

            // Get the Broker address
            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
            // Send the pull request
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
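         }
         // No Broker address was found for this queue
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }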
}
           

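recalculatePullFromWhichNode looks up a suggested Broker ID for the message queue in pullFromWhichNodeTable. If there is no entry, it returns the Master's Broker ID; in RocketMQ the Master always has Broker ID 0 (MixAll.MASTER_ID). Since pullFromWhichNodeTable tells the consumer which Broker to pull from, where does its data come from?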

public class PullAPIWrapper {
    // KEY is the message queue, VALUE is the suggested Broker ID
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    
    public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }
        // Look up the suggested Broker ID in pullFromWhichNodeTable
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }
        // No suggestion yet: fall back to the Master's Broker ID
        return MixAll.MASTER_ID;
    }
}
           

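Following the call hierarchy, pullFromWhichNodeTable is updated in updatePullFromWhichNode, which in turn is called by processPullResult. After the consumer sends a pull request, the Broker sets a Broker ID in its response while handling the request (covered below), suggesting which Broker to pull from next time.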

When the consumer handles the pull response, it calls processPullResult, which takes the suggested Broker ID out of the response and passes it to updatePullFromWhichNode to store it in pullFromWhichNodeTable:

public class PullAPIWrapper {
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    
    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;
        // Store the Broker ID suggested by the pull response in pullFromWhichNodeTable
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

        // ...
        return pullResult;
    }

    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            // First suggestion for this queue: add an entry to pullFromWhichNodeTable
            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }
}
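To see the round trip in isolation, here is a minimal, framework-free sketch of the table that recalculatePullFromWhichNode reads and updatePullFromWhichNode writes. The class name PullNodeSuggestions and the generic key type (standing in for RocketMQ's MessageQueue) are hypothetical; only the behavior, defaulting to Master ID 0 until a pull response suggests otherwise, follows the code above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical, simplified model of PullAPIWrapper.pullFromWhichNodeTable.
 * K stands in for RocketMQ's MessageQueue.
 */
class PullNodeSuggestions<K> {
    // Mirrors MixAll.MASTER_ID: the Master always has Broker ID 0
    static final long MASTER_ID = 0L;

    private final ConcurrentMap<K, AtomicLong> table = new ConcurrentHashMap<>();

    /** Like recalculatePullFromWhichNode: last suggestion for the queue, or the Master if none. */
    long pickBrokerId(K queue) {
        AtomicLong suggest = table.get(queue);
        return suggest != null ? suggest.get() : MASTER_ID;
    }

    /** Like updatePullFromWhichNode: remember the Broker ID suggested by the latest pull response. */
    void update(K queue, long suggestedBrokerId) {
        // Create the holder once per queue, then overwrite its value in place
        table.computeIfAbsent(queue, q -> new AtomicLong()).set(suggestedBrokerId);
    }
}
```

Each queue keeps its own suggestion, so one lagging queue can be steered to a Slave while the other queues keep pulling from the Master.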
           

Next, let's look at the conditions that decide which Broker is suggested.

Returning the suggested Broker ID

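When the Broker handles a consumer's pull request, it calls PullMessageProcessor's processRequest method. This first calls MessageStore's getMessage method to fetch the messages; the returned GetMessageResult carries a flag indicating whether pulling from a Slave is suggested (how this flag is set is covered later). Based on that flag, the Broker proceeds as follows: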

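  1. If pulling from a Slave is suggested, the Broker calls getWhichBrokerWhenConsumeSlowly on the subscription group configuration (subscriptionGroupConfig) to get a Slave ID and sets it in the response; otherwise it keeps suggesting the Master for the next pull and sets the Master ID in the response;
  2. It checks the role of the current Broker: if it is a Slave and reading from Slaves is disabled (slaveReadEnable = false), it sets the response code to PULL_RETRY_IMMEDIATELY and again suggests the Master by setting the Master ID in the response;
  3. If reading from Slaves is enabled (slaveReadEnable = true), there are two cases:
  • If pulling from a Slave is suggested, the Slave ID is taken from the subscription group configuration and set in the response;
  • Otherwise, the Broker ID configured on the subscription group (getBrokerId) is set in the response;

    If reading from Slaves is not enabled, the Master is suggested for the next pull as well.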
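The decision can be condensed into two pure functions, assuming subscriptionGroupConfig simply supplies the two configured IDs as plain longs. SuggestBrokerPolicy and its parameter names are hypothetical; this is a reading of the processRequest excerpt that follows, not RocketMQ code. Because the final slaveReadEnable check in that excerpt overwrites whatever ID was set earlier, the Broker's role ends up affecting only the response code, not the suggested ID.

```java
/**
 * Hypothetical condensation of how PullMessageProcessor.processRequest
 * ends up choosing suggestWhichBrokerId and the response code.
 */
class SuggestBrokerPolicy {
    // Mirrors MixAll.MASTER_ID
    static final long MASTER_ID = 0L;

    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    /**
     * @param suggestPullingFromSlave      flag from GetMessageResult
     * @param slaveReadEnable              Broker config switch
     * @param whichBrokerWhenConsumeSlowly subscription group setting (mqadmin -w)
     * @param groupBrokerId                subscription group setting (mqadmin -i)
     */
    static long suggestedBrokerId(boolean suggestPullingFromSlave, boolean slaveReadEnable,
                                  long whichBrokerWhenConsumeSlowly, long groupBrokerId) {
        if (!slaveReadEnable) {
            // Slave reads disabled: always steer the consumer back to the Master
            return MASTER_ID;
        }
        // Backlog too large: the Slave for slow consumers; otherwise the group's configured Broker
        return suggestPullingFromSlave ? whichBrokerWhenConsumeSlowly : groupBrokerId;
    }

    /** A Slave that may not serve reads answers with PULL_RETRY_IMMEDIATELY. */
    static boolean retryImmediately(Role role, boolean slaveReadEnable) {
        return role == Role.SLAVE && !slaveReadEnable;
    }
}
```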

Subscription group configuration

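The -i option of the mqadmin command specifies which Broker to consume from (the value returned by subscriptionGroupConfig.getBrokerId()), and the -w option specifies which Slave to consume from when pulling from a Slave is suggested (the value returned by subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()):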

usage: mqadmin updateSubGroup [-a <arg>] [-b <arg>] [-c <arg>] [-d <arg>] -g <arg> [-h] [-i <arg>] [-m <arg>]
       [-n <arg>] [-q <arg>] [-r <arg>] [-s <arg>] [-w <arg>]
 -i,--brokerId <arg>                       consumer from which broker id
 -w,--whichBrokerWhenConsumeSlowly <arg>   which broker id when consume slowly
           
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        // ...

        // Fetch messages starting from the requested queue offset
        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
        if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
            // Is pulling from a Slave suggested?
            if (getMessageResult.isSuggestPullingFromSlave()) {
                // Choose a Slave
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
            // Check the role of this Broker
            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    // Reading from a Slave is not allowed: suggest the Master ID
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }
            // If reading from Slaves is allowed
            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // If pulling from a Slave is suggested
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    // Use the Slave configured for slow consumption
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                }
                else {
                    // Use the Broker ID configured for the subscription group
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                // Use the Master
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store getMessage return null");
        }
        // ...
        return response;
    }
}
           

Setting the "suggest pulling from a Slave" flag

DefaultMessageStore's getMessage method fetches the messages and, based on the consumer's pull progress, decides whether to suggest pulling from a Slave next time:

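  1. diff: the current maximum CommitLog offset minus the maximum physical offset of the messages pulled in this request, i.e. the amount of message data (in bytes) the consumer has not pulled yet;
  2. memory: the amount of message data expected to stay in memory, computed as total physical memory × accessMessageInMemoryMaxRatio (default 40) / 100. RocketMQ relies on the operating system's PageCache to keep part of the messages in memory and speed up access;
  3. If diff is greater than memory, the backlog is larger than what the PageCache can hold, so messages will have to be read from disk; in that case the Broker suggests pulling from a Slave next time.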
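As a worked example of the comparison, assume a machine with 16 GiB of physical memory and the default ratio of 40: memory is about 6.4 GiB, so a consumer whose unpulled backlog is 8 GiB would be told to pull from a Slave, while one lagging by 4 GiB would not. The helper below reproduces just that arithmetic; the class and method names are hypothetical.

```java
/** Hypothetical helper reproducing the diff > memory check in DefaultMessageStore.getMessage. */
class SlaveSuggestionCheck {
    /**
     * @param maxOffsetPy         current maximum CommitLog offset
     * @param maxPhyOffsetPulling largest physical offset returned by this pull
     * @param totalPhysicalMemory machine memory in bytes
     * @param ratioPercent        accessMessageInMemoryMaxRatio (default 40)
     */
    static boolean suggestPullingFromSlave(long maxOffsetPy, long maxPhyOffsetPulling,
                                           long totalPhysicalMemory, int ratioPercent) {
        // Bytes of CommitLog data this consumer has not pulled yet
        long diff = maxOffsetPy - maxPhyOffsetPulling;
        // Share of physical memory expected to keep messages cached
        long memory = (long) (totalPhysicalMemory * (ratioPercent / 100.0));
        return diff > memory;
    }
}
```

For instance, with `long gib = 1L << 30`, calling `suggestPullingFromSlave(20 * gib, 12 * gib, 16 * gib, 40)` compares an 8 GiB backlog against the roughly 6.4 GiB budget.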
public class DefaultMessageStore implements MessageStore {

    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        // ...
        // Current maximum offset of the CommitLog
        final long maxOffsetPy = this.commitLog.getMaxOffset();

        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();

            if (maxOffset == 0) {
              // ...
            } else {
                // Get the ConsumeQueue index entries starting at the consume offset
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        // ...
                        // CommitLog max offset minus the max physical offset pulled in this request
                        long diff = maxOffsetPy - maxPhyOffsetPulling;
                        // Memory budget for messages: total physical memory * accessMessageInMemoryMaxRatio / 100
                        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                            * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                        // Suggest pulling from a Slave next time if the backlog exceeds the budget
                        getResult.setSuggestPullingFromSlave(diff > memory);
                    } finally {
                        bufferConsumeQueue.release();
                    }
                } else {
                    // ...
                }
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }
        // ...
        return getResult;
    }
}
           

Summary

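After starting, a consumer sends pull requests to a Broker. Based on the consumer's pull progress, the Broker picks a suggested Broker ID and sets it in the response. While handling the response, the consumer stores this ID in pullFromWhichNodeTable; on the next pull it takes the ID out of pullFromWhichNodeTable and sends the pull request to that Broker.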

Persisting consumption progress

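The sections above explained how the Broker to pull from is chosen in master-slave mode. Next we look at how consumption progress is persisted. In broadcasting mode the progress is stored on each consumer, while in clustering mode it is stored on the Broker, so the rest of this article uses clustering mode as the example.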

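As described in the article 【RocketMQ】Message Pulling, clustering mode manages consumption progress through RemoteBrokerOffsetStore. Its persistAll method calls updateConsumeOffsetToBroker to update the progress stored on the Broker: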

public class RemoteBrokerOffsetStore implements OffsetStore {
    @Override
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();

        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            AtomicLong offset = entry.getValue();
            if (offset != null) {
                if (mqs.contains(mq)) {
                    try {
                        // Send a request to the Broker to update the consumption progress
                        this.updateConsumeOffsetToBroker(mq, offset.get());
                        log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                            this.groupName,
                            this.mQClientFactory.getClientId(),
                            mq,
                            offset.get());
                    } catch (Exception e) {
                        log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                    }
                } else {
                    unusedMQ.add(mq);
                }
            }
        }
        // ...
    }
}
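Stripped of logging and RPC details, persistAll splits the locally tracked queues into those still assigned to this consumer, whose offsets are pushed to the Broker, and those no longer assigned. The sketch below models that split with a hypothetical sender callback in place of updateConsumeOffsetToBroker; because the excerpt elides what happens to unusedMQ afterwards, the sketch simply returns that set to the caller instead of guessing.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/** Hypothetical model of RemoteBrokerOffsetStore.persistAll; K stands in for MessageQueue. */
class OffsetPersistSketch<K> {
    final ConcurrentMap<K, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    /**
     * Pushes the offset of every queue in {@code assigned} through {@code sender}
     * (a stand-in for updateConsumeOffsetToBroker) and returns the queues that are
     * tracked locally but no longer assigned.
     */
    Set<K> persistAll(Set<K> assigned, BiConsumer<K, Long> sender) {
        if (assigned == null || assigned.isEmpty()) {
            return Collections.emptySet();
        }
        Set<K> unused = new HashSet<>();
        for (Map.Entry<K, AtomicLong> entry : offsetTable.entrySet()) {
            K mq = entry.getKey();
            if (assigned.contains(mq)) {
                try {
                    sender.accept(mq, entry.getValue().get());
                } catch (RuntimeException e) {
                    // The original logs the failure and continues with the next queue
                }
            } else {
                unused.add(mq);
            }
        }
        return unused;
    }
}
```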
           

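updateConsumeOffsetToBroker first calls findBrokerAddressInSubscribe to obtain Broker information, so let's first look at how findBrokerAddressInSubscribe chooses a Broker. It takes three parameters: the Broker name, a Broker ID, and whether only the Broker with exactly that ID is acceptable (onlyThisBroker). Its logic is: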

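  1. Look up all Brokers with that name in brokerAddrTable (in master-slave mode the Brokers share the same name but have different IDs); the result is a map whose key is the Broker ID and whose value is the Broker address;
  2. Get the address for the Broker ID passed in;
  3. Record in the slave variable whether the given Broker ID belongs to a Slave (that is, is not the Master ID);
  4. Record in the found variable whether an address was found;
  5. If no address was found and the given ID is a Slave ID, try once more with the next ID (brokerId + 1) and check whether that address exists;
  6. If there is still no address and onlyThisBroker is false (the Broker with the given ID is not strictly required), take the first entry of the map;
  7. If an address was found, wrap it in a FindBrokerResult and return it; otherwise return null.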
public class MQClientInstance {
   public FindBrokerResult findBrokerAddressInSubscribe(
        final String brokerName, // Broker name
        final long brokerId, // Broker ID
        final boolean onlyThisBroker // whether only the Broker with this exact ID is acceptable
    ) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;
        // Get all Brokers (ID -> address) registered under this name
        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            brokerAddr = map.get(brokerId);
            // Is the requested ID a Slave?
            slave = brokerId != MixAll.MASTER_ID;
            // Was an address found?
            found = brokerAddr != null;
            // Not found and the requested ID is a Slave
            if (!found && slave) {
                // Try the next Broker ID once
                brokerAddr = map.get(brokerId + 1);
                found = brokerAddr != null;
            }
            // Still not found and other Brokers are acceptable
            if (!found && !onlyThisBroker) {
                // Take the first entry in the map
                Entry<Long, String> entry = map.entrySet().iterator().next();
                // Get its address
                brokerAddr = entry.getValue();
                // Is it a Slave?
                slave = entry.getKey() != MixAll.MASTER_ID;
                // Mark as found
                found = true;
            }
        }

        if (found) {
            // Return the result
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }

        return null;
    }
}
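The lookup can be exercised without a running cluster. The following sketch re-implements the steps above against a plain Map<Long, String> for one Broker group; BrokerLookupSketch and Found are hypothetical names. A LinkedHashMap is used in the example below only to make "the first entry" deterministic, since the HashMap held in brokerAddrTable does not promise any iteration order.

```java
import java.util.Map;

/** Hypothetical re-implementation of MQClientInstance.findBrokerAddressInSubscribe's lookup. */
class BrokerLookupSketch {
    // Mirrors MixAll.MASTER_ID
    static final long MASTER_ID = 0L;

    /** Result holder: the address plus whether it belongs to a Slave. */
    static final class Found {
        final String addr;
        final boolean slave;

        Found(String addr, boolean slave) {
            this.addr = addr;
            this.slave = slave;
        }
    }

    static Found find(Map<Long, String> group, long brokerId, boolean onlyThisBroker) {
        if (group == null || group.isEmpty()) {
            return null;
        }
        String addr = group.get(brokerId);
        boolean slave = brokerId != MASTER_ID;
        if (addr == null && slave) {
            // A single retry with the next ID, not a full scan
            addr = group.get(brokerId + 1);
        }
        if (addr == null && !onlyThisBroker) {
            // Fall back to whatever entry the map yields first
            Map.Entry<Long, String> first = group.entrySet().iterator().next();
            addr = first.getValue();
            slave = first.getKey() != MASTER_ID;
        }
        return addr == null ? null : new Found(addr, slave);
    }
}
```

With a group containing only IDs 1 and 2, `find(group, 0L, true)` returns null, while `find(group, 0L, false)` falls back to the first entry, a Slave.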
           

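Back in updateConsumeOffsetToBroker, the first call to findBrokerAddressInSubscribe passes the Broker name, the Master ID, and true. Following the lookup logic above, if the Master is found, its information is returned normally. If the Master is down and cannot be found, the call fails and returns null: the ID passed in is the Master ID (so the brokerId + 1 retry does not apply) and onlyThisBroker is true.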

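If the first call returns null, the client refreshes the topic route information from the NameServer and makes a second call. This time the third argument is false, so the specified Broker is no longer required. The Master is still tried first, but if it is not found, the first entry of the map is returned because onlyThisBroker is false, and this may be a Slave.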

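Summary: the consumer prefers to send requests that save its consumption progress to the Master. If the Master's information cannot be obtained, for example because the Master is down, the first entry of the map is used, so the consumer may also save its progress to a Slave. Once the Master recovers, it is preferred again.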

public class RemoteBrokerOffsetStore implements OffsetStore {

    private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        // Update the consumption progress (oneway)
        updateConsumeOffsetToBroker(mq, offset, true);
    }

    @Override
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        // First call to findBrokerAddressInSubscribe: Broker name, Master ID, onlyThisBroker = true
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
        // Not found: make a second call
        if (null == findBrokerResult) {
            // Refresh the topic route, then call again with: Broker name, Master ID, onlyThisBroker = false
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
        }
        if (findBrokerResult != null) {
            // Build the request header
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);
            // Send the request that saves the consumption progress
            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }
}
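The two-phase selection in updateConsumeOffsetToBroker boils down to "Master only first, then refresh routes and accept any Broker in the group". This sketch captures that policy with hypothetical hooks: a Supplier standing in for the brokerAddrTable entry and a Runnable standing in for updateTopicRouteInfoFromNameServer. It throws IllegalStateException where RocketMQ throws MQClientException, and it omits the brokerId + 1 retry, which never applies when the requested ID is the Master's.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical model of how updateConsumeOffsetToBroker picks the target address. */
class OffsetCommitTargetSketch {
    // Mirrors MixAll.MASTER_ID
    static final long MASTER_ID = 0L;

    static String chooseAddr(Supplier<Map<Long, String>> routeSupplier, Runnable refreshRoute) {
        // Phase 1: the Master and nothing else
        String master = lookup(routeSupplier.get(), true);
        if (master != null) {
            return master;
        }
        // Phase 2: refresh the route, then accept any Broker in the group
        refreshRoute.run();
        String any = lookup(routeSupplier.get(), false);
        if (any == null) {
            throw new IllegalStateException("broker group not found");
        }
        return any;
    }

    private static String lookup(Map<Long, String> group, boolean onlyMaster) {
        if (group == null || group.isEmpty()) {
            return null;
        }
        String addr = group.get(MASTER_ID);
        if (addr == null && !onlyMaster) {
            // Same "first entry" fallback as findBrokerAddressInSubscribe
            addr = group.values().iterator().next();
        }
        return addr;
    }
}
```

The route is refreshed only when the Master lookup fails, so in the normal case saving progress costs no extra NameServer round trip.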
           

Consumption progress synchronization in master-slave mode

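BrokerController instantiates SlaveSynchronize in its constructor, and its start method calls handleSlaveSynchronize to handle data synchronization for Slaves.

If the current Broker is a Slave, a scheduled task is registered that periodically calls SlaveSynchronize's syncAll method (first after 3 seconds, then every 10 seconds) to synchronize data: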

public class BrokerController {

    private final SlaveSynchronize slaveSynchronize;

    public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        // ...

        this.slaveSynchronize = new SlaveSynchronize(this);

        //...
    }

    public void start() throws Exception {
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            // Handle synchronization for Slaves
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            this.registerBrokerAll(true, false, true);
        }
    }

    private void handleSlaveSynchronize(BrokerRole role) {
        // If this Broker is a SLAVE
        if (role == BrokerRole.SLAVE) {
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
            // Schedule a periodic data synchronization task
            slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Synchronize data
                        BrokerController.this.slaveSynchronize.syncAll();
                    }
                    catch (Throwable e) {
                        log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                    }
                }
            }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
        } else {
            //handle the slave synchronise
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
        }
    }

}
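The scheduling pattern in handleSlaveSynchronize, cancelling any previous task and registering a new fixed-rate task only for a SLAVE, can be sketched with the JDK's ScheduledExecutorService alone. SlaveSyncSchedulerSketch is hypothetical and takes the delays as parameters (the code above uses 3 s and 10 s). The try/catch around the task matters: per the ScheduledExecutorService contract, if one execution of a fixed-rate task throws, all subsequent executions are suppressed, which is presumably why syncAll is wrapped in a catch of Throwable.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of BrokerController.handleSlaveSynchronize. */
class SlaveSyncSchedulerSketch {
    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    // Daemon thread so a forgotten shutdown() does not keep the JVM alive
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "slave-sync");
        t.setDaemon(true);
        return t;
    });
    private final Runnable syncAll;
    private final long initialDelayMs;
    private final long periodMs;
    private ScheduledFuture<?> future;

    SlaveSyncSchedulerSketch(Runnable syncAll, long initialDelayMs, long periodMs) {
        this.syncAll = syncAll;
        this.initialDelayMs = initialDelayMs;
        this.periodMs = periodMs;
    }

    /** Cancel any previous task; start a fixed-rate sync only when the role is SLAVE. */
    synchronized void onRole(Role role) {
        if (future != null) {
            future.cancel(false);
            future = null;
        }
        if (role == Role.SLAVE) {
            future = executor.scheduleAtFixedRate(() -> {
                try {
                    syncAll.run();
                } catch (Throwable t) {
                    // Swallowed here (RocketMQ logs it): an escaping exception would stop all later runs
                }
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        }
    }

    synchronized boolean isSyncing() {
        return future != null && !future.isCancelled();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```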
           

SlaveSynchronize's syncAll method in turn calls syncConsumerOffset to synchronize consumption progress:

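  1. Send a request to the Master to fetch its consumption progress data;
  2. The Slave stores the fetched progress data in its own offset table and persists it.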
public class SlaveSynchronize {
    public void syncAll() {
        this.syncTopicConfig();
        // Synchronize consumption progress
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
    }

    private void syncConsumerOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                // Ask the Master for its consumption progress
                ConsumerOffsetSerializeWrapper offsetWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                // Copy it into the local offset table
                this.brokerController.getConsumerOffsetManager().getOffsetTable()
                    .putAll(offsetWrapper.getOffsetTable());
                // Persist the fetched consumption progress
                this.brokerController.getConsumerOffsetManager().persist();
                log.info("Update slave consumer offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
            }
        }
    }
}
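Here is a minimal model of syncConsumerOffset, assuming the offset table is keyed by a "topic@group" string with a per-queue map of offsets as the value. ConsumerOffsetSyncSketch, the fetch function and the persist counter are hypothetical stand-ins for getAllConsumerOffset and ConsumerOffsetManager.persist.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical model of SlaveSynchronize.syncConsumerOffset. */
class ConsumerOffsetSyncSketch {
    // Assumed layout: "topic@group" -> (queueId -> committed offset)
    final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();
    int persistCount = 0;

    /**
     * @param fetchFromMaster stand-in for getAllConsumerOffset(masterAddr)
     * @return true if offsets were copied and persisted
     */
    boolean sync(String masterAddr, String selfAddr,
                 Function<String, Map<String, ConcurrentMap<Integer, Long>>> fetchFromMaster) {
        // Skip when no Master is known yet or when this Broker is the Master itself
        if (masterAddr == null || masterAddr.equals(selfAddr)) {
            return false;
        }
        try {
            // Overwrites entries for every key the Master returns; other keys are left untouched
            offsetTable.putAll(fetchFromMaster.apply(masterAddr));
            persist();
            return true;
        } catch (RuntimeException e) {
            // The original logs the error; the next scheduled run simply tries again
            return false;
        }
    }

    // Stand-in for ConsumerOffsetManager.persist(), which writes the table to disk
    void persist() {
        persistCount++;
    }
}
```

One consequence of putAll worth noting: it replaces the entries for keys the Master returns but never deletes keys the Master no longer has, so stale topic@group entries can linger on the Slave.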
           

