天天看点

RocketMQ源码分析之映射文件队列MappedFileQueue

作者:程序员阿龙

上一篇:RocketMQ源码分析之文件内存映射对象层MappedFile

一、前言

既然 MappedFile 是管理单个文件的类, 那么就会存在用来管理 这些 MappedFile的类:MappedFileQueue。

我们可以把他们之间的关系形象的理解成 : 文件(MappedFile) 和 目录(MappedFileQueue)

RocketMQ源码分析之映射文件队列MappedFileQueue

二、源码分析

  1. MappedFileQueue成员变量;
  2. 加载磁盘数据到内存映射区域里来;
  3. 获取当前正在顺序写的MappedFile对象;
  4. 根据时间删除文件;
  5. 获取commitlog最大偏移量;
  6. 获取到当前最大的一个写位置;
  7. flush刷盘;

1、MappedFileQueue成员变量

public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    // commitlog存储路径:
    // 1. CommitLog文件目录路径为: ../store/commit/log
    // 2. ConsumeQueue文件目录路径为: ../store/xxx_topic/x
    private final String storePath;
    // 映射文件大小
    // 1. commitLog文件 默认1g
    // 2. consumeQueue文件 默认600w字节)
    protected final int mappedFileSize;
    // 所有的映射文件
    protected final CopyOnWriteArrayList<MappedFile> mappedFiles =
            new CopyOnWriteArrayList<MappedFile>();
    // 分配映射文件服务
    private final AllocateMappedFileService allocateMappedFileService;
    // 从哪里开始flush
    protected long flushedWhere = 0;
    // 从哪里开始commit
    private long committedWhere = 0;
    // 当前目录下最后一条msg存储时间
    private volatile long storeTimestamp = 0;
}           

上述属性基本上都很简单, 这里需要强调其中一个 属性 flushedWhere;

MappedFileQueue目录中的 MappedFile文件 是顺序写的, 当文件写满了之后,才回去创建新的MappedFile , 其中MappedFile的文件名为 物理偏移量。

简单举个例子(仅作说明使用 ):假设 每个文件大小为 64bytes 第一个文件名为 00000 , 当该文件写满了 则需要创建第二个文件,那么这第二个文件的文件名为 00064 , 此时写也只能向第二个文件中写,那么当写了 32bytes后 的 flushedWhere = 00064 + 00032 = 00096。

2、加载磁盘数据到内存映射区域里来

  1. 根据 指定文件目录( 如:../store/commit/log) , 构建 File 对象(注意:是个文件夹)。
  2. 遍历 该文件夹下 所有的 文件 并排序 得到 File[] files 数组 (注意:这是文件的集合)。
  3. 遍历 排序后的文件集合,为每个文件创建 MappedFile对象 并赋上初始值,然后存入 MappedFiles集合中;

正常Broker 在启动后, 会先调用 load() 方法 加载出目录下所有的MappedFile, 然后再通过 recover的相关方法来重新赋上准确的值。

// 加载磁盘数据到内存映射区域里来
public boolean load() {
    // 先把commitlog存储路径封装成一个file,此时这个file是存储目录
    File dir = new File(this.storePath);
    // 对存储目录里的子文件列表获取到,拿到一堆的file
    File[] ls = dir.listFiles();
    if (ls != null) {
        // 对所有的files磁盘文件去进行加载操作
        return doLoad(Arrays.asList(ls));
    }
    return true;
}           
// 对所有的磁盘文件进行一个加载
public boolean doLoad(List<File> files) {
    // ascending order
    // 基于文件名称进行一个排序,文件名称都是物理偏移量
    files.sort(Comparator.comparing(File::getName));

    // 对排序后的文件做一个遍历
    for (File file : files) {
        if (file.length() != this.mappedFileSize) {
            log.warn(file + "\t" + file.length()
                    + " length not matched message store config value, please check it manually");
            return false;
        }

        try {
            // 直接把一个磁盘文件封装成一个MappedFile,定位一下他的可以写入的位置、flush位置、commit位置
            MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
            // 对映射完毕的mappedfile设置可以写入位置、可以flush位置、可以commit位置,这些位置都是从mappedfile大小开始的
            mappedFile.setWrotePosition(this.mappedFileSize);
            mappedFile.setFlushedPosition(this.mappedFileSize);
            mappedFile.setCommittedPosition(this.mappedFileSize);
            // 加入到 list中
            this.mappedFiles.add(mappedFile);
            log.info("load " + file.getPath() + " OK");
        } catch (IOException e) {
            log.error("load file " + file + " error", e);
            return false;
        }
    }
    return true;
}           

3、获取当前正在顺序写的MappedFile对象

当前正在顺序写的MappedFile 必定是 MappedFile集合中的末尾文件。 因此代码中直接调用 getLastMappedFile() 方法获取了末尾的MappedFile, 而此时 会存在 3中情况:

  1. 该 MappedFile 存在 且 MappedFile 内 还有剩余可写空间。(这也是最好的情况,正常返回就行了)
  2. 该 MappedFile 存在,但是该MappedFile 已经被写满了。 (需要创建 新的MappedFile)
  3. 该 MappedFile 不存在 ,也就说明 目录下并没有任何文件。(需要创建 新的MappedFile)

其中 2 ,3 情况 需要 创建新的 MappedFile ,而创建 MappedFile 的方式分为了两种:

  1. 通过 allocateMappedFileService 使用其它线程来创建。( MappedFile >= 1g 时 有预热操作)
  2. 普通 new MappedFile() 方式创建。(无预热操作)
/**
 * 获取当前正在顺序写的MappedFile对象
 *   (存储消息 或者 存储ConsumeQueue数据时, 都需要获取当前正在顺序写的MappedFile对象)
 *   注意: 如果MappedFile写满了 或者 不存在查找的MappedFile, 则创建新的MappedFile
 *
 * @param startOffset   文件起始偏移量
 * @param needCreate    当list为空时,是否创建 mappedFile
 * @return
 */
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    // 该值 控制是否需要创建MappedFile ,当需要创建MappedFile时,它充当文件名
    // 两种情况 会创建:
    //  1. list 内没有mappedFIle
    //  2. list最后一个mappedFile (当前顺序写的mappedFile)它写满了
    long createOffset = -1;
    //  获取 list 中的最后一个 MappedFile
    MappedFile mappedFileLast = getLastMappedFile();

    // 情况1 list 内没有mappedFile
    if (mappedFileLast == null) {
        // createOffset 取值必须是 mappedFileSize 的倍数 或者 0
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }

    // 情况2 list最后一个mappedFile (当前顺序写的mappedFile)它写满了
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        // 上一个文件名 转Long + mappedFileSize
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    // 这里是创建 新的 mappedFile 逻辑
    if (createOffset != -1 && needCreate) {
        return tryCreateMappedFile(createOffset);
    }

    return mappedFileLast;
}           
protected MappedFile tryCreateMappedFile(long createOffset) {
    // 获取 下次待创建文件的 绝对路径
    String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
    // 获取 下下次待创建文件的 绝对路径
    String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
    return doCreateMappedFile(nextFilePath, nextNextFilePath);
}           
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
    MappedFile mappedFile = null;

    // 使用 allocateMappedFileService 来创建 MappedFile
    if (this.allocateMappedFileService != null) {
        // 当mappedFileSize >= 1g 的话, 这里创建的mappedFile 会执行它的 预热方法
        mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(
                nextFilePath,
                nextNextFilePath,
                this.mappedFileSize
        );
    }
    // 直接创建 MappedFile (这里没有预热)
    else {
        try {
            mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    }

    // 将创建的 mappedFile 添加到 list中 并返回
    if (mappedFile != null) {
        if (this.mappedFiles.isEmpty()) {
            mappedFile.setFirstCreateInQueue(true);
        }
        this.mappedFiles.add(mappedFile);
    }

    // 走到这里... 是无需创建 MappedFile时 返回。
    return mappedFile;
}           

4、根据时间删除文件

/**
 * commitLog 根据时间删除文件
 * @param expiredTime  过期时间
 * @param deleteFilesInterval  删除两个文件之间的时间间隔
 * @param intervalForcibly  强制关闭资源的时间间隔  mf.destory传递的参数
 * @param cleanImmediately  true 强制删除,不考虑过期时间这个条件
 * @return
 */
public int deleteExpiredFileByTime(
        final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {
    // 拷贝mappedfiles数组
    Object[] mfs = this.copyMappedFiles(0);

    if (null == mfs)
        return 0;

    // 这里 减-1 是保证 当前正在顺序写的MappedFile不被删除
    int mfsLength = mfs.length - 1;
    // 记录删除的文件数
    int deleteCount = 0;
    List<MappedFile> files = new ArrayList<MappedFile>();

    if (null != mfs) {
        // 对每一个mappedfile做一个遍历
        for (int i = 0; i < mfsLength; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            // 对每个mappedfile获取到他的最近一次修改时间 + 一个mappedfile过期时间
            // 计算出来这个mappedfile在最近一次修改了之后,最大的可以存活的时间
            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;

            // 如果说当前时间已经超过了最大可以存活的时间了,此时就说明这个文件已经过期了,可以清理了
            // 或者是不管他到没到最大存活时间,都标识了一个立即清理,也可以清理了
            if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {

                // 对当前mappedfile调用一下销毁的函数,销毁和释放成功了,就可以去走下面的收尾逻辑
                if (mappedFile.destroy(intervalForcibly)) {
                    // 把销毁成功的mappedfile加入到files列表里去
                    files.add(mappedFile);
                    // 成功删除和销毁的mappedfile数量做一个累加
                    deleteCount++;

                    // 如果销毁掉的文件数量大于等于最大批量删除文件的大小,此时就终止文件清理
                    // 这个也是一样的,避免一次性删除文件过多,也会导致磁盘io过高,机器负载增加的
                    if (files.size() >= DELETE_FILES_BATCH_MAX) {
                        break;
                    }

                    // 如果说删除文件间隔是大于0,而且此时还没删除完毕
                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            // 休眠一会儿,停顿指定的删除文件时间间隔,避免频繁的删除文件,可能会导致磁盘io过高
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {
                        }
                    }
                } else {
                    break;
                }
            } else {
                //avoid deleting files in the middle
                break;
            }
        }
    }

    // 对我们已经销毁掉的文件进行过期文件删除
    deleteExpiredFile(files);

    return deleteCount;
}           
private Object[] copyMappedFiles(final int reservedMappedFiles) {
    Object[] mfs;

    if (this.mappedFiles.size() <= reservedMappedFiles) {
        return null;
    }

    // 就是把mappedfiles这个list转换为了一个数组
    mfs = this.mappedFiles.toArray();
    return mfs;
}           

调用MappedFile#destroy方法

// 销毁一个mappedfile
public boolean destroy(final long intervalForcibly) {
    // 对当前的这个mappedfile调用关闭函数
    // 此处调用会触发引用资源的关闭和释放,以及时触发mappedfile自己的清理函数,完成内存区域的清理和释放
    this.shutdown(intervalForcibly);

    // 如果说已经清理释放完毕了
    if (this.isCleanupOver()) {
        try {
            // 此时就对这个mappedfile绑定的一个nio文件通道做一个关闭函数的调用
            this.fileChannel.close();
            log.info("close file channel " + this.fileName + " OK");

            long beginTime = System.currentTimeMillis();
            // 同时再次对这个mappedfile进行一个文件上的删除
            boolean result = this.file.delete();
            log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                + this.getFlushedPosition() + ", "
                + UtilAll.computeElapsedTimeMilliseconds(beginTime));
        } catch (Exception e) {
            log.warn("close file channel " + this.fileName + " Failed. ", e);
        }

        return true;
    } else {
        log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
    }

    return false;
}           

对我们已经销毁掉的文件进行过期文件删除

//  删除过期文件
void deleteExpiredFile(List<MappedFile> files) {
    if (!files.isEmpty()) {

        // 对过期文件进行遍历
        Iterator<MappedFile> iterator = files.iterator();
        while (iterator.hasNext()) {
            MappedFile cur = iterator.next();
            // 如果说此时这个文件在mappedFiles列表里已经不存在的了,此时就可以给他删除掉
            if (!this.mappedFiles.contains(cur)) {
                iterator.remove();
                log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());
            }
        }

        try {
            // 最终会从mappedfile里把要删除的文件全部剔除掉
            if (!this.mappedFiles.removeAll(files)) {
                log.error("deleteExpiredFile remove failed.");
            }
        } catch (Exception e) {
            log.error("deleteExpiredFile has exception.", e);
        }
    }
}           

上述代码 虽然长,但是很容易理解, 就是 遍历 目录下的 MappedFile 集合, 寻找出 满足删除条件的 MappedFile ,再调用 mf.destory() 方法进行删除。

只需要注意的是: 该方法是供 删除 CommitLog 文件使用的。

5、获取commitlog最大偏移量

// 获取commitlog最大偏移量
public long getMaxOffset() {
    // 先获取到最后一个mappedfile
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        // 把mappedfile文件名称代表的一个物理偏移量 + 当前mappedfile自己内部的可读位置
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}           

6、获取到当前最大的一个写位置

// 获取到当前最大的一个写位置
public long getMaxWrotePosition() {
    // 拿到最后一个mappedfile
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        // 返回mappedfile文件名称代表的物理位置 + 文件内部写位置
        return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }
    return 0;
}           

7、flush刷盘

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    // 根据偏移量去查找第一个mappedfile,从flushedWhere偏移量开始,去查找一个MappedFile出来
    MappedFile mappedFile = this.findMappedFileByOffset(
            this.flushedWhere,
            this.flushedWhere == 0
    );

    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages); // 真正对mappedfile发起flush
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}           
/**
 * Finds a mapped file by offset.
 *
 * @param offset Offset.
 * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
 * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
 */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        // 先获取到commitlog对应的第一个mappedfile
        MappedFile firstMappedFile = this.getFirstMappedFile();
        // 再获取到commitlog对应的最后一个mappedfile
        MappedFile lastMappedFile = this.getLastMappedFile();

        if (firstMappedFile != null && lastMappedFile != null) {
            // 你要查找的这个偏移量小于第一个mappedfile文件名物理偏移量,或者是大于了mappedfile文件名偏移量+文件大小
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    firstMappedFile.getFileFromOffset(),
                    lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            }

            // 正常情况下查找的偏移量再第一个mappedfile和最后一个mappedfile中间
            else {
                // 查找偏移量对mappedfile文件大小做一个整除,其实首先可以大致先推算出来这个偏移量是在第几个mappedfile里
                // 此时推算出来的mappedfile index,还需要扣减掉第一个文件起始偏移量对文件大小做一个整除,扣减掉了以后就可以拿到相对的index索引位置
                // 我需要知道,查找偏移量是在现存的mappedfile里的第几个
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    // 此时就直接可以定位到我的这个偏移量所对应的那个mappedfile
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }

                // 如果说定位到的mappedfile不为null,而且我们查找偏移量是大于等于定位mappedfile起始偏移量的
                // 而且查找偏移量是小于定位mappedfile起始偏移量+文件大小,此时可以说明,查找偏移量刚好处于定位文件数据范围内
                // 此时就可以直接返回了
                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }

                // 如果说上面定位没有成功,此时遍历你内存里的每个mappedfile
                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    // 如果说查找偏移量在任何一个mappedfile数据范围内,此时也可以返回
                    if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }

            // 实在是没有找到,此时如果说没找到就返回第一个mappedfile就可以来返回
            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }

    return null;
}