天天看点

RocketMQ原理解析(三)存储文件组织与内存映射

作者:Civen

RocketMQ通过使用内存映射文件来提高I/O访问性能,无论是 CommitLog、Consume-Queue还是Index,单个文件都被设计为固定长 度,一个文件写满以后再创建新文件,文件名就为该文件第一条消息 对应的全局物理偏移量。

RocketMQ使用MappedFile、MappedFileQueue来封装存储文件。

1.1 MappedFileQueue映射文件队列

MappedFileQueue是MappedFile的管理容器,MappedFileQueue对 存储目录进行封装,例如CommitLog文件的存储路径为 ${ROCKET_HOME}/store/commitlog/,该目录下会存在多个内存映射文 件MappedFile。MappedFileQueue类图如图4-11所示。

RocketMQ原理解析(三)存储文件组织与内存映射

图4-11 MappedFileQueue类图

下面介绍MappedFileQueue的核心属性。

1)String storePath:存储目录。

2)int mappedFileSize:单个文件的存储大小。 3)CopyOnWriteArrayList mappedFiles:MappedFile集合。

4)AllocateMappedFileService allocateMappedFileService: 创建MappedFile服务类。

5)long flushedWhere = 0:当前刷盘指针,表示该指针之前的 所有数据全部持久化到磁盘。

6)long committedWhere = 0:当前数据提交指针,内存中 ByteBuffer当前的写指针,该值大于、等于flushedWhere。

接下来重点分析根据不同维度查找MappedFile的方法,如代码清 单4-11所示。

代码清单4-11 MappedFileQueue#getMappedFileByTime

public MappedFile getMappedFileByTime(final long timestamp) {
  Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return null;
for (int i = 0; i < mfs .length; i++) {
  MappedFile mappedFile = (MappedFile) mfs[i];
if (mappedFile .getLastModifiedTimestamp() >= timestamp) {
return mappedFile;
}
}
return (MappedFile) mfs[mfs.length - 1];}           

根据消息存储时间戳查找MappdFile。从MappedFile列表中第一个 文件开始查找,找到第一个最后一次更新时间大于待查找时间戳的文 件,如果不存在,则返回最后一个MappedFile,如代码清单4-12所 示。

代码清单4-12 MappedFileQueue#findMappedFileByOffset

public MappedFile findMappedFileByOffset(final long offset, final boolean
returnFirstOnNotFound) {
// 省略外层 try ...catch
MappedFile mappedFile = this .getFirstMappedFile();
if (mappedFile != null) {
int index = (int) ((offset / this .mappedFileSize) -
(mappedFile .getFileFromOffset() /
this .mappedFileSize));
if (index < 0 || index >= this .mappedFiles .size()) { // 省略警告日志
}
try {
return this .mappedFiles .get(index);
} catch (Exception e) {
if (returnFirstOnNotFound) {
return mappedFile;
}
LOG_ERROR.warn("findMappedFileByOffset failure. ", e); }
}
}           

根据消息偏移量offset查找MappedFile,但是不能直接使用 offset%mappedFileSize。这是因为使用了内存映射,只要是存在于存 储目录下的文件,都需要对应创建内存映射文件,如果不定时将已消 费的消息从存储文件中删除,会造成极大的内存压力与资源浪费,所 以RocketMQ采取定时删除存储文件的策略。也就是说,在存储文件 中,第一个文件不一定是00000000000000000000,因为该文件在某一 时刻会被删除,所以根据offset定位MappedFile的算法为(int) ((offset/this.mappedFileSize)

(mappedFile.getFileFromOffset()/this.MappedFileSize)),如代码 清单4-13所示。

代码清单4-13 MappedFileQueue#getMinOffset

public long getMinOffset() {
if (!this .mappedFiles .isEmpty()) { try {
return this .mappedFiles .get(0) .getFileFromOffset(); } catch (IndexOutOfBoundsException e) {
} catch (Exception e) {
log .error("getMinOffset has exception .", e);           

获取存储文件最小偏移量。从这里也可以看出,并不是直接返回 0,而是返回MappedFile的getFileFormOffset()方法,如代码清单4- 14所示。

代码清单4-14 MappedFileQueue#getMaxOffset

public long getMaxOffset() {
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
return mappedFile.getFileFromOffset() +
mappedFile.getReadPosition();
}
return 0;
}           

获取存储文件的最大偏移量。返回最后一个MappedFile的 fileFromOffset,加上MappedFile当前的写指针,如代码清单4-15所 示。

代码清单4-15 MappedFileQueue#getMaxWrotePosition

public long getMaxWrotePosition() {
MappedFile mappedFile = getLastMappedFile(); if (mappedFile !=
null) {
return mappedFile.getFileFromOffset() +
mappedFile.getWrotePosition();
}
return 0;
}           

返回存储文件当前的写指针。返回最后一个文件的

fileFromOffset,加上当前写指针位置。

关于MappedFileQueue的相关业务方法,我们在具体使用到时再去

剖析。

1.2 MappedFile内存映射文件

MappedFile是RocketMQ内存映射文件的具体实现,如图4-12所 示。

RocketMQ原理解析(三)存储文件组织与内存映射

图4-12 MappedFile类图

下面介绍MappedFile的核心属性。

1)int OS_PAGE_SIZE:操作系统每页大小,默认4KB。

2)AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY:当前JVM实例中 MappedFile的虚拟内存。

3)AtomicInteger TOTAL_MAPPED_FILES:当前JVM实例中 MappedFile对象个数。

4)AtomicInteger wrotePosition:当前文件的写指针,从0开始 (内存映射文件中的写指针)。

5)AtomicInteger committedPosition:当前文件的提交指针, 如果开启transientStore-PoolEnable,则数据会存储在 TransientStorePool中,然后提交到内存映射ByteBuffer中,再写入 磁盘。

6)AtomicInteger flushedPosition:将该指针之前的数据持久 化存储到磁盘中。

7)int fileSize:文件大小。

8)FileChannel fileChannel:文件通道。

9)ByteBuffer writeBuffer:堆外内存ByteBuffer,如果不为 空,数据首先将存储在该Buffer中,然后提交到MappedFile创建的 FileChannel中。transientStorePoolEnable为true时不为空。

10)TransientStorePool transientStorePool:堆外内存池,该 内存池中的内存会提供内存锁机制。transientStorePoolEnable为 true时启用。

11)String fileName:文件名称。

12)long fileFromOffset:该文件的初始偏移量。

13)File file:物理文件。

14)MappedByteBuffer mappedByteBuffer:物理文件对应的内存 映射Buffer。

15)volatile long storeTimestamp = 0:文件最后一次写入内 容的时间。

16)boolean firstCreateInQueue:是否是MappedFileQueue队列 中第一个文件。

1. MappedFile初始化

第一步:根据是否开启transientStorePoolEnable存在两种初始 化情况。transientStorePool-Enable为true表示内容先存储在堆外内 存,然后通过Commit线程将数据提交到FileChannel中,再通过Flush 线程将数据持久化到磁盘中,如代码清单4-16所示。

代码清单4-16 MappedFile#init(final String fileName, final int fileSize)

this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this .fileFromOffset = Long .parseLong(this .file .getName()); ensureDirOK(this .file .getParent());
this .fileChannel = new RandomAccessFile(this .file,
"rw") .getChannel();
this .mappedByteBuffer = this .fileChannel .map(MapMode .READ_WRITE, 0,
fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();           

第二步:初始化fileFromOffset为文件名,也就是文件名代表该 文件的起始偏移量,通过RandomAccessFile创建读写文件通道,并将 文件内容使用NIO的内存映射Buffer将文件映射到内存中,如代码清单 4-17所示。

代码清单4-17 MappedFile#init

public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws
IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer();           

如果transientStorePoolEnable为true,则初始化MappedFile的 writeBuffer,该buffer从transientStorePool中获取。

2. MappedFile提交

内存映射文件的提交动作由MappedFile的commit()方法实现,如 代码清单4-18所示。

代码清单4-18 MappedFile#commit

public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0(commitLeastPages);
this.release();
} else {
log .warn("in commit, hold failed, commit offset = "
+
this.committedPosition.get());
}
}
if (writeBuffer != null && this.transientStorePool != null
&&
this.fileSize == this.committedPosition.get()) {
this .transientStorePool .returnBuffer(writeBuffer);
this .writeBuffer = null;
}
return this.committedPosition.get();
}           

执行提交操作,commitLeastPages为本次提交的最小页数,如果 待提交数据不满足commitLeastPages,则不执行本次提交操作,等待 下次提交。writeBuffer如果为空,直接返回wrotePosition指针,无 须执行commit操作,这表明commit操作的主体是writeBuffer,如代码 清单4-19所示。

代码清单4-19 MappedFile#isAbleToCommit

protected boolean isAbleToCommit(final int commitLeastPages) {
int flush = this.committedPosition.get();
int write = this.wrotePosition.get();
if (this.isFull()) {
return true;
}
if (commitLeastPages > 0) {
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) commitLeastPages;
>=
}
return write > flush;
}           

判断是否执行commit操作。如果文件已满,返回true。如果 commitLeastPages大于0,则计算wrotePosition(当前writeBuffe的 写指针)与上一次提交的指针(committedPosition)的差值,将其除 以OS_PAGE_SIZE得到当前脏页的数量,如果大于commitLeastPages, 则返回true。如果commitLeastPages小于0,表示只要存在脏页就提 交,如代码清单4-20所示。

代码清单4-20 MappedFile#commit0

protected void commit0(final int commitLeastPages) { int writePos = this.wrotePosition.get(); int lastCommittedPosition = this.committedPosition.get(); if (writePos - this.committedPosition.get() > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice(); byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition); this.fileChannel.write(byteBuffer);
this.committedPosition.set(writePos); }
catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}           

下面介绍具体的MappedFile提交实现过程。首先创建writeBuffer 的共享缓存区,然后将新创建的position回退到上一次提交的位置 (committedPosition),设置limit为wrotePosition(当前最大有效 数据指针),接着把committedPosition到wrotePosition的数据复制

(写入)到FileChannel中,最后更新committedPosition指针为 wrotePosition。commit的作用是将MappedFile# writeBuffer中的数 据提交到文件通道FileChannel中。

ByteBuffer使用技巧:调用slice()方法创建一个共享缓存区,与 原先的ByteBuffer共享内存并维护一套独立的指针(position、 mark、limit)。

3. MappedFile刷盘

刷盘指的是将内存中的数据写入磁盘,永久存储在磁盘中,由 MappedFile的flush()方法实现,如代码清单4-21所示。

代码清单4-21 MappedFile#flush

public int flush(final int flushLeastPages) { if
(this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
try {
if (writeBuffer != null ||
this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to
disk.", e);
}
this.flushedPosition.set(value);
this.release();
} else {
this.flushedPosition.set(getReadPosition()); }
}
return this.getFlushedPosition();
}           

直接调用mappedByteBuffer或fileChannel的force()方法将数据 写入磁盘,将内存中的数据持久化到磁盘中,那么flushedPosition应 该等于MappedByteBuffer中的写指针。如果writeBuffer不为空,则 flushedPosition应等于上一次commit指针。因为上一次提交的数据就 是进入MappedByteBuffer中的数据。如果writeBuffer为空,表示数据

是直接进入MappedByteBuffer的,wrotePosition代表的是 MappedByteBuffer中的指针,故设置flushedPosition为 wrotePosition。

4. 获取MappedFile最大读指针

RocketMQ文件的一个组织方式是内存映射,预先申请一块连续且 固定大小的内存,需要一套指针标识当前最大有效数据的位置,获取 最大有效数据偏移量的方法由MappedFile的getReadPosition()方法实 现,如代码清单4-22所示。

代码清单4-22 MappedFile#getReadPosition

public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}           

获取当前文件最大的可读指针,如代码清单4-23所示。如果 writeBuffer为空,则直接返回当前的写指针。如果writeBuffer不为 空,则返回上一次提交的指针。在MappedFile设计中,只有提交了的 数据(写入MappedByteBuffer或FileChannel中的数据)才是安全的数 据。

代码清单4-23 MappedFile#selectMappedBuffer

public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) { if
(this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(pos);
int size = readPosition - pos;
ByteBuffer byteBufferNew = byteBuffer.slice(); byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset
+ pos,
byteBufferNew, size, this);
}
}
return null;
}           

首先查找pos到当前最大可读指针之间的数据,因为在整个写入期 间都未曾改变MappedByteBuffer的指针,所以

mappedByteBuffer.slice()方法返回的共享缓存区空间为整个 MappedFile。然后通过设置byteBuffer的position为待查找的值,读 取字节为当前可读字节长度,最终返回的ByteBuffer的limit(可读最 大长度)为size。整个共享缓存区的容量为

MappedFile#fileSizepos,故在操作SelectMappedBufferResult时不 能对包含在里面的ByteBuffer调用flip()方法。

注意

操作ByteBuffer时如果使用了slice()方法,对其ByteBuffer进行读取 时一般手动指定position和limit指针,而不是调用flip()方法切换读 写状态。

5. MappedFile销毁

MappedFile文件销毁的实现方法为public boolean destroy(final long intervalForcibly),intervalForcibly表示拒 绝被销毁的最大存活时间,如代码清单4-24所示。

代码清单4-24 MappedFile#shutdown

public void shutdown(final long intervalForcibly) { if
(this.available) {
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis(); this.release();
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() -
this.firstShutdownTimestamp) >=
intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount()); this.release();
}
}
}           

第一步:关闭MappedFile。初次调用时this.available为true, 设置available为false,并设置初次关闭的时间戳 (firstShutdownTimestamp)为当前时间戳。调用release()方法尝试

释放资源,release只有在引用次数小于1的情况下才会释放资源。如 果引用次数大于0,对比当前时间与firstShutdownTimestamp,如果已 经超过了其最大拒绝存活期,则每执行一次引用操作,引用数减少 1000,直到引用数小于0时通过执行realse()方法释放资源,如代码清 单4-25所示。

代码清单4-25 MappedFile#isCleanupOver

public boolean isCleanupOver() {
return this.refCount.get() <= 0 && this.cleanupOver; }           

第二步:判断是否清理完成,判断标准是引用次数小于、等于0并 且cleanupOver为true,cleanupOver为true的触发条件是release成功 将MappedByteBuffer资源释放了,如代码清单4-26所示。稍后详细分 析release()方法。

代码清单4-26 MappedFile#destroy

this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();           

第三步:关闭文件通道,删除物理文件。

在整个MappedFile销毁的过程中,首先需要释放资源,释放资源 的前提条件是该MappedFile的引用小于、等于0。接下来重点看一下 release()方法的实现原理,如代码清单4-27所示。

代码清单4-27 ReferenceResource#release

public void release() {
long value = this.refCount.decrementAndGet();
if (value > 0)
return;
synchronized (this) {
this.cleanupOver = this.cleanup(value);
}
}           

将引用次数减1,如果引用数小于、等于0,则执行cleanup()方 法,下面重点分析cleanup()方法的实现,如代码清单4-28所示。

代码清单4-28 MappedFile#cleanup

public boolean cleanup(final long currentRef) { 
  if(this .isAvailable()) {
return false;
}
if (this.isCleanupOver()) {
return true;
}
clean(this .mappedByteBuffer);
TOTAL_MAPPED_VIRTUAL_MEMORY .addAndGet(this .fileSize * (-1)); TOTAL_MAPPED_FILES .decrementAndGet();
log .info("unmap file[REF:" + currentRef + "] " + this .fileName
+ " OK");
return true;
}           

如果available为true,表示MappedFile当前可用,无须清理,返 回false,如果资源已经被清除,返回true。如果是堆外内存,调用堆 外内存的cleanup()方法进行清除,维护MappedFile类变量 TOTAL_MAPPED_VIRTUAL_MEMORY、TOTAL_MAPPED_FILES并返回true,表 示cleanupOver为true。

1.3 TransientStorePool

TransientStorePool即短暂的存储池。RocketMQ单独创建了一个 DirectByteBuffer内存缓存池,用来临时存储数据,数据先写入该内 存映射中,然后由Commit线程定时将数据从该内存复制到与目标物理 文件对应的内存映射中。RokcetMQ引入该机制是为了提供一种内存锁 定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁 盘中。

TransientStorePool类图如图4-13所示。

图4-13 TransientStorePool类图

下面介绍TransientStorePool的核心属性,如代码清单4-29所 示。

1)int poolSize:avaliableBuffers个数,可在broker配置文件 中通过transient StorePoolSize进行设置,默认为5。

2)int fileSize:每个ByteBuffer的大小,默认为 mapedFileSizeCommitLog,表明TransientStorePool为CommitLog文件 服务。

3)Deque availableBuffers:ByteBuffer容器,双端队列。 代码清单4-29 TransientStorePool#init

public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address(); Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize)); availableBuffers.offer(byteBuffer);
}
}           

创建数量为poolSize的堆外内存,利用com.sun.jna.Library类库 锁定该批内存,避免被置换到交换区,以便提高存储性能。

RocketMQ原理解析(三)存储文件组织与内存映射

2 RocketMQ存储文件

RocketMQ存储路径为${ROCKET_HOME}/store,主要存储文件如图 4-14所示。下面介绍RocketMQ主要的存储文件夹。

RocketMQ原理解析(三)存储文件组织与内存映射

图4-14 RocketMQ存储 目 录 1)commitlog:消息存储目录。 2)config:运行期间的一些配置信息,主要包括下列信息。

RocketMQ原理解析(三)存储文件组织与内存映射

consumerFilter.json:主题消息过滤信息。

consumerOffset.json:集群消费模式下的消息消费进度。

delayOffset.json:延时消息队列拉取进度。

subscriptionGroup.json:消息消费组的配置信息。

topics.json:topic配置属性。

3)consumequeue:消息消费队列存储目录。

4)index:消息索引文件存储目录。

5)abort:如果存在abort文件,说明Broker非正常关闭,该文件 默认在启动Broker时创建,在正常退出之前删除。

6)checkpoint:检测点文件,存储CommitLog文件最后一次刷盘 时间戳、ConsumeQueue最后一次刷盘时间、index文件最后一次刷盘时 间戳。

2.1 CommitLog文件

CommitLog目录的结构在1节已经详细介绍过了,该目录下的文 件主要用于存储消息,其特点是每一条消息长度不相同。CommitLog文 件存储格式如图4-15所示,每条消息的前面4个字节存储该条消息的总 长度。

RocketMQ原理解析(三)存储文件组织与内存映射

图4-15 CommitLog文件存储格式

CommitLog文件的存储目录默认为

${ROCKET_HOME}/store/commitlog,可以通过在broker配置文件中设 置storePathRootDir属性改变默认路径,如代码清单4-30所示。 CommitLog文件默认大小为1GB,可通过在broker配置文件中设置 mapedFileSizeCommitLog属性改变默认大小。本节将基于上述存储结 构,重点分析消息的查找实现。

代码清单4-30 Commitlog#getMinOffset

public long getMinOffset() {
MappedFile mappedFile =
this.mappedFileQueue.getFirstMappedFile();
if (mappedFile != null) {
if (mappedFile.isAvailable()) {
return mappedFile.getFileFromOffset();
} else {
return
this.rollNextFile(mappedFile.getFileFromOffset());
}
}
return -1;
}           

获取当前CommitLog目录的最小偏移量,首先获取目录下的第一个 文件,如果该文件可用,则返回该文件的起始偏移量,否则返回下一 个文件的起始偏移量,如代码清单4-31所示。

代码清单4-31 CommitLog#rollNextFile

public long rollNextFile(final long offset) {
int mappedFileSize =
this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
return offset + mappedFileSize - offset % mappedFileSize; }           

根据offset返回下一个文件的起始偏移量。获取一个文件的大 小,减去offset % mapped-FileSize,回到下一文件的起始偏移量, 如代码清单4-32所示。

代码清单4-32 CommitLog#getMessage

public SelectMappedBufferResult getMessage(final long offset, final int size) { int
mappedFileSize =
this.defaultMessageStore.getMessageStoreConfig().
getMapedFileSizeCommitLog();
MappedFile mappedFile =
this.mappedFileQueue.findMappedFileByOffset(offset,
offset == 0);
if (mappedFile != null) {           

根据偏移量与消息长度查找消息。首先根据偏移找到文件所在的 物理偏移量,然后用offset与文件长度取余,得到在文件内的偏移 量,从该偏移量读取size长度的内容并返回。如果只根据消息偏移量 查找消息,则首先找到文件内的偏移量,然后尝试读取4字节,获取消 息的实际长度,最后读取指定字节。

2.2 ConsumeQueue文件

RocketMQ基于主题订阅模式实现消息消费,消费者关心的是一个 主题下的所有消息,但同一主题的消息是不连续地存储在CommitLog文 件中的。如果消息消费者直接从消息存储文件中遍历查找订阅主题下 的消息,效率将极其低下。RocketMQ为了适应消息消费的检索需求, 设计了ConsumeQueue文件,该文件可以看作CommitLog关于消息消费的 “索引”文件,ConsumeQueue的第一级目录为消息主题,第二级目录 为主题的消息队列,如图4-16所示。

RocketMQ原理解析(三)存储文件组织与内存映射

图4-16 ConsumeQueue文件结构

为了加速ConsumeQueue消息条目的检索速度并节省磁盘空间,每 一个ConsumeQueue条目不会存储消息的全量信息,存储格式如图4-17 所示。

RocketMQ原理解析(三)存储文件组织与内存映射

图4-17 ConsumeQueue文件存储格式

单个ConsumeQueue文件中默认包含30万个条目,单个文件的长度 为3×106 ×20字节,单个ConsumeQueue文件可以看作一个ConsumeQueue 条目的数组,其下标为ConsumeQueue的逻辑偏移量,消息消费进度存 储的偏移量即逻辑偏移量。ConsumeQueue即为CommitLog文件的索引文 件,其构建机制是当消息到达CommitLog文件后,由专门的线程产生消 息转发任务,从而构建ConsumeQueue文件与下文提到的Index文件,如 代码清单4-33所示。本节只分析如何根据消息逻辑偏移量、时间戳查 找消息,4.6节将重点讨论消息消费队列的构建、恢复等内容。

代码清单4-33 ConsumeQueue#getIndexBuffer

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
int mappedFileSize = this.mappedFileSize;
long offset = startIndex *
CQ_STORE_UNIT_SIZE;
if (offset >= this.getMinLogicOffset()) {
MappedFile mappedFile =
this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
SelectMappedBufferResult result =
mappedFile.selectMappedBuffer((int)
(offset % mappedFileSize));
return result;
}
}
return null;
}           

根据startIndex获取消息消费队列条目。通过startIndex×20得 到在ConsumeQueue文件的物理偏移量,如果该偏移量小于 minLogicOffset,则返回null,说明该消息已被删除,如果大于 minLogicOffset,则根据偏移量定位到具体的物理文件。通过将该偏 移量与物理文件的大小取模获取在该文件的偏移量,从偏移量开始连 续读取20个字节即可。

ConsumeQueue文件提供了根据消息存储时间来查找具体实现的算 法getOffsetInQueue- ByTime(final long timestamp),其具体实现 如下。

第一步:根据时间戳定位到物理文件,就是从第一个文件开始, 找到第一个文件更新时间大于该时间戳的文件,如代码清单4-34所

示。

代码清单4-34 ConsumeQueue#getOffsetInQueueByTime

int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset
- mappedFile.getFileFromOffset()) : 0;
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
long leftIndexValue = -1L, rightIndexValue = -1L;
long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset(); SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
if (null != sbr) {
ByteBuffer byteBuffer = sbr.getByteBuffer(); high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;           

第二步:采用二分查找来加速检索。首先计算最低查找偏移量, 取消息队列最小偏移量与该文件注销偏移量的差为最小偏移量low。获 取当前存储文件中有效的最小消息物理偏移量minPhysicOffset,如果 查找到的消息偏移量小于该物理偏移量,则结束该查找过程,如代码 清单4-35所示。

代码清单4-35 ConsumeQueue#getOffsetInQueueByTime

while (high >= low) {
midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
byteBuffer.position(midOffset);
long phyOffset = byteBuffer.getLong();
int size = byteBuffer.getInt();
if (phyOffset < minPhysicOffset) {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
continue;
}
long storeTime =
this.defaultMessageStore.getCommitLog().
pickupStoreTimestamp(phyOffset, size);
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
targetOffset = midOffset; break;
} else if (storeTime > timestamp) {
high = midOffset - CQ_STORE_UNIT_SIZE;
rightOffset = midOffset;           
rightIndexValue = storeTime;
} else {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
leftIndexValue = storeTime;
}
}           

二分查找的常规退出循环为low>high,首先查找中间的偏移量 midOffset,将ConsumeQueue文件对应的ByteBuffer定位到 midOffset,然后读取4个字节,获取该消息的物理偏移量,如代码清 单4-36所示。

1)如果得到的物理偏移量小于当前的最小物理偏移量,说明待查 找消息的物理偏移量肯定大于midOffset,则将low设置为midOffset, 继续折半查找。

2)如果得到的物理偏移量大于最小物理偏移量,说明该消息是有 效消息,则根据消息偏移量和消息长度获取消息的存储时间戳。

3)如果存储时间小于0,则为无效消息,直接返回0。

4)如果存储时间戳等于待查找时间戳,说明查找到了匹配消息, 则设置targetOffset并跳出循环。

5)如果存储时间戳大于待查找时间戳,说明待查找消息的物理偏 移量小于midOffset,则设置high为midOffset,并设置 rightIndexValue等于midOffset。

6)如果存储时间戳小于待查找时间戳,说明待查找消息的物理偏 移量大于midOffset,则设置low为midOffset,并设置leftIndexValue 等于midOffset。

代码清单4-36 ConsumeQueue#getOffsetInQueueByTime

if (targetOffset != -1) {
offset = targetOffset;
} else {
if (leftIndexValue == -1) {
offset = rightOffset;
} else if (rightIndexValue == -1) {
offset = leftOffset;
  } else {
offset = Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
- rightIndexValue) ? rightOffset : leftOffset;
}
}
return (mappedFile.getFileFromOffset() + offset) /
CQ_STORE_UNIT_SIZE;           

第三步:如果targetOffset不等于-1,表示找到了存储时间戳等 于待查找时间戳的消息。如果leftIndexValue等于-1,表示返回当前 时间戳大于待查找消息的时间戳,并且最接近待查找消息的偏移量。 如果rightIndexValue等于-1,表示返回的时间戳比待查找消息的时间 戳小,并且最接近待查找消息的偏移量,如代码清单4-37所示。

代码清单4-37 ConsumeQueue#rollNextFile

public long rollNextFile(final long index) { int mappedFileSize = this.mappedFileSize;
                              int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE; 
                                            return index + totalUnitsInFile - index % totalUnitsInFile;
}           

根据当前偏移量获取下一个文件的起始偏移量。首先获取文件包 含多少个消息消费队列条目,减去index%totalUnitsInFile的目的是 选中下一个文件的起始偏移量。

2.3 Index文件

ConsumeQueue是RocketMQ专门为消息订阅构建的索引文件,目的 是提高根据主题与消息队列检索消息的速度。另外,RocketMQ引入哈 希索引机制为消息建立索引,HashMap的设计包含两个基本点:哈希槽 与哈希冲突的链表结构。RocketMQ索引文件Index存储格式如图4-18所 示。

RocketMQ原理解析(三)存储文件组织与内存映射

图4-18 Index文件存储格式

从图4-20可以看出,Index包含Index文件头、哈希槽、Index条目 (数据)。Index文件头包含40字节,记录该Index的统计信息,其结 构如下。

1)beginTimestamp:Index文件中消息的最小存储时间。 2)endTimestamp:Index文件中消息的最大存储时间。

3)beginPhyoffset:Index文件中消息的最小物理偏移量 (CommitLog文件偏移量)。

4)endPhyoffset:Index文件中消息的最大物理偏移量 (CommitLog文件偏移量)。

5)hashslotCount:hashslot个数,并不是哈希槽使用的个数, 在这里意义不大。

6)indexCount:Index条目列表当前已使用的个数,Index条目在 Index条目列表中按顺序存储。

一个Index默认包含500万个哈希槽。哈希槽存储的是落在该哈希 槽的哈希码最新的Index索引。默认一个Index文件包含2000万个条 目,每个Index条目结构如下。

1)hashcode:key的哈希码。

2)phyoffset:消息对应的物理偏移量。

3)timedif:该消息存储时间与第一条消息的时间戳的差值,若 小于0,则该消息无效。

4)pre index no:该条目的前一条记录的Index索引,当出现哈 希冲突时,构建链表结构。

接下来重点分析如何将Map<String/*消息索引key*/,long phyOffset/*消息物理偏移量*/>存入Index文件,以及如何根据消息索 引key快速查找消息。

RocketMQ将消息索引键与消息偏移量的映射关系写入Index的实现 方法为public boolean putKey(final String key, final long phyOffset, final long storeTimestamp),参数含义分别为消息索 引、消息物理偏移量、消息存储时间,如代码清单4-38所示。

代码清单4-38 IndexFile#putKey

if (this.indexHeader.getIndexCount() < this.indexNum) { int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this .hashSlotNum;
int absSlotPos = IndexHeader .INDEX_HEADER_SIZE + slotPos *
hashSlotSize;
}           

第一步:当前已使用条目大于、等于允许最大条目数时,返回 fasle,表示当前Index文件已写满。如果当前index文件未写满,则根 据key算出哈希码。根据keyHash对哈希槽数量取余定位到哈希码对应 的哈希槽下标,哈希码对应的哈希槽的物理地址为IndexHeader(40字 节)加上下标乘以每个哈希槽的大小(4字节),如代码清单4-39所 示。

代码清单4-39 IndexFile#putKey

int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue >
this.indexHeader.getIndexCount())
{
slotValue = invalidIndex;
}           

第二步:读取哈希槽中存储的数据,如果哈希槽存储的数据小于0 或大于当前Index文件中的索引条目,则将slotValue设置为0,如代码 清单4-40所示。

代码清单4-40 IndexFile#putKey

long timeDiff = storeTimestamp -
this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer .MAX_VALUE) {
timeDiff = Integer .MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}           

第三步:计算待存储消息的时间戳与第一条消息时间戳的差值, 并转换成秒,如代码清单4-41所示。

代码清单4-41 IndexFile#putKey

int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum *
hashSlotSize + this.indexHeader.getIndexCount() *
indexSize;
this.mappedByteBuffer .putInt(absIndexPos, keyHash);
this .mappedByteBuffer .putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); this.mappedByteBuffer.putInt(absSlotPos,
this .indexHeader .getIndexCount());           

第四步:将条目信息存储在Index文件中。

1)计算新添加条目的起始物理偏移量:头部字节长度+哈希槽数 量×单个哈希槽大小(4个字节)+当前Index条目个数×单个Index条 目大小(20个字节)。

2)依次将哈希码、消息物理偏移量、消息存储时间戳与Index文 件时间戳、当前哈希槽的值存入MappedByteBuffer。

3)将当前Index文件中包含的条目数量存入哈希槽中,覆盖原先 哈希槽的值。

以上是哈希冲突链式解决方案的关键实现,哈希槽中存储的是该 哈希码对应的最新Index条目的下标,新的Index条目最后4个字节存储 该哈希码上一个条目的Index下标。如果哈希槽中存储的值为0或大于 当前Index文件最大条目数或小于-1,表示该哈希槽当前并没有与之对 应的Index条目。值得注意的是,Index文件条目中存储的不是消息索 引key,而是消息属性key的哈希,在根据key查找时需要根据消息物理 偏移量找到消息,进而验证消息key的值。之所以只存储哈希,而不存 储具体的key,是为了将Index条目设计为定长结构,才能方便地检索 与定位条目,如代码清单4-42所示。

代码清单4-42 IndexFile#putKey

if (this .indexHeader .getIndexCount() <= 1) {
this .indexHeader .setBeginPhyOffset(phyOffset); this .indexHeader .setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);           

第五步:更新文件索引头信息。如果当前文件只包含一个条目, 则更新beginPhyOffset、beginTimestamp、endPyhOffset、 endTimestamp以及当前文件使用索引条目等信息,如代码清单4-43所 示。

RocketMQ根据索引key查找消息的实现方法为 selectPhyOffset(List<Long> phy Offsets, String key, int maxNum,long begin, long end),其参数说明如下。

1)List<Long> phyOffsets:查找到的消息物理偏移量。 2)String key:索引key。

3)int maxNum:本次查找最大消息条数。

4)long begin:开始时间戳。

5)long end:结束时间戳。

代码清单4-43 IndexFile#selectPhyOffset

int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *
hashSlotSize;           

第一步:根据key算出key的哈希码,keyHash对哈希槽数量取余, 定位到哈希码对应的哈希槽下标,哈希槽的物理地址为 IndexHeader(40字节)加上下标乘以每个哈希槽的大小(4字节), 如代码清单4-44所示。

代码清单4-44 IndexFile#selectPhyOffset

int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue >
this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) { // 返回; }           

第二步:如果对应的哈希槽中存储的数据小于1或大于当前索引条 目个数,表示该哈希码没有对应的条目,直接返回,如代码清单4-45 所示。

代码清单4-45 IndexFile#selectPhyOffset

for (int nextIndexToRead = slotValue; ; ){
// 省略部分代码
}           

第三步:因为会存在哈希冲突,所以根据slotValue定位该哈希槽 最新的一个Item条目,将存储的物理偏移量加入phyOffsets,然后继 续验证Item条目中存储的上一个Index下标,如果大于、等于1并且小 于当前文件的最大条目数,则继续查找,否则结束查找,如代码清单 4-46所示。

代码清单4-46 IndexFile#selectPhyOffset

int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum *
hashSlotSize + nextIndexToRead * indexSize; int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos +
4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4
+ 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 +
8 + 4);           

第四步:根据Index下标定位到条目的起始物理偏移量,然后依次 读取哈希码、物理偏移量、时间戳、上一个条目的Index下标,如代码 清单4-47所示。

代码清单4-47 查找消息偏移量

if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
if (keyHash == keyHashRead && timeMatched) {
  phyOffsets.add(phyOffsetRead);

}

if (prevIndexRead <= invalidIndex

|| prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) {

break;

}

nextIndexToRead = prevIndexRead;           

第五步:如果存储的时间戳小于0,则直接结束查找。如果哈希匹 配并且消息存储时间介于待查找时间start、end之间,则将消息物理 偏移量加入phyOffsets,并验证条目的前一个Index索引,如果索引大 于、等于1并且小于Index条目数,则继续查找,否则结束查找。

2.4 checkpoint文件

checkpoint(检查点)文件的作用是记录ComitLog、 ConsumeQueue、Index文件的刷盘时间点,文件固定长度为4KB,其中 只用该文件的前面24字节,其存储格式如图4-19所示。

RocketMQ原理解析(三)存储文件组织与内存映射

图4-19 checkpoint文件存储格式 1)physicMsgTimestamp:CommitLog文件刷盘时间点。 2)logicsMsgTimestamp :ConsumeQueue文件刷盘时间点。 3)indexMsgTimestamp:Index文件刷盘时间点。

继续阅读