天天看點

RocketMQ源碼解析—Broker處理消息發送請求

閱讀須知

  • 文章中使用注釋的方法會做深入分析

正文

在前面分析Broker的文章,我們看到,Broker其實同樣也是用 NettyRemotingServer 來處理遠端調用,在之前分析NameServer的請求處理源碼中,我們已經分析了在Netty中注冊的各種ChannelHandler的作用,既然是複用的是相同的 NettyRemotingServer 類,那麼 Broker 請求處理流程的不同點在哪裡呢?不知道大家是否還記得,在分析 Broker 初始化的文章中,我們分析了 BrokerController 的 initialize 方法,方法中會調用 BrokerController 的 registerProcessor 方法注冊各種處理器,而我們在分析 NameServer 的請求處理流程中,NameServer 處理請求使用的是預設處理器 DefaultRequestProcessor,是以不同點就在這裡啦,建議大家先去回顧下 NameServer 的請求處理流程。

下面我們來分析Broker中注冊的各個Processor的作用:

SendMessageProcessor:

public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
        	/* 用戶端消費消息後的回執處理(用戶端代碼抛出異常、傳回重試狀态等異常情況下會觸發) */
            return this.consumerSendMsgBack(ctx, request);
        default:
        	// 反射解析請求頭
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }
            // 建構消息上下文
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // 執行發送消息鈎子sendMessageBefore方法
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            RemotingCommand response;
            if (requestHeader.isBatch()) {
            	// 批量發送消息
                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
            	/* 單條發送消息 */
                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            }
            // 執行發送消息鈎子sendMessageAfter方法
            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}
           

SendMessageProcessor:

private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // 解析請求頭,包含分組、topic、最大重試次數等資訊
    final ConsumerSendMsgBackRequestHeader requestHeader =
        (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
    // 如果注冊了消費消息處理鈎子(ConsumeMessageHook),則執行其consumeMessageAfter方法,預設無鈎子
    if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
        ConsumeMessageContext context = new ConsumeMessageContext();
        context.setConsumerGroup(requestHeader.getGroup());
        context.setTopic(requestHeader.getOriginTopic());
        context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
        context.setCommercialRcvTimes(1);
        context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
        this.executeConsumeMessageHookAfter(context);
    }
    // 擷取訂閱組配置
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
            + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
        return response;
    }
    // 校驗寫權限
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
        return response;
    }
    // 如果重試隊列數量小于等于0,直接傳回成功
    if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    // 生成用于重試的topic,生成方式為%RETRY%字首加上訂閱組名稱
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
    int topicSysFlag = 0;
    if (requestHeader.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    }
    // 建立topic配置,如果緩存中存在,則讀取緩存中的配置,如果緩存中不存在,則建立并将配置落盤持久化
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return response;
    }
    if (!PermName.isWriteable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
        return response;
    }
    // 根據消息偏移量在消息存儲中讀取消息内容
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    if (null == msgExt) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed, " + requestHeader.getOffset());
        return response;
    }
    // 擷取重試消息的原始topic
    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);
    // 擷取延遲發送級别
    int delayLevel = requestHeader.getDelayLevel();
    // 擷取最大重試次數,16次
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    }
    // 判斷消息重試次數是否超過了最大重試次數
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {
        // 超過最大重試次數進入删除隊列,生成删除隊列topic,生成方式為%DLQ%字首加上訂閱組名稱
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
        // 建構topic配置
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
            DLQ_NUMS_PER_GROUP,
            PermName.PERM_WRITE, 0
        );
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return response;
        }
    } else {
    	// 如果延遲發送級别為0,則将級别加3,也就是跳過了兩個級别
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }
    // 建構broker内部的消息對象
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    // 将消息持久化到消息存儲中,等待處理
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    if (putMessageResult != null) {
    	// 處理持久化結果
        switch (putMessageResult.getPutMessageStatus()) {
            case PUT_OK:
                String backTopic = msgExt.getTopic();
                String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                if (correctTopic != null) {
                    backTopic = correctTopic;
                }
                this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                return response;
            default:
                break;
        }
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(putMessageResult.getPutMessageStatus().name());
        return response;
    }
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("putMessageResult is null");
    return response;
}
           

這裡會涉及到消息的讀取和持久化,這部分内容我們用單獨的文章來分析,接下來我們來分析消息發送的流程,我們以單條消息發送的流程為例:

SendMessageProcessor:

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
    response.setOpaque(request.getOpaque());
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    log.debug("receive SendMessage request command, {}", request);
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    // 校驗broker開始接受消息發送請求的時間,如果目前時間小于這個時間,傳回異常,表示broker還未開始接收請求
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }
    response.setCode(-1);
    // 消息檢查,例如broker權限校驗、topic是否與系統自動建立的topic名稱沖突、topic配置校驗、隊列校驗等
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }
    final byte[] body = request.getBody();
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }
    // 封裝broker内部使用的消息對象
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    /* 處理重試和删除隊列 */
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return response;
    }
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // 根據請求頭中的屬性判斷是否是事務消息
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
    	// 如果broker配置了拒絕事務消息,則傳回沒有權限
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        // 事務消息持久化
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
    	// 普通消息持久化到消息存儲中,等待處理
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }
    /* 處理存放消息結果 */
    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
           

這裡涉及到RocketMQ的事務消息,我們同樣會用單獨的文章分析。

SendMessageProcessor:

private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
                                  RemotingCommand request,
                                  MessageExt msg, TopicConfig topicConfig) {
    String newTopic = requestHeader.getTopic();
    // 判斷是否包含重試topic字首
    if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
        // 擷取訂閱組配置
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark(
                "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return false;
        }
        // 擷取訂閱組配置的最大重試次數
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        // 擷取目前重試次數
        int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
        // 判斷目前重試次數是否已經是最大重試次數
        if (reconsumeTimes >= maxReconsumeTimes) {
            // 超過最大重試次數進入删除隊列,生成删除隊列topic,生成方式為%DLQ%字首加上訂閱組名稱
            newTopic = MixAll.getDLQTopic(groupName);
            int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
            // 建構topic配置
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE, 0
            );
            msg.setTopic(newTopic);
            msg.setQueueId(queueIdInt);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return false;
            }
        }
    }
    int sysFlag = requestHeader.getSysFlag();
    if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
        sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    }
    msg.setSysFlag(sysFlag);
    return true;
}
           

SendMessageProcessor:

private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
                                               RemotingCommand request, MessageExt msg,
                                               SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
                                               int queueIdInt) {
    if (putMessageResult == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
        return response;
    }
    boolean sendOK = false;
    // 判斷存放消息的狀态是否成功
    switch (putMessageResult.getPutMessageStatus()) {
    	// 從這開始是成功狀态
        case PUT_OK:
            sendOK = true;
            response.setCode(ResponseCode.SUCCESS);
            break;
        case FLUSH_DISK_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
            sendOK = true;
            break;
        case FLUSH_SLAVE_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
            sendOK = true;
            break;
        case SLAVE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            sendOK = true;
            break;
        // 從這開始是失敗狀态
        case CREATE_MAPEDFILE_FAILED:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("create mapped file failed, server is busy or broken.");
            break;
        case MESSAGE_ILLEGAL:
        case PROPERTIES_SIZE_EXCEEDED:
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            response.setRemark(
                "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
            break;
        case SERVICE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
            response.setRemark(
                "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
            break;
        case OS_PAGECACHE_BUSY:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
            break;
        case UNKNOWN_ERROR:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR");
            break;
        default:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR DEFAULT");
            break;
    }
    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    if (sendOK) {
    	// 增加topic存放數量、topic存放大小等狀态值
        this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
        this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
            putMessageResult.getAppendMessageResult().getWroteBytes());
        this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
        response.setRemark(null);
        responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        responseHeader.setQueueId(queueIdInt);
        responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
        // 如果不是單向的RPC請求,則傳回響應結果
        doResponse(ctx, request, response);
        // 如果有發送消息鈎子,則将消息id、隊列資訊、發送狀态、次數等資訊設定到上下文,供鈎子使用
        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(responseHeader.getMsgId());
            sendMessageContext.setQueueId(responseHeader.getQueueId());
            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
            int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
            int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
        return null;
    } else {
    	// 如果是失敗狀态并且有發送消息鈎子,則将發送狀态、次數等資訊設定到上下文,供鈎子使用
        if (hasSendMessageHook()) {
            int wroteSize = request.getBody().length;
            int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
    }
    return response;
}
           

到這裡,Broker 處理消息發送請求的流程就分析完成啦。

繼續閱讀