broker有超級多的配置屬性。
配置屬性請參考https://blog.csdn.net/wulitaot/article/details/79566053
先看broker啟動的initialize()方法。
broker的啟動類org.apache.rocketmq.broker.BrokerStartup#start
org.apache.rocketmq.broker.BrokerStartup#createBrokerController
核心方法org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {
//加載
boolean result = this.topicConfigManager.load();//加載topicconfig檔案,topics.json,初始化topicConfigTable
result = result && this.consumerOffsetManager.load();//加載消費進度檔案consumerOffset.json,初始化offsetTable
result = result && this.subscriptionGroupManager.load();//加載訂閱檔案subscriptionGroup.json 初始化subscriptionGroupTable
result = result && this.consumerFilterManager.load();//忽略
if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);//建立消息存儲對象,
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));//這裡為啥也要加個布隆過濾器
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
result = result && this.messageStore.load();//加載
/**
* 下面是load方法
public boolean load() {
boolean result = true;
try {
boolean lastExitOK = !this.isTempFileExist();//存在abort檔案則說明,異常退出
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
if (null != scheduleMessageService) {//加載延時配置檔案
result = result && this.scheduleMessageService.load();
}
// load Commit Log 初始化mappedFileQueue 中的,mappedfile
result = result && this.commitLog.load();
// load Consume Queue 初始化consumeQueueTable的consumeQueue
result = result && this.loadConsumeQueue();
if (result) {
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
this.indexService.load(lastExitOK);//加載索引檔案,如果上次異常關閉,則判斷f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp(),當索引檔案的更新時間大于存盤點的時間,表示資料不可用
//如果上次是正常退出,周遊最後三個commitLog,把flushedWhere,committedWhere指派上最大實體位移就行,while循環不斷更mappedFileOffset
//非正常退出,從最後一個檔案開始,直到找到存儲正常的檔案,剩下的和上面的差不多
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
*/
下面就是初始化各種線程池
return result;
}
broker端消息存儲分為三部分
- commitlog,這個是存儲用戶端發來消息的原始資料,所有topic的資料都放這裡
- consumeQueue,消費隊列,根據commitlog進行分隊列存儲
- indexFile,消息索引檔案,友善通過key來定位到消息(offset)
MappedFileQueue是映射檔案隊列(可以了解為一個檔案夾),MappedFile映射檔案(檔案夾下的一個檔案),
消息存儲主要處理類屬性
public class DefaultMessageStore implements MessageStore {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
//消息存儲配置屬性
private final MessageStoreConfig messageStoreConfig;
// CommitLog
private final CommitLog commitLog;
//topic和隊列的集合
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
//消息隊列刷盤
private final FlushConsumeQueueService flushConsumeQueueService;
private final CleanCommitLogService cleanCommitLogService;
private final CleanConsumeQueueService cleanConsumeQueueService;
//索引刷盤
private final IndexService indexService;
//mappedfile配置設定服務
private final AllocateMappedFileService allocateMappedFileService;
//commitlog分發服務,分發consumqueue
private final ReputMessageService reputMessageService;
//備份線程
private final HAService haService;
//延時消息處理服務
private final ScheduleMessageService scheduleMessageService;
private final StoreStatsService storeStatsService;
//記憶體緩存池
private final TransientStorePool transientStorePool;
private final RunningFlags runningFlags = new RunningFlags();
private final SystemClock systemClock = new SystemClock();
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
private final BrokerStatsManager brokerStatsManager;
//消息到達監聽
private final MessageArrivingListener messageArrivingListener;
//broker配置
private final BrokerConfig brokerConfig;
private volatile boolean shutdown = true;
//存盤
private StoreCheckpoint storeCheckpoint;
private AtomicLong printTimes = new AtomicLong(0);
//分發服務
private final LinkedList<CommitLogDispatcher> dispatcherList;
private RandomAccessFile lockFile;
private FileLock lock;
boolean shutDownNormal = false;
消息存放入口
org.apache.rocketmq.store.DefaultMessageStore#putMessages
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
if (this.shutdown) {
log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {//salve不能寫消息
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {//topic不能超過127個字元
log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {//body不能超過4m
log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (this.isOSPageCacheBusy()) {//是否緩存頁繁忙
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch);//執行消息存放的核心類
long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
消息存放核心類
org.apache.rocketmq.store.CommitLog#putMessages
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
//發送
final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (messageExtBatch.getDelayTimeLevel() > 0) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
long eclipseTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();//取最近一個記憶體映射檔案
//fine-grained lock instead of the coarse-grained
MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));//資料的封裝
putMessageLock.lock();//擷取鎖,這個也是并發的,
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
messageExtBatch.setStoreTimestamp(beginLockTimestamp);//設定存儲時間
if (null == mappedFile || mappedFile.isFull()) {//如果mappedFile為空或者滿了 重新建立一個mappedfile
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//消息追加操作
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
然後就走到了org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
int currentPos = this.wrotePosition.get();//目前檔案的寫位置
if (currentPos < this.fileSize) {//說明還可以寫入
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();//建立共享記憶體區
byteBuffer.position(currentPos);//目前需要寫入的位置
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);//如果放不下(fileSize - currentPos)+8,則會産生空行
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());//寫為值增加
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
消息追加
org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();//mappedfile的起位置+目前的檔案需要的偏移量
this.resetByteBuffer(hostHolder, 8);//限制8個位元組
//4位元組ip+4位元組port+8位元組長度
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);//隊列的偏移量
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
/**
* Serialize message
*/
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {//沒有足夠的空間來存放消息,會重新建立檔案,檔案中會存放一個最大空行數量和一個空行魔數
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);//更新隊列的偏移量
break;
default:
break;
}
return result;
}
這個時候消息已經存儲在記憶體中了,然後需要根據刷盤政策進行刷盤。
org.apache.rocketmq.store.CommitLog#handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush 同步刷盤
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
//建構送出請求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
//将刷盤請求送出給commit線程線程
service.putRequest(request);
//等待刷盤成功,雖然前面commit線程是異步的,但是這邊會等待刷盤超過 時間還是同步請求
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
處理commit刷盤請求
org.apache.rocketmq.store.CommitLog.GroupCommitService#run
然後就是org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
synchronized (this.requestsRead) {//串行
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();//上次的刷盤指針不大于要送出的資料指針
if (!flushOK) {//這裡是true
CommitLog.this.mappedFileQueue.flush(0);//刷盤
}
}
req.wakeupCustomer(flushOK);//完成喚醒剛送出刷盤請求阻塞的線程,
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
這裡資料已經儲存到磁盤了。
接着就該該記錄資料到 消費隊列和索引檔案了
org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
調用org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
private void doReput() {
/**
* reputFromOffset從哪個便宜量開始同步資料,
* private boolean isCommitLogAvailable() {
* return this.reputFromOffset (同步資料偏移量)< DefaultMessageStore.this.commitLog.getMaxOffset();(寫入檔案偏移量)
* }
*/
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
//如果允許重複消費消息的話,确認的偏移量比同步偏移量大退出
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
//根據偏移量選取對應的 mappedbuffer
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();//上一次已經commit的或者 writebuffer!=null 的情況下writepostion位置
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
//逐條讀取所有的資料
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest);//執行org.apache.rocketmq.store.CommitLogDispatcher.dispatch方法
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {//當不上從節點,并且長輪訓啟用的情況下。
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),//需要喚醒用戶端channel 告知有消息到來,
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
this.reputFromOffset += size;//增加下次讀取的offset
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {//即時不成功
if (size > 0) {//還是更新偏移量
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
看下consumqueue儲存怎麼處理請求
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch
org.apache.rocketmq.store.DefaultMessageStore#putMessagePositionInfo
核心方法org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper
每個consumequeue檔案是20*30w個位元組
queue一條消息就三個值固定20位元組一條消息
this.byteBufferIndex.putLong(offset);//實體偏移量
this.byteBufferIndex.putInt(size);//消息大小
this.byteBufferIndex.putLong(tagsCode);//消息tag hashcode,友善定長處理,沒存tag
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();//判斷隊列是否可寫入
for (int i = 0; i < maxRetries && canWrite; i++) {//沒有超過
long tagsCode = request.getTagsCode();//
if (isExtWriteEnable()) {//false
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
} else {
log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
topic, queueId, request.getCommitLogOffset());
}
}
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());//将消息存儲到檔案隊列中
if (result) {//存儲超過,需要更新存盤點時間,存盤點回定時寫到檔案
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
索引檔案
單個索引檔案是由
org.apache.rocketmq.store.index.IndexService#buildIndex
檔案構成是
8+8+8+8+4+4(檔案描述),(4*500w index條目索引),2000w(4+8+4+4 index條目) 機關都是位元組
8+8+8+8+4+4(檔案描述)
8個位元組的消息最小存儲時間
8個位元組的消息最大存儲時間
8個位元組最小實體偏移量
8個位元組最大實體偏移量
4個位元組的哈希槽個數
4個位元組的索引個數
2000w(4+8+4+4 index條目)
4個位元組的hash,8個位元組的實體偏移量,4個位元組的存儲時間與第一條消息的時間差,前一條記錄的index索引(會有hash沖突)
首先根據key進行hash取值,然後對hash槽取模配置設定吧,具體資訊寫入index條目中,如果出現hash沖突的情況,
則用覆寫直接的資料,并且目前資料填上之前的index。構造連結清單
public void buildIndex(DispatchRequest req) {
IndexFile indexFile = retryGetAndCreateIndexFile();//擷取indexfile檔案
if (indexFile != null) {
long endPhyOffset = indexFile.getEndPhyOffset();//存儲消息最大實體偏移量
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
if (msg.getCommitLogOffset() < endPhyOffset) {//新加入的消息偏移量 比index存儲的要大
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
if (req.getUniqKey() != null) {//判斷是不是唯一key
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
if (keys != null && keys.length() > 0) {//有多個key的話
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));//每個key對應一個index條目
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}