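RocketMQ version 4.6.0: notes from my own reading of the source code.
The entry point for receiving requests was analyzed in the previous post, so here we go straight to how the processor handles a request.
Message reception
The processor that handles send-message requests is SendMessageProcessor.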
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
/**
* Handle a send-message request
*/
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
// Retry message sent back by a consumer
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
// Ordinary message sent by a producer
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
// Handle a batch message
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
// Handle a single message
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
}
Here we only look at how a single message is handled:
/**
* Handle a send-message request
*/
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
// Initialize the response
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("receive SendMessage request command, {}", request);
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
// Check whether the message may be sent
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
final byte[] body = request.getBody();
// Target queue id
int queueIdInt = requestHeader.getQueueId();
// Config of the target topic
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
// Build the message object
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
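// Handle retry and dead-letter (DLQ) messages
// TODO: not sure why this is handled here; isn't retry handled in consumerSendMsgBack()?
// (Possibly because, when the consumer's send-back call fails, the client falls back to
// sending the message to the %RETRY% topic as an ordinary message, which then arrives here.)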
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return response;
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Check whether this broker rejects transaction messages
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
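// Use the MessageStore component to store the message in a local file. Only the CommitLog is
// written here; the ConsumeQueue and IndexFile are built asynchronously by a background thread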
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
// Handle the store result
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
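Step1: Initialize the response;
Step2: Check whether the message may be sent;
Step3: Get the target queue and topic config; if queueId < 0, pick a queue at random;
Step4: Build the message object;
Step5: Handle retry and dead-letter messages, i.e. messages on a RETRY topic: if the maximum reconsume count is exceeded, the topic is changed to "%DLQ%" + group name, which moves the message into the dead-letter queue;
Step6: For a transaction message, check whether this broker rejects transaction messages, and if not, hand it to the transactional message service's prepareMessage();
Step7: Use the MessageStore component to store the message in a local file. Only the CommitLog is written here; the ConsumeQueue and IndexFile are built asynchronously by a background thread;
Step8: Handle the store result.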
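The random queue choice in Step3 can be tried out on its own. A minimal sketch; `QueueIdPicker` and `pick` are hypothetical names, and the expression is copied from `sendMessage()` above:

```java
import java.util.Random;

// Hypothetical helper that isolates the queueId fallback from sendMessage().
final class QueueIdPicker {
    private QueueIdPicker() {}

    static int pick(int requestedQueueId, int writeQueueNums, Random random) {
        if (writeQueueNums <= 0) {
            throw new IllegalArgumentException("writeQueueNums must be positive");
        }
        if (requestedQueueId >= 0) {
            return requestedQueueId; // the producer already chose a queue
        }
        // "% 99999999" first bounds the value to (-99999999, 99999999), so Math.abs
        // never receives Integer.MIN_VALUE (whose absolute value would overflow).
        return Math.abs(random.nextInt() % 99999999) % writeQueueNums;
    }
}
```

Taking the remainder before `Math.abs` is what keeps the expression safe: in Java `Math.abs(Integer.MIN_VALUE)` is still negative, whereas a remainder of `% 99999999` can never be `Integer.MIN_VALUE`.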
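Message storage
Storage structure
Before reading the source, let's look at the storage structure.
The following comes from 芋道源碼's analysis of the RocketMQ source code: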
The relationship between CommitLog, MappedFileQueue and MappedFile is:
CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N.
[Figure: how CommitLog, MappedFileQueue and MappedFile map onto files on disk]
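CommitLog, MappedFileQueue and MappedFile are defined as follows:
● MappedFile: the files 00000000000000000000, 00000000001073741824, 00000000002147483648, and so on.
● MappedFileQueue: the folder containing the MappedFiles; it wraps them into a file queue that offers the upper layer effectively unlimited file capacity.
○ Every MappedFile has the same file size.
○ File naming: fileName[n] = fileName[n - 1] + mappedFileSize. For the CommitLog the default size is 1GB.
● CommitLog: a wrapper built on top of MappedFileQueue.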
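The naming rule can be sketched in Java as follows. `CommitLogFileNames` and its methods are hypothetical helpers, assuming fixed-size files that start at offset 0; they only show how a global CommitLog offset maps to a file name and to a position inside that file:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the MappedFileQueue naming rule:
// fileName[n] = fileName[n - 1] + mappedFileSize, each name zero-padded to 20 digits.
final class CommitLogFileNames {
    // 1 GB, the CommitLog default mentioned above.
    static final long DEFAULT_COMMITLOG_FILE_SIZE = 1024L * 1024 * 1024;

    private CommitLogFileNames() {}

    // A file is named after the global offset of its first byte.
    static String offsetToFileName(long fileFromOffset) {
        return String.format("%020d", fileFromOffset);
    }

    // Names of the first n files of a queue that starts at offset 0.
    static List<String> firstFiles(int n, long fileSize) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            names.add(offsetToFileName(i * fileSize));
        }
        return names;
    }

    // Index of the file holding globalOffset, for a queue whose first file starts at firstFileOffset.
    static int fileIndexFor(long globalOffset, long firstFileOffset, long fileSize) {
        return (int) ((globalOffset - firstFileOffset) / fileSize);
    }

    // Position of globalOffset inside its file (file start offsets are multiples of fileSize).
    static int positionInFile(long globalOffset, long fileSize) {
        return (int) (globalOffset % fileSize);
    }
}
```

For example, global offset 2147483653 falls in the third file, `00000000002147483648`, at position 5.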
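The CommitLog currently stores two kinds of content in a MappedFile:
1. MESSAGE: a message.
2. BLANK: blank padding used when the file does not have enough space left for a message.
Layout of CommitLog content in a MappedFile:
MESSAGE layout in the CommitLog: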
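# | Field | Description | Data type | Bytes |
---|---|---|---|---|
1 | MsgLen | Total message length | Int | 4 |
2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
3 | BodyCRC | CRC of the message body | Int | 4 |
4 | QueueId | Message queue ID | Int | 4 |
5 | Flag | flag | Int | 4 |
6 | QueueOffset | Position in the message queue | Long | 8 |
7 | PhysicalOffset | Physical position: the sequential storage position in the CommitLog | Long | 8 |
8 | SysFlag | MessageSysFlag | Int | 4 |
9 | BornTimestamp | Timestamp when the message was created | Long | 8 |
10 | BornHost | Address + port of the host that produced the message | IP + Int | 4 + 4 (IPv4) / 16 + 4 (IPv6) |
11 | StoreTimestamp | Timestamp when the message was stored | Long | 8 |
12 | StoreHost | Address + port of the host that stored the message | IP + Int | 4 + 4 (IPv4) / 16 + 4 (IPv6) |
13 | ReconsumeTimes | Number of times the message has been reconsumed | Int | 4 |
14 | PreparedTransactionOffset | Offset of the prepared transaction message | Long | 8 |
15 | BodyLength + Body | Body length + body | Int + Bytes | 4 + bodyLength |
16 | TopicLength + Topic | Topic length + topic | Byte + Bytes | 1 + topicLength |
17 | PropertiesLength + Properties | Properties (extension fields) length + properties | Short + Bytes | 2 + propertiesLength |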
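Adding up the byte widths in the table gives the record length that `doAppend()` later computes with `calMsgLength()`. A minimal sketch, assuming the two IPv6 flags are passed in as booleans instead of being decoded from `sysFlag`; the class name is hypothetical:

```java
// Sketch of the MESSAGE record length: the field widths from the table, summed.
final class CommitLogRecordLength {
    private CommitLogRecordLength() {}

    static int messageLength(boolean bornHostV6, boolean storeHostV6,
                             int bodyLength, int topicLength, int propertiesLength) {
        int bornHostLength = bornHostV6 ? 16 + 4 : 4 + 4;   // IP address + port
        int storeHostLength = storeHostV6 ? 16 + 4 : 4 + 4;
        return 4                                   // 1  MsgLen
            + 4                                    // 2  MagicCode
            + 4                                    // 3  BodyCRC
            + 4                                    // 4  QueueId
            + 4                                    // 5  Flag
            + 8                                    // 6  QueueOffset
            + 8                                    // 7  PhysicalOffset
            + 4                                    // 8  SysFlag
            + 8                                    // 9  BornTimestamp
            + bornHostLength                       // 10 BornHost
            + 8                                    // 11 StoreTimestamp
            + storeHostLength                      // 12 StoreHost
            + 4                                    // 13 ReconsumeTimes
            + 8                                    // 14 PreparedTransactionOffset
            + 4 + Math.max(bodyLength, 0)          // 15 BodyLength + Body
            + 1 + topicLength                      // 16 TopicLength + Topic
            + 2 + Math.max(propertiesLength, 0);   // 17 PropertiesLength + Properties
    }
}
```

With IPv4 hosts the fixed part comes to 91 bytes; each IPv6 host address adds 12 more.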
BLANK layout in the CommitLog:
# | Field | Description | Data type | Bytes |
---|---|---|---|---|
1 | maxBlank | Length of the blank space | Int | 4 |
2 | MagicCode | BLANK_MAGIC_CODE | Int | 4 |
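The BLANK record is what `doAppend()` writes when a message no longer fits at the end of a file. A minimal sketch of that decision; the class is hypothetical, and the magic-code value is assumed to match `CommitLog.BLANK_MAGIC_CODE` (check it against the source before relying on it):

```java
import java.nio.ByteBuffer;

// Sketch of the end-of-file branch in doAppend().
final class BlankRecordSketch {
    // -875286124; assumed to match CommitLog.BLANK_MAGIC_CODE, verify against the source.
    static final int BLANK_MAGIC_CODE = 0xCBD43194;
    // maxBlank (Int) + MagicCode (Int): the BLANK header from the table above.
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;

    private BlankRecordSketch() {}

    // True when writing the message would not leave room for at least a BLANK header.
    static boolean needsNewFile(int msgLen, int maxBlank) {
        return msgLen + END_FILE_MIN_BLANK_LENGTH > maxBlank;
    }

    // Fills all maxBlank remaining bytes of the file with one BLANK record.
    static void writeBlank(ByteBuffer fileTail, int maxBlank) {
        ByteBuffer blank = ByteBuffer.allocate(maxBlank); // bytes after the header stay 0
        blank.putInt(maxBlank);          // 1 TOTALSIZE: the whole remaining space
        blank.putInt(BLANK_MAGIC_CODE);  // 2 MAGICCODE
        fileTail.put(blank.array(), 0, maxBlank);
    }
}
```

Reserving `END_FILE_MIN_BLANK_LENGTH` (8 bytes) guarantees that, however little space a file has left, there is always room for at least the BLANK header, so a reader can recognise where the file's data ends.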
CommitLog storage flow
Message storage is implemented by the MessageStore component:
DefaultMessageStore
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
// The following are the cases in which writing the message is rejected
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
// Store the message in the commitLog file
PutMessageResult result = this.commitLog.putMessage(msg);
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
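The early rejections in `putMessage()` reduce to a few checks, which can be isolated like this. `PutCheck` and `PutMessagePreconditions` are hypothetical; only the limits (`Byte.MAX_VALUE` for the topic, `Short.MAX_VALUE` for the properties) come from the code above:

```java
// Hypothetical result type mirroring the PutMessageStatus values used above.
enum PutCheck { OK, SERVICE_NOT_AVAILABLE, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED, OS_PAGECACHE_BUSY }

// Sketch of the cheap checks DefaultMessageStore.putMessage() runs before touching the CommitLog.
final class PutMessagePreconditions {
    private PutMessagePreconditions() {}

    static PutCheck check(boolean shutdown, boolean isSlave, boolean writeable,
                          String topic, String propertiesString, boolean pageCacheBusy) {
        // Store shut down, slave broker, or store flagged non-writeable.
        if (shutdown || isSlave || !writeable) {
            return PutCheck.SERVICE_NOT_AVAILABLE;
        }
        // The topic length is later stored in one byte (TopicLength).
        if (topic.length() > Byte.MAX_VALUE) {
            return PutCheck.MESSAGE_ILLEGAL;
        }
        // The properties length is later stored in a short (PropertiesLength).
        if (propertiesString != null && propertiesString.length() > Short.MAX_VALUE) {
            return PutCheck.PROPERTIES_SIZE_EXCEEDED;
        }
        if (pageCacheBusy) {
            return PutCheck.OS_PAGECACHE_BUSY;
        }
        return PutCheck.OK;
    }
}
```

The two limits line up with the storage format shown earlier: the topic length is stored in a single byte and the properties length in a short.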
Storing the message with CommitLog:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delayed message: change the topic
if (msg.getDelayTimeLevel() > 0) {
// Clamp to the maximum delay level if it is exceeded
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// Set the topic to SCHEDULE_TOPIC_XXXX
topic = ScheduleMessageService.SCHEDULE_TOPIC;
// queueId = delay level - 1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Back up the real topic and queue id: REAL_TOPIC is the topic before the delay, REAL_QID the queueId before the delay
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// A delayed message is redirected to the delay topic
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
long eclipsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// Get the latest CommitLog file
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// Acquire the lock before storing: writes to the file must happen one at a time
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
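// If mappedFile is null, no file exists yet under ${ROCKETMQ_HOME}/store/commitlog: this is the broker's
// first message, so a file starting at offset 0 is created. If the last file is full, a new one is created too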
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
// Write the message into the memory mapped by the MappedFile; it is flushed to the commitLog on disk later, according to the flush policy
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
// Appended to the mapped memory successfully
case PUT_OK:
break;
// Not enough space left in the file: create a new file and append the message again
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
// Message length exceeds the maximum allowed
case MESSAGE_SIZE_EXCEEDED:
// Message properties exceed the maximum allowed length
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (eclipsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
// Reaching here means the message was appended to the mapped memory successfully
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// Handle flushing to disk
handleDiskFlush(result, putMessageResult, msg);
// Handle master-slave replication between brokers
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
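Step1: If it is a delayed message, it is redirected to the delay topic; a background scheduled task later takes it out of the delay queue and puts it back into the queue of its original topic for consumers. The details are covered later in the analysis of delayed messages.
Step2: Get the latest CommitLog file.
Step3: Acquire the lock before storing.
Step4: Write the message into the memory mapped by the MappedFile; it is flushed to the commitLog on disk later, according to the flush policy.
Step5: Handle the append result (if the file is full, create a new file and append again).
Step6: Release the lock.
Step7: Flush the CommitLog file.
Step8: Handle master-slave replication between brokers.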
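Step1 can be modelled in a few lines. A sketch under stated assumptions: `Msg` is a hypothetical stand-in for `MessageExtBrokerInner`, the property keys and topic name are taken from the comments in the code above, and `maxDelayLevel` is passed in rather than read from `ScheduleMessageService`:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the delay-topic redirection in CommitLog.putMessage().
final class DelayRedirectSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";

    // Hypothetical stand-in for MessageExtBrokerInner.
    static final class Msg {
        String topic;
        int queueId;
        int delayTimeLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayTimeLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayTimeLevel = delayTimeLevel;
        }
    }

    private DelayRedirectSketch() {}

    // The real code only does this for non-transactional or committed messages.
    static void redirectIfDelayed(Msg msg, int maxDelayLevel) {
        if (msg.delayTimeLevel <= 0) {
            return; // not a delayed message
        }
        if (msg.delayTimeLevel > maxDelayLevel) {
            msg.delayTimeLevel = maxDelayLevel; // clamp to the highest configured level
        }
        // Remember the original destination so it can be restored when the delay expires.
        msg.properties.put(PROPERTY_REAL_TOPIC, msg.topic);
        msg.properties.put(PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayTimeLevel - 1; // one queue per delay level
    }
}
```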
Writing the message to mapped memory
MappedFile fields:
/**
* OS page size, 4 KB by default
*/
public static final int OS_PAGE_SIZE = 1024 * 4;
/**
* Current write position in the file, starting from 0
*/
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
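/**
* Commit position: data before it has been committed from writeBuffer to the FileChannel
* (only advances when a TransientStorePool writeBuffer is used)
*/
protected final AtomicInteger committedPosition = new AtomicInteger(0);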
/**
* Flush position: data before this pointer has already been flushed to disk
*/
private final AtomicInteger flushedPosition = new AtomicInteger(0);
/**
* File size
*/
protected int fileSize;
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
* With a TransientStorePool, messages are first written to this off-heap buffer and committed to the FileChannel later.
*/
protected ByteBuffer writeBuffer = null;
protected TransientStorePool transientStorePool = null;
/**
* File name
*/
private String fileName;
/**
* Starting (global) offset of this file
*/
private long fileFromOffset;
/**
* Physical file
*/
private File file;
/**
* Corresponding memory mapping
*/
private MappedByteBuffer mappedByteBuffer;
/**
* Time of the last write to the file
*/
private volatile long storeTimestamp = 0;
/**
* Whether this is the first file in the directory (queue)
*/
private boolean firstCreateInQueue = false;
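How the position pointers relate can be shown with a small model. This is a simplified sketch, not MappedFile itself: `MappedFilePointers` is hypothetical, and it assumes that the readable position is the committed position when a writeBuffer is used and the write position otherwise, which is how `getReadPosition()` behaves in the source as far as I can tell:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of MappedFile's position pointers (flushed <= readPosition <= wrote).
final class MappedFilePointers {
    final int fileSize;
    final boolean useWriteBuffer; // true when a TransientStorePool writeBuffer is in use
    final AtomicInteger wrotePosition = new AtomicInteger(0);
    final AtomicInteger committedPosition = new AtomicInteger(0);
    final AtomicInteger flushedPosition = new AtomicInteger(0);

    MappedFilePointers(int fileSize, boolean useWriteBuffer) {
        this.fileSize = fileSize;
        this.useWriteBuffer = useWriteBuffer;
    }

    // Append: only the write pointer moves.
    boolean append(int bytes) {
        int pos = wrotePosition.get();
        if (pos + bytes > fileSize) {
            return false; // caller must roll to a new file
        }
        wrotePosition.addAndGet(bytes);
        return true;
    }

    // Commit: with a writeBuffer, moves [committed, wrote) into the file.
    void commit() {
        if (useWriteBuffer) {
            committedPosition.set(wrotePosition.get());
        }
    }

    // Data up to here is in the file (page cache) and visible to readers.
    int readPosition() {
        return useWriteBuffer ? committedPosition.get() : wrotePosition.get();
    }

    // Flush: force everything readable to disk.
    void flush() {
        flushedPosition.set(readPosition());
    }

    boolean isFull() {
        return wrotePosition.get() == fileSize;
    }
}
```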
appendMessage
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
return appendMessagesInner(msg, cb);
}
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
// First get the MappedFile's current write pointer
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
// slice() creates a sub-buffer that shares its memory with mappedByteBuffer (or writeBuffer)
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
// Write into the mapped memory region
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
// The file is already full
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
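Why `slice()`? The slice shares the same memory as the original buffer but has its own position and limit, so moving the writer's position does not disturb the original buffer. A minimal demonstration with a heap `ByteBuffer` standing in for the `MappedByteBuffer`; `SliceDemo` is hypothetical:

```java
import java.nio.ByteBuffer;

// Demonstrates the slice() trick used in appendMessagesInner().
final class SliceDemo {
    private SliceDemo() {}

    // Writes value at currentPos through a slice and returns the original buffer.
    static ByteBuffer writeViaSlice(ByteBuffer mapped, int currentPos, int value) {
        ByteBuffer view = mapped.slice(); // same memory, independent position/limit
        view.position(currentPos);
        view.putInt(value);               // visible through 'mapped' as well
        return mapped;
    }
}
```

The indices line up only because the original buffer's position stays at 0: `slice()` starts at the original buffer's current position.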
class DefaultAppendMessageCallback implements AppendMessageCallback {
// File at the end of the minimum fixed length empty
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
private final ByteBuffer msgIdMemory;
private final ByteBuffer msgIdV6Memory;
// Buffer holding the serialized message
private final ByteBuffer msgStoreItemMemory;
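// Maximum message size
private final int maxMessageSize;
// Reused to build the "topic-queueId" key
private final StringBuilder keyBuilder = new StringBuilder();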
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// Physical offset in the commitLog
long wroteOffset = fileFromOffset + byteBuffer.position();
int sysflag = msgInner.getSysFlag();
int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
this.resetByteBuffer(storeHostHolder, storeHostLength);
// Create the message ID
String msgId;
if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
} else {
msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
}
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
/**
* Serialize message
*/
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
// Calculate the total length of the message to store
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// Not enough space left in the file: fill the remaining space with a BLANK placeholder; the message goes into a newly created file
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// Write the data into the ByteBuffer
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write the message into byteBuffer; this is still only in mapped memory and is persisted to disk later, according to the flush policy
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
// Build the append result
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
}
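The queue-offset bookkeeping at the start and end of `doAppend()` can be isolated as follows. `QueueOffsetTable` and its `TranType` enum are hypothetical simplifications of `topicQueueTable` and `MessageSysFlag`:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of doAppend()'s logical-offset bookkeeping: one counter per "topic-queueId" key.
// Not thread-safe on its own; in CommitLog this runs under putMessageLock.
final class QueueOffsetTable {
    enum TranType { NOT_TYPE, PREPARED, COMMIT, ROLLBACK }

    private final Map<String, Long> topicQueueTable = new HashMap<>();

    // Returns the queueOffset that would be written into the record.
    long assign(String topic, int queueId, TranType tranType) {
        String key = topic + "-" + queueId;
        Long queueOffset = topicQueueTable.get(key);
        if (queueOffset == null) {
            queueOffset = 0L;
            topicQueueTable.put(key, queueOffset);
        }
        if (tranType == TranType.PREPARED || tranType == TranType.ROLLBACK) {
            // Not consumable: recorded with queueOffset 0, counter not advanced.
            return 0L;
        }
        // Ordinary or committed message: takes the next slot in its ConsumeQueue.
        topicQueueTable.put(key, queueOffset + 1);
        return queueOffset;
    }
}
```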
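This only appends the data to memory; the message must then be persisted to disk, by either synchronous or asynchronous flushing.
Flushing the commitLog
While the broker starts, initializing BrokerController creates the message store service DefaultMessageStore. Creating DefaultMessageStore in turn creates several storage-related services, one of which is the CommitLog, and the CommitLog constructor creates the commitLog flush thread: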
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
// Create the flush service for the CommitLog
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// Synchronous flush
this.flushCommitLogService = new GroupCommitService();
} else {
// Asynchronous flush
this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
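`putMessageLock` is either a reentrant lock or a spin lock, depending on `useReentrantLockWhenPutMessage`. A minimal CAS spin lock in the spirit of `PutMessageSpinLock`; the class and its demo method are hypothetical:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of a CAS spin lock in the spirit of PutMessageSpinLock.
final class SpinLockSketch {
    // true = free; acquiring flips it to false with a single CAS.
    private final AtomicBoolean free = new AtomicBoolean(true);

    void lock() {
        while (!free.compareAndSet(true, false)) {
            // busy-wait: the holder only keeps the lock for a short in-memory append
        }
    }

    void unlock() {
        free.set(true);
    }

    // Demo: several threads increment a shared counter under the lock.
    static int demoCount(int threads, int perThread) {
        SpinLockSketch lock = new SpinLockSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }
}
```

Spinning burns CPU while waiting, but the critical section here is a short in-memory append, so skipping a thread park/unpark can pay off; under heavy contention a reentrant lock is usually the safer choice.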
These nested services are then started one after another, and finally CommitLog starts the commitLog flush thread:
public void start() {
// Start the flush thread
this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
}
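So by the time the broker is up, the commitLog flush thread is already running, looping continuously.
There are three flush service threads:
Service thread | Scenario | Message insert performance |
---|---|---|
CommitRealTimeService | Async flush && transient store pool (off-heap byte buffer) enabled; commits the buffer, FlushRealTimeService then flushes | 1st |
FlushRealTimeService | Async flush && transient store pool disabled | 2nd |
GroupCommitService | Sync flush | 3rd |
This post mainly analyzes synchronous flushing.
Continuing from the point where the message has been written to mapped memory, the flush is handled as follows: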
/**
* Handle flushing to disk
*/
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronous flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// Flush thread
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
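// Whether to wait until the message is stored before returning. TODO: if this is false, is it effectively
// async flushing? (For the sender, yes: the service is only woken up and still flushes via the else-branch
// of doCommit(), but the sender does not wait for it.)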
if (messageExt.isWaitStoreMsgOK()) {
// Create a flush request for the flush thread; ultimately this calls MappedByteBuffer.force() to force the in-memory data to disk
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
// Submit the flush request to the GroupCommitService thread, which processes it on its next pass
service.putRequest(request);
// Wait for the flush to complete
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
// Flush timed out
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
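// Async flush, handled differently depending on TransientStorePoolEnable:
// 1. false (default): just wake up the flush thread and return
// 2. true: a separate off-heap buffer of the same size as the target physical file is allocated and
//    locked in memory so that it is never swapped out; messages are first appended to this off-heap
//    buffer, then committed to the physical file (via its FileChannel), and then flushed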
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
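The handshake between the sending thread and the flush thread can be sketched with a `CountDownLatch`. `FlushRequestSketch` is hypothetical and only mirrors the roles of `GroupCommitRequest.waitForFlush()` and `wakeupCustomer()`; the real class may be implemented differently:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of a flush request that a sending thread can block on.
final class FlushRequestSketch {
    private final long nextOffset; // commitlog offset that must be on disk
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    FlushRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    // Called by the flush thread after it has flushed (or failed to flush) past nextOffset.
    void wakeupCustomer(boolean ok) {
        this.flushOK = ok;
        done.countDown();
    }

    // Called by the sending thread; false on timeout or on a failed flush.
    boolean waitForFlush(long timeoutMillis) {
        try {
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // would become FLUSH_DISK_TIMEOUT in handleDiskFlush()
            }
            return flushOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```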
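Once started, the flush thread keeps running, but not every loop iteration has messages to flush, so it must not spin idly: after each iteration it blocks for a while and is woken up either when the timeout expires or when a flush request arrives.
So when a flush request is submitted while the thread happens to be sleeping, the thread is woken up: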
/**
* Synchronous flush service
*/
class GroupCommitService extends FlushCommitLogService {
/**
* Holds submitted sync flush requests
*/
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
/**
* Used to avoid lock contention between submitting and executing requests
*/
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
/**
* Submit a flush request
*/
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
// If the thread is sleeping when a request is submitted, wake it up immediately to flush
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
/**
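* After each flush, swap the write and read request lists: new requests keep going into the write list
* while the flush thread works through the read list, so there is no lock contention; once the read
* list has been processed, swap again.
*
* Doesn't this cause concurrent reads and writes on the ArrayList?
* No. putRequest() holds the lock of the list object it is adding to. If an add is still in progress when
* the swap happens, that same object becomes the new read list, and doCommit() must acquire its lock before
* reading, so the read blocks until the add finishes: reader and writer contend for the same lock.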
*/
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
// If the flushed position is already >= the requested offset, the flush has succeeded
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) {
// Flush; ultimately calls mappedByteBuffer.force() to force the data to disk
CommitLog.this.mappedFileQueue.flush(0);
}
}
// Flush done; wake up the thread waiting for this request's result
req.wakeupCustomer(flushOK);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
// Record the flush timestamp in the checkpoint
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
/**
* This thread starts together with the broker
*/
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// Runs every 10 ms, or earlier when woken up by a request;
// the read/write lists are swapped when the wait ends, before doCommit()
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
@Override
protected void onWaitEnd() {
this.swapRequests();
}
@Override
public String getServiceName() {
return GroupCommitService.class.getSimpleName();
}
@Override
public long getJointime() {
return 1000 * 60 * 5;
}
}
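There is a design point here: after each flush, the write and read request lists are swapped. Submitters keep adding to the write list while the flush thread works through the read list, so the two sides do not contend for a lock; once the read list has been processed, the lists are swapped again.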
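The swap can be sketched as a small generic class. `SwapListsSketch` is hypothetical; like `putRequest()` in the source, its submit method is `synchronized` on the object itself, which serialises submitters even if a swap happens while one of them holds the old write list's lock:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of GroupCommitService's double-buffered request lists.
final class SwapListsSketch<T> {
    private volatile List<T> requestsWrite = new ArrayList<>();
    private volatile List<T> requestsRead = new ArrayList<>();

    // Submitter side: only ever touches the current write list.
    synchronized void putRequest(T request) {
        synchronized (this.requestsWrite) {
            this.requestsWrite.add(request);
        }
    }

    // Flush-thread side: hand the whole pending batch over by swapping references.
    void swapRequests() {
        List<T> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }

    // Flush-thread side: process and clear the batch; returns how many were handled.
    int drain(Consumer<T> handler) {
        synchronized (this.requestsRead) {
            int handled = this.requestsRead.size();
            for (T request : this.requestsRead) {
                handler.accept(request);
            }
            this.requestsRead.clear();
            return handled;
        }
    }
}
```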
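The swap happens in onWaitEnd(): immediately, without blocking, if a request has already signalled, otherwise after the timed wait returns.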
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
// Eventually calls GroupCommitService.swapRequests() to swap the read and write lists
this.onWaitEnd();
return;
}
// Reset the counter
waitPoint.reset();
try {
// Block until woken up or until the timeout expires
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
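The `hasNotified`/`waitPoint` handshake can be reproduced with plain `wait`/`notifyAll`. A sketch under the assumption that a monitor is an acceptable substitute for the resettable latch used as `waitPoint` in the source; `WaitForRunningSketch` is hypothetical and counts how often `onWaitEnd()` (the swap) runs:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of ServiceThread's wakeup()/waitForRunning() handshake using wait/notifyAll.
final class WaitForRunningSketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private final Object waitPoint = new Object();
    private int waitEnds = 0; // how many times onWaitEnd() ran

    // Called by request submitters.
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            synchronized (waitPoint) {
                waitPoint.notifyAll();
            }
        }
    }

    // Called by the service thread once per loop iteration; intervalMillis must be > 0.
    void waitForRunning(long intervalMillis) {
        // A request already arrived: skip sleeping, swap, and go to work.
        if (hasNotified.compareAndSet(true, false)) {
            onWaitEnd();
            return;
        }
        try {
            synchronized (waitPoint) {
                if (!hasNotified.get()) {
                    waitPoint.wait(intervalMillis); // woken early by wakeup(), or times out
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            hasNotified.set(false);
            onWaitEnd();
        }
    }

    // In GroupCommitService this is where swapRequests() runs.
    private void onWaitEnd() {
        waitEnds++;
    }

    int waitEnds() {
        return waitEnds;
    }
}
```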
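ConsumeQueue and IndexFile storage flow
Once a message has been appended to the commitLog, a background thread (ReputMessageService), sleeping 1 ms between passes, keeps forwarding newly added commitLog messages to the ConsumeQueue and IndexFile. It reads up to the commitLog's current maximum offset, so it does not have to wait for the disk flush to finish.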
class ReputMessageService extends ServiceThread {
private volatile long reputFromOffset = 0;
private void doReput() {
// If the dispatch offset is below the commitLog's minimum offset, reset it to that minimum
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// Get the commitLog messages starting at reputFromOffset
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
// Iterate over each message
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// Read one message from the ByteBuffer and wrap it in a DispatchRequest
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// Dispatch the message to the ConsumeQueue and IndexFile
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
// If user open the dledger pattern or the broker is master node,
// it will not ignore the exception and fix the reputFromOffset variable
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
}
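To make the control flow of doReput() easier to follow, here is a minimal, self-contained Java sketch of the same loop over a single buffer. It is not RocketMQ code. The record layout ([int totalSize][payload], with a totalSize of 0 standing in for the end-of-file blank) and the class ReputSketch are hypothetical simplifications, and error handling is reduced to stopping the pass. What it keeps is the shape of the real loop: read one record, dispatch it if it parses, advance reputFromOffset by its size, and stop when the data runs out or a record cannot be parsed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the doReput() loop; NOT RocketMQ's record format.
// Each record is assumed to be [int totalSize][payload], totalSize counting the 4-byte header.
// A totalSize of 0 stands in for the end-of-file blank that makes the real code roll to the next file.
final class ReputSketch {
    /** Outcome of one pass: payloads handed to "dispatch" and the advanced reput offset. */
    static final class Pass {
        final List<String> dispatched = new ArrayList<>();
        long reputFromOffset;
    }

    static Pass doReput(ByteBuffer data, long startOffset) {
        Pass pass = new Pass();
        pass.reputFromOffset = startOffset;
        int readSize = 0;
        while (readSize < data.limit()) {
            if (data.limit() - readSize < 4) {
                break; // header not fully written yet: stop (like doNext = false)
            }
            int size = data.getInt(readSize);
            if (size == 0) {
                break; // end-of-file marker: the real code calls rollNextFile() here
            }
            if (size < 4 || readSize + size > data.limit()) {
                break; // unparseable record: this sketch just stops (the real code distinguishes cases)
            }
            byte[] payload = new byte[size - 4];
            ByteBuffer view = data.duplicate(); // independent position, same content
            view.position(readSize + 4);
            view.get(payload);
            pass.dispatched.add(new String(payload, StandardCharsets.UTF_8)); // stands in for doDispatch()
            pass.reputFromOffset += size; // same bookkeeping as the real loop
            readSize += size;
        }
        return pass;
    }

    /** Test helper: encodes messages as length-prefixed records followed by an end-of-file marker. */
    static ByteBuffer encode(String... msgs) {
        byte[][] bodies = new byte[msgs.length][];
        int total = 0;
        for (int i = 0; i < msgs.length; i++) {
            bodies[i] = msgs[i].getBytes(StandardCharsets.UTF_8);
            total += 4 + bodies[i].length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total + 4); // +4 for the end-of-file marker
        for (byte[] body : bodies) {
            buf.putInt(4 + body.length);
            buf.put(body);
        }
        buf.putInt(0); // end-of-file marker
        buf.flip();
        return buf;
    }
}
```

For example, ReputSketch.doReput(ReputSketch.encode("a", "bc"), 100L) dispatches "a" and "bc" and leaves reputFromOffset at 111 (two records of 5 and 6 bytes). The real service simply repeats its pass from run() every millisecond, which is why new commitLog entries reach the consume queue and index shortly after they are written.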
Messages read from the commitLog are dispatched to both the ConsumeQueue and the IndexFile:
DefaultMessageStore
public void doDispatch(DispatchRequest req) {
// Dispatch to the ConsumeQueue and the IndexFile in turn
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
dispatcherList is created and populated when the DefaultMessageStore component is constructed:
/**
* Message dispatcher list: each message is dispatched to two kinds of files, the ConsumeQueue and the IndexFile
*/
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
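The dispatcher list is a plain fan-out: every dispatcher sees every message, in registration order. The following minimal sketch shows that pattern in isolation. The names DispatchSketch and its nested Request and Dispatcher types are hypothetical stand-ins for DispatchRequest and CommitLogDispatcher, and the two lambdas only record what they were called with instead of writing files.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical sketch of the dispatcher fan-out in DefaultMessageStore.doDispatch().
final class DispatchSketch {
    /** Stand-in for DispatchRequest: just the commitLog offset. */
    static final class Request {
        final long commitLogOffset;

        Request(long commitLogOffset) {
            this.commitLogOffset = commitLogOffset;
        }
    }

    /** Same shape as CommitLogDispatcher: one callback per dispatched message. */
    interface Dispatcher {
        void dispatch(Request request);
    }

    final List<String> calls = new ArrayList<>();
    final LinkedList<Dispatcher> dispatcherList = new LinkedList<>();

    DispatchSketch() {
        // Registration order mirrors the store: consume queue first, then index.
        dispatcherList.addLast(req -> calls.add("consumeQueue:" + req.commitLogOffset));
        dispatcherList.addLast(req -> calls.add("index:" + req.commitLogOffset));
    }

    /** Every registered dispatcher sees every request, in list order. */
    void doDispatch(Request request) {
        for (Dispatcher dispatcher : dispatcherList) {
            dispatcher.dispatch(request);
        }
    }
}
```

Because doDispatch() only iterates the list, adding another target means appending another dispatcher; the reput loop itself does not change.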
Building the consume queue file
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
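CommitLogDispatcherBuildConsumeQueue only writes a position entry for non-transactional and committed messages. Prepared (half) and rolled-back messages are skipped, so consumers never see them through the consume queue. The sketch below reproduces that check. The constant values are my reading of MessageSysFlag in 4.x (the transaction state lives in bits 2 and 3 of sysFlag) and should be treated as an assumption to verify against your version; TranTypeSketch and goesToConsumeQueue are hypothetical names.

```java
// Hypothetical sketch of the transaction-type check in CommitLogDispatcherBuildConsumeQueue.
// Constant values are ASSUMED to match MessageSysFlag in RocketMQ 4.x; verify before relying on them.
final class TranTypeSketch {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    /** Keeps only bits 2-3 of sysFlag, which encode the transaction state. */
    static int getTransactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    /** True if the message should get a ConsumeQueue entry, i.e. become visible to consumers. */
    static boolean goesToConsumeQueue(int sysFlag) {
        switch (getTransactionValue(sysFlag)) {
            case TRANSACTION_NOT_TYPE:
            case TRANSACTION_COMMIT_TYPE:
                return true;
            default: // PREPARED (half message) or ROLLBACK
                return false;
        }
    }
}
```

Masking first means unrelated flag bits (for example bit 0) do not affect the decision; only the two transaction bits are compared.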
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
// Find the ConsumeQueue for this topic and queueId (created if it does not exist yet)
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
// Append the message's position info to the ConsumeQueue
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
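Note that this write, too, only goes into memory (the ConsumeQueue's memory-mapped file); a background scheduled task flushes it to disk periodically.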
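What putMessagePositionInfoWrapper stores is a fixed-size 20-byte entry. It holds the message's commitLog offset (8 bytes), its size (4 bytes) and its tags code (8 bytes; for ordinary messages, the hash of the tag string). The fixed size is what lets a consumer jump straight to the n-th message of a queue at position n × 20. Below is a sketch of that layout only. A heap ByteBuffer stands in for the memory-mapped file, nothing is flushed, and ConsumeQueueEntrySketch and its methods are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of one ConsumeQueue entry:
// [8-byte commitLog offset][4-byte message size][8-byte tags code] = 20 bytes.
// A heap ByteBuffer stands in for the memory-mapped file; nothing here touches disk.
final class ConsumeQueueEntrySketch {
    static final int CQ_STORE_UNIT_SIZE = 20;

    /** Writes the entry for logical queue offset cqOffset into slot cqOffset * 20. */
    static void put(ByteBuffer file, long cqOffset, long commitLogOffset, int size, long tagsCode) {
        int pos = (int) (cqOffset * CQ_STORE_UNIT_SIZE);
        file.putLong(pos, commitLogOffset);
        file.putInt(pos + 8, size);
        file.putLong(pos + 12, tagsCode);
    }

    static long commitLogOffsetAt(ByteBuffer file, long cqOffset) {
        return file.getLong((int) (cqOffset * CQ_STORE_UNIT_SIZE));
    }

    static int sizeAt(ByteBuffer file, long cqOffset) {
        return file.getInt((int) (cqOffset * CQ_STORE_UNIT_SIZE) + 8);
    }

    static long tagsCodeAt(ByteBuffer file, long cqOffset) {
        return file.getLong((int) (cqOffset * CQ_STORE_UNIT_SIZE) + 12);
    }
}
```

Reading a message through the consume queue is therefore two lookups: the fixed-size slot gives the commitLog offset and size, and the full message is then read from the commitLog at that offset.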
Building the message index file
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
// Hand the request to the index service
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
The request is then handed to the index service:
IndexService
public void buildIndex(DispatchRequest req) {
// Get the current IndexFile, creating one if needed (retrying on failure)
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
// If this message's commitLog offset is below the IndexFile's endPhyOffset, it has already been indexed (a duplicate), so skip it
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
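buildIndex() writes one index entry per key. The keys are the message's unique key, if present, plus each non-empty key from the space-separated keys property, each prefixed with the topic. The sketch below reproduces only that key-selection logic, including the two early returns (already indexed, rolled back); the actual hashing into IndexFile slots is left out. The "#" joiner in buildKey, the single-space KEY_SEPARATOR and the rollback constant are my reading of the 4.x source and should be verified. IndexKeySketch and keysFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of which index keys IndexService.buildIndex() derives for one message.
// ASSUMPTIONS: buildKey joins topic and key with '#', MessageConst.KEY_SEPARATOR is a single
// space, and the rollback value matches MessageSysFlag.TRANSACTION_ROLLBACK_TYPE in 4.x.
final class IndexKeySketch {
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // assumed MessageSysFlag value

    static String buildKey(String topic, String key) {
        return topic + "#" + key;
    }

    /** Index keys for one message; empty if it is already indexed or rolled back. */
    static List<String> keysFor(String topic, String uniqKey, String keys, int sysFlag,
                                long commitLogOffset, long endPhyOffset) {
        List<String> out = new ArrayList<>();
        if (commitLogOffset < endPhyOffset) {
            return out; // already covered by the current IndexFile
        }
        if ((sysFlag & TRANSACTION_ROLLBACK_TYPE) == TRANSACTION_ROLLBACK_TYPE) {
            return out; // rolled-back transactional message: not indexed
        }
        if (uniqKey != null) {
            out.add(buildKey(topic, uniqKey));
        }
        if (keys != null && keys.length() > 0) {
            for (String key : keys.split(" ")) {
                if (key.length() > 0) { // consecutive separators produce empty strings
                    out.add(buildKey(topic, key));
                }
            }
        }
        return out;
    }
}
```

For example, a message on TopicA with unique key U1 and keys "k1  k2" (two spaces) yields TopicA#U1, TopicA#k1 and TopicA#k2. The empty string produced by the double space is skipped, just like the key.length() > 0 check in the original.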
That's all for this part.
References
《儒猿技術窩——從 0 開始帶你成為消息中間件實戰高手》 (Ruyuan Tech course: "Becoming a Hands-On Message Middleware Expert, Starting from Zero")