上一篇:RocketMQ源码分析之文件内存映射对象层MappedFile
一、前言
既然 MappedFile 是管理单个文件的类, 那么就会存在用来管理 这些 MappedFile的类:MappedFileQueue。
我们可以把他们之间的关系形象的理解成 : 文件(MappedFile) 和 目录(MappedFileQueue)
二、源码分析
- MappedFileQueue成员变量;
- 加载磁盘数据到内存映射区域里来;
- 获取当前正在顺序写的MappedFile对象;
- 根据时间删除文件;
- 获取commitlog最大偏移量;
- 获取到当前最大的一个写位置;
- flush刷盘;
1、MappedFileQueue成员变量
public class MappedFileQueue {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
private static final int DELETE_FILES_BATCH_MAX = 10;
// commitlog存储路径:
// 1. CommitLog文件目录路径为: ../store/commit/log
// 2. ConsumeQueue文件目录路径为: ../store/xxx_topic/x
private final String storePath;
// 映射文件大小
// 1. commitLog文件 默认1g
// 2. consumeQueue文件 默认600w字节)
protected final int mappedFileSize;
// 所有的映射文件
protected final CopyOnWriteArrayList<MappedFile> mappedFiles =
new CopyOnWriteArrayList<MappedFile>();
// 分配映射文件服务
private final AllocateMappedFileService allocateMappedFileService;
// 从哪里开始flush
protected long flushedWhere = 0;
// 从哪里开始commit
private long committedWhere = 0;
// 当前目录下最后一条msg存储时间
private volatile long storeTimestamp = 0;
}
上述属性基本上都很简单, 这里需要强调其中一个 属性 flushedWhere;
MappedFileQueue目录中的 MappedFile文件 是顺序写的, 当文件写满了之后,才回去创建新的MappedFile , 其中MappedFile的文件名为 物理偏移量。
简单举个例子(仅作说明使用 ):假设 每个文件大小为 64bytes 第一个文件名为 00000 , 当该文件写满了 则需要创建第二个文件,那么这第二个文件的文件名为 00064 , 此时写也只能向第二个文件中写,那么当写了 32bytes后 的 flushedWhere = 00064 + 00032 = 00096。
2、加载磁盘数据到内存映射区域里来
- 根据 指定文件目录( 如:../store/commit/log) , 构建 File 对象(注意:是个文件夹)。
- 遍历 该文件夹下 所有的 文件 并排序 得到 File[] files 数组 (注意:这是文件的集合)。
- 遍历 排序后的文件集合,为每个文件创建 MappedFile对象 并赋上初始值,然后存入 MappedFiles集合中;
正常Broker 在启动后, 会先调用 load() 方法 加载出目录下所有的MappedFile, 然后再通过 recover的相关方法来重新赋上准确的值。
// 加载磁盘数据到内存映射区域里来
public boolean load() {
// 先把commitlog存储路径封装成一个file,此时这个file是存储目录
File dir = new File(this.storePath);
// 对存储目录里的子文件列表获取到,拿到一堆的file
File[] ls = dir.listFiles();
if (ls != null) {
// 对所有的files磁盘文件去进行加载操作
return doLoad(Arrays.asList(ls));
}
return true;
}
// 对所有的磁盘文件进行一个加载
public boolean doLoad(List<File> files) {
// ascending order
// 基于文件名称进行一个排序,文件名称都是物理偏移量
files.sort(Comparator.comparing(File::getName));
// 对排序后的文件做一个遍历
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try {
// 直接把一个磁盘文件封装成一个MappedFile,定位一下他的可以写入的位置、flush位置、commit位置
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
// 对映射完毕的mappedfile设置可以写入位置、可以flush位置、可以commit位置,这些位置都是从mappedfile大小开始的
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
// 加入到 list中
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
return true;
}
3、获取当前正在顺序写的MappedFile对象
当前正在顺序写的MappedFile 必定是 MappedFile集合中的末尾文件。 因此代码中直接调用 getLastMappedFile() 方法获取了末尾的MappedFile, 而此时 会存在 3中情况:
- 该 MappedFile 存在 且 MappedFile 内 还有剩余可写空间。(这也是最好的情况,正常返回就行了)
- 该 MappedFile 存在,但是该MappedFile 已经被写满了。 (需要创建 新的MappedFile)
- 该 MappedFile 不存在 ,也就说明 目录下并没有任何文件。(需要创建 新的MappedFile)
其中 2 ,3 情况 需要 创建新的 MappedFile ,而创建 MappedFile 的方式分为了两种:
- 通过 allocateMappedFileService 使用其它线程来创建。( MappedFile >= 1g 时 有预热操作)
- 普通 new MappedFile() 方式创建。(无预热操作)
/**
* 获取当前正在顺序写的MappedFile对象
* (存储消息 或者 存储ConsumeQueue数据时, 都需要获取当前正在顺序写的MappedFile对象)
* 注意: 如果MappedFile写满了 或者 不存在查找的MappedFile, 则创建新的MappedFile
*
* @param startOffset 文件起始偏移量
* @param needCreate 当list为空时,是否创建 mappedFile
* @return
*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
// 该值 控制是否需要创建MappedFile ,当需要创建MappedFile时,它充当文件名
// 两种情况 会创建:
// 1. list 内没有mappedFIle
// 2. list最后一个mappedFile (当前顺序写的mappedFile)它写满了
long createOffset = -1;
// 获取 list 中的最后一个 MappedFile
MappedFile mappedFileLast = getLastMappedFile();
// 情况1 list 内没有mappedFile
if (mappedFileLast == null) {
// createOffset 取值必须是 mappedFileSize 的倍数 或者 0
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// 情况2 list最后一个mappedFile (当前顺序写的mappedFile)它写满了
if (mappedFileLast != null && mappedFileLast.isFull()) {
// 上一个文件名 转Long + mappedFileSize
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
// 这里是创建 新的 mappedFile 逻辑
if (createOffset != -1 && needCreate) {
return tryCreateMappedFile(createOffset);
}
return mappedFileLast;
}
protected MappedFile tryCreateMappedFile(long createOffset) {
// 获取 下次待创建文件的 绝对路径
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
// 获取 下下次待创建文件的 绝对路径
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
// 使用 allocateMappedFileService 来创建 MappedFile
if (this.allocateMappedFileService != null) {
// 当mappedFileSize >= 1g 的话, 这里创建的mappedFile 会执行它的 预热方法
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(
nextFilePath,
nextNextFilePath,
this.mappedFileSize
);
}
// 直接创建 MappedFile (这里没有预热)
else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
// 将创建的 mappedFile 添加到 list中 并返回
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
// 走到这里... 是无需创建 MappedFile时 返回。
return mappedFile;
}
4、根据时间删除文件
/**
* commitLog 根据时间删除文件
* @param expiredTime 过期时间
* @param deleteFilesInterval 删除两个文件之间的时间间隔
* @param intervalForcibly 强制关闭资源的时间间隔 mf.destory传递的参数
* @param cleanImmediately true 强制删除,不考虑过期时间这个条件
* @return
*/
public int deleteExpiredFileByTime(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
// 拷贝mappedfiles数组
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
// 这里 减-1 是保证 当前正在顺序写的MappedFile不被删除
int mfsLength = mfs.length - 1;
// 记录删除的文件数
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
// 对每一个mappedfile做一个遍历
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
// 对每个mappedfile获取到他的最近一次修改时间 + 一个mappedfile过期时间
// 计算出来这个mappedfile在最近一次修改了之后,最大的可以存活的时间
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
// 如果说当前时间已经超过了最大可以存活的时间了,此时就说明这个文件已经过期了,可以清理了
// 或者是不管他到没到最大存活时间,都标识了一个立即清理,也可以清理了
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
// 对当前mappedfile调用一下销毁的函数,销毁和释放成功了,就可以去走下面的收尾逻辑
if (mappedFile.destroy(intervalForcibly)) {
// 把销毁成功的mappedfile加入到files列表里去
files.add(mappedFile);
// 成功删除和销毁的mappedfile数量做一个累加
deleteCount++;
// 如果销毁掉的文件数量大于等于最大批量删除文件的大小,此时就终止文件清理
// 这个也是一样的,避免一次性删除文件过多,也会导致磁盘io过高,机器负载增加的
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
// 如果说删除文件间隔是大于0,而且此时还没删除完毕
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
// 休眠一会儿,停顿指定的删除文件时间间隔,避免频繁的删除文件,可能会导致磁盘io过高
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
// 对我们已经销毁掉的文件进行过期文件删除
deleteExpiredFile(files);
return deleteCount;
}
private Object[] copyMappedFiles(final int reservedMappedFiles) {
Object[] mfs;
if (this.mappedFiles.size() <= reservedMappedFiles) {
return null;
}
// 就是把mappedfiles这个list转换为了一个数组
mfs = this.mappedFiles.toArray();
return mfs;
}
调用MappedFile#destroy方法
// 销毁一个mappedfile
public boolean destroy(final long intervalForcibly) {
// 对当前的这个mappedfile调用关闭函数
// 此处调用会触发引用资源的关闭和释放,以及时触发mappedfile自己的清理函数,完成内存区域的清理和释放
this.shutdown(intervalForcibly);
// 如果说已经清理释放完毕了
if (this.isCleanupOver()) {
try {
// 此时就对这个mappedfile绑定的一个nio文件通道做一个关闭函数的调用
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
// 同时再次对这个mappedfile进行一个文件上的删除
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
对我们已经销毁掉的文件进行过期文件删除
// 删除过期文件
void deleteExpiredFile(List<MappedFile> files) {
if (!files.isEmpty()) {
// 对过期文件进行遍历
Iterator<MappedFile> iterator = files.iterator();
while (iterator.hasNext()) {
MappedFile cur = iterator.next();
// 如果说此时这个文件在mappedFiles列表里已经不存在的了,此时就可以给他删除掉
if (!this.mappedFiles.contains(cur)) {
iterator.remove();
log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());
}
}
try {
// 最终会从mappedfile里把要删除的文件全部剔除掉
if (!this.mappedFiles.removeAll(files)) {
log.error("deleteExpiredFile remove failed.");
}
} catch (Exception e) {
log.error("deleteExpiredFile has exception.", e);
}
}
}
上述代码 虽然长,但是很容易理解, 就是 遍历 目录下的 MappedFile 集合, 寻找出 满足删除条件的 MappedFile ,再调用 mf.destory() 方法进行删除。
只需要注意的是: 该方法是供 删除 CommitLog 文件使用的。
5、获取commitlog最大偏移量
// 获取commitlog最大偏移量
public long getMaxOffset() {
// 先获取到最后一个mappedfile
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
// 把mappedfile文件名称代表的一个物理偏移量 + 当前mappedfile自己内部的可读位置
return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
}
return 0;
}
6、获取到当前最大的一个写位置
// 获取到当前最大的一个写位置
public long getMaxWrotePosition() {
// 拿到最后一个mappedfile
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
// 返回mappedfile文件名称代表的物理位置 + 文件内部写位置
return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
}
return 0;
}
7、flush刷盘
public boolean flush(final int flushLeastPages) {
boolean result = true;
// 根据偏移量去查找第一个mappedfile,从flushedWhere偏移量开始,去查找一个MappedFile出来
MappedFile mappedFile = this.findMappedFileByOffset(
this.flushedWhere,
this.flushedWhere == 0
);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages); // 真正对mappedfile发起flush
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
/**
* Finds a mapped file by offset.
*
* @param offset Offset.
* @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
* @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
*/
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
// 先获取到commitlog对应的第一个mappedfile
MappedFile firstMappedFile = this.getFirstMappedFile();
// 再获取到commitlog对应的最后一个mappedfile
MappedFile lastMappedFile = this.getLastMappedFile();
if (firstMappedFile != null && lastMappedFile != null) {
// 你要查找的这个偏移量小于第一个mappedfile文件名物理偏移量,或者是大于了mappedfile文件名偏移量+文件大小
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
this.mappedFiles.size());
}
// 正常情况下查找的偏移量再第一个mappedfile和最后一个mappedfile中间
else {
// 查找偏移量对mappedfile文件大小做一个整除,其实首先可以大致先推算出来这个偏移量是在第几个mappedfile里
// 此时推算出来的mappedfile index,还需要扣减掉第一个文件起始偏移量对文件大小做一个整除,扣减掉了以后就可以拿到相对的index索引位置
// 我需要知道,查找偏移量是在现存的mappedfile里的第几个
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
// 此时就直接可以定位到我的这个偏移量所对应的那个mappedfile
targetFile = this.mappedFiles.get(index);
} catch (Exception ignored) {
}
// 如果说定位到的mappedfile不为null,而且我们查找偏移量是大于等于定位mappedfile起始偏移量的
// 而且查找偏移量是小于定位mappedfile起始偏移量+文件大小,此时可以说明,查找偏移量刚好处于定位文件数据范围内
// 此时就可以直接返回了
if (targetFile != null && offset >= targetFile.getFileFromOffset()
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
return targetFile;
}
// 如果说上面定位没有成功,此时遍历你内存里的每个mappedfile
for (MappedFile tmpMappedFile : this.mappedFiles) {
// 如果说查找偏移量在任何一个mappedfile数据范围内,此时也可以返回
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
return tmpMappedFile;
}
}
}
// 实在是没有找到,此时如果说没找到就返回第一个mappedfile就可以来返回
if (returnFirstOnNotFound) {
return firstMappedFile;
}
}
} catch (Exception e) {
log.error("findMappedFileByOffset Exception", e);
}
return null;
}