閱讀須知
- 文章中使用注釋的方法會做深入分析
正文
在前面分析Broker的文章,我們看到,Broker其實同樣也是用 NettyRemotingServer 來處理遠端調用,在之前分析NameServer的請求處理源碼中,我們已經分析了在Netty中注冊的各種ChannelHandler的作用,既然是複用的是相同的 NettyRemotingServer 類,那麼 Broker 請求處理流程的不同點在哪裡呢?不知道大家是否還記得,在分析 Broker 初始化的文章中,我們分析了 BrokerController 的 initialize 方法,方法中會調用 BrokerController 的 registerProcessor 方法注冊各種處理器,而我們在分析 NameServer 的請求處理流程中,NameServer 處理請求使用的是預設處理器 DefaultRequestProcessor,是以不同點就在這裡啦,建議大家先去回顧下 NameServer 的請求處理流程。
下面我們來分析Broker中注冊的各個Processor的作用:
SendMessageProcessor:
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
/* 用戶端消費消息後的回執處理(用戶端代碼抛出異常、傳回重試狀态等異常情況下會觸發) */
return this.consumerSendMsgBack(ctx, request);
default:
// 反射解析請求頭
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
// 建構消息上下文
mqtraceContext = buildMsgContext(ctx, requestHeader);
// 執行發送消息鈎子sendMessageBefore方法
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
// 批量發送消息
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
/* 單條發送消息 */
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
// 執行發送消息鈎子sendMessageAfter方法
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
SendMessageProcessor:
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
// 解析請求頭,包含分組、topic、最大重試次數等資訊
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
// 如果注冊了消費消息處理鈎子(ConsumeMessageHook),則執行其consumeMessageAfter方法,預設無鈎子
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
ConsumeMessageContext context = new ConsumeMessageContext();
context.setConsumerGroup(requestHeader.getGroup());
context.setTopic(requestHeader.getOriginTopic());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
this.executeConsumeMessageHookAfter(context);
}
// 擷取訂閱組配置
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
// 校驗寫權限
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response;
}
// 如果重試隊列數量小于等于0,直接傳回成功
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
// 生成用于重試的topic,生成方式為%RETRY%字首加上訂閱組名稱
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
// 建立topic配置,如果緩存中存在,則讀取緩存中的配置,如果緩存中不存在,則建立并将配置落盤持久化
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return response;
}
// 根據消息偏移量在消息存儲中讀取消息内容
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return response;
}
// 擷取重試消息的原始topic
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
// 擷取延遲發送級别
int delayLevel = requestHeader.getDelayLevel();
// 擷取最大重試次數,16次
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
// 判斷消息重試次數是否超過了最大重試次數
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
// 超過最大重試次數進入删除隊列,生成删除隊列topic,生成方式為%DLQ%字首加上訂閱組名稱
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
// 建構topic配置
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
} else {
// 如果延遲發送級别為0,則将級别加3,也就是跳過了兩個級别
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
// 建構broker内部的消息對象
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
// 将消息持久化到消息存儲中,等待處理
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
// 處理持久化結果
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
}
這裡會涉及到消息的讀取和持久化,這部分内容我們用單獨的文章來分析,接下來我們來分析消息發送的流程,我們以單條消息發送的流程為例:
SendMessageProcessor:
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("receive SendMessage request command, {}", request);
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
// 校驗broker開始接受消息發送請求的時間,如果目前時間小于這個時間,傳回異常,表示broker還未開始接收請求
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
// 消息檢查,例如broker權限校驗、topic是否與系統自動建立的topic名稱沖突、topic配置校驗、隊列校驗等
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
// 封裝broker内部使用的消息對象
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
/* 處理重試和删除隊列 */
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return response;
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 根據請求頭中的屬性判斷是否是事務消息
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
// 如果broker配置了拒絕事務消息,則傳回沒有權限
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
// 事務消息持久化
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
// 普通消息持久化到消息存儲中,等待處理
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
/* 處理存放消息結果 */
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
這裡涉及到RocketMQ的事務消息,我們同樣會用單獨的文章分析。
SendMessageProcessor:
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
RemotingCommand request,
MessageExt msg, TopicConfig topicConfig) {
String newTopic = requestHeader.getTopic();
// 判斷是否包含重試topic字首
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
// 擷取訂閱組配置
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(
"subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return false;
}
// 擷取訂閱組配置的最大重試次數
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
// 擷取目前重試次數
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
// 判斷目前重試次數是否已經是最大重試次數
if (reconsumeTimes >= maxReconsumeTimes) {
// 超過最大重試次數進入删除隊列,生成删除隊列topic,生成方式為%DLQ%字首加上訂閱組名稱
newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
// 建構topic配置
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
msg.setTopic(newTopic);
msg.setQueueId(queueIdInt);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return false;
}
}
}
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
msg.setSysFlag(sysFlag);
return true;
}
SendMessageProcessor:
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
RemotingCommand request, MessageExt msg,
SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
int queueIdInt) {
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
return response;
}
boolean sendOK = false;
// 判斷存放消息的狀态是否成功
switch (putMessageResult.getPutMessageStatus()) {
// 從這開始是成功狀态
case PUT_OK:
sendOK = true;
response.setCode(ResponseCode.SUCCESS);
break;
case FLUSH_DISK_TIMEOUT:
response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
sendOK = true;
break;
case FLUSH_SLAVE_TIMEOUT:
response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
sendOK = true;
break;
case SLAVE_NOT_AVAILABLE:
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
sendOK = true;
break;
// 從這開始是失敗狀态
case CREATE_MAPEDFILE_FAILED:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("create mapped file failed, server is busy or broken.");
break;
case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(
"the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
break;
case SERVICE_NOT_AVAILABLE:
response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
response.setRemark(
"service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
break;
case OS_PAGECACHE_BUSY:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
break;
case UNKNOWN_ERROR:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR");
break;
default:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR DEFAULT");
break;
}
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
if (sendOK) {
// 增加topic存放數量、topic存放大小等狀态值
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
response.setRemark(null);
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
// 如果不是單向的RPC請求,則傳回響應結果
doResponse(ctx, request, response);
// 如果有發送消息鈎子,則将消息id、隊列資訊、發送狀态、次數等資訊設定到上下文,供鈎子使用
if (hasSendMessageHook()) {
sendMessageContext.setMsgId(responseHeader.getMsgId());
sendMessageContext.setQueueId(responseHeader.getQueueId());
sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
return null;
} else {
// 如果是失敗狀态并且有發送消息鈎子,則将發送狀态、次數等資訊設定到上下文,供鈎子使用
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
}
return response;
}
到這裡,Broker 處理消息發送請求的流程就分析完成啦。