天天看點

RocketMQ源碼8-broker存儲檔案組織和記憶體映射

作者:Java機械師

1. 存儲設計

RocketMQ存儲檔案主要包括:CommitLog檔案、ConsumerQueue檔案、Index檔案

  • CommitLog檔案:所有Topic的消息按照抵達順序依次追加到CommitLog中,一旦寫入不支援修改
  • ConsumeQueue檔案:消息消費隊列,用于消費者消費,即消費者通過此檔案來從CommitLog中擷取消息。消息達到CommitLog後,将異步轉發到ConsumeQueue檔案
  • Index檔案:消息索引,主要存儲消息key與offset的對應關系

RocketMQ将所有Topic的消息都存儲在同一個CommitLog檔案中,一般按照Topic來檢索消息,是以為了提高消息消費的效率,RocketMQ引入了ConsumeQueue檔案(消費隊列),每一個Topic包含多個消息消費隊列,每一個消費隊列都有一個檔案。

為了根據消息的屬性從CommitLog檔案中快速檢索消息,RocketMQ引入了Index索引檔案。

存儲目錄為:

RocketMQ源碼8-broker存儲檔案組織和記憶體映射

1.1 基本原理

  • Page Cache:Page Cache是檔案系統層的Cache,主要用來減少對磁盤的I/O操作,通過對磁盤的通路變為實體記憶體的通路,緩存的是記憶體頁面,操作時按照頁為基本機關。在Linux系統中寫入資料的時候并不會直接寫到硬碟上,而是會先寫到Page Cache中,并打上dirty辨別,由核心線程flusher定期将被打上dirty的頁發送給IO排程層,最後由IO排程決定何時落地到磁盤中,而Linux一般會把還沒有使用的記憶體全拿來給Page Cache使用。而讀的過程也是類似,會先到Page Cache中尋找是否有資料,有的話直接傳回,如果沒有才會到磁盤中去讀取并寫入Page Cache,然後再次讀取Page Cache并傳回。而且讀的這個過程中作業系統也會有一個預讀的操作,你的每一次讀取作業系統都會幫你預讀出後面一部分資料。當你一直在使用預讀資料的時候,系統會幫你預讀出更多的資料(最大到128K)。
  • Buffer Cache:Buffer Cache是針對裝置的,實際操作按塊為基本機關,對于裸盤的讀寫會占用Buffer Cache,當讀寫完成之後,會歸還給作業系統。
  • 在linux2.4核心中Buffer Cache和Page Cache是共存的,因為檔案的讀寫最終會轉化為塊裝置的讀寫,即同一份檔案的資料,可能既在Buffer Cache中也在Page Cache中,這樣就造成了實體記憶體的浪費。
  • linux2.6核心對Buffer Cache和Page Cache進行了合并,統一為Page Cache。當進行檔案讀寫時,如果檔案在磁盤上的存儲塊是連續的,那麼檔案在Page Cache中對應的頁是普通的page,如果檔案在磁盤上的資料塊是不連續的,或者是裝置檔案,那麼檔案在Page Cache中對應的頁就是Buffer Cache
  • 檢視記憶體情況
  • $ # cat /proc/meminfo MemTotal: 3876772 kB MemFree: 126704 kB MemAvailable: 137132 kB Buffers: 48 kB Cached: 258648 kB SwapCached: 12344 kB ...省略... Buffers: 表示`Buffer Cache`的容量 Cached: 表示位于實體記憶體中的頁緩存`Page Cache` SwapCached:表示位于磁盤交換區的頁緩存`Page Cache` 實際的`Page Cache`容量=Cached+SwapCached 複制代碼
  • linux底層提供mmap将檔案映射進虛拟記憶體,對檔案的讀寫變成對記憶體的讀寫,能充分利用Page Cache,但是如果對檔案進行随機讀寫,會使虛拟記憶體産生很多缺頁(Page Fault)中斷,此時作業系統需要将磁盤檔案的資料再次加載到Page Cache,這個過程比較慢。如果對檔案進行順序讀寫,讀和寫的區域都是被作業系統緩存過的熱點區域,不會産生大量的缺頁中斷,檔案的讀寫操作相當于直接記憶體的操作,性能會提升很多。如果記憶體不夠充足,核心把記憶體配置設定給Page Cache後,空閑記憶體會變少,如果程式有新的記憶體配置設定或者缺頁中斷,但是空閑記憶體不夠,核心需要花費時間将熱度低的Page Cache記憶體回收掉,此時性能會下降。當遇到作業系統進行髒頁回寫,記憶體回收,記憶體換入換出等情形時,會産生較大的讀寫延遲,造成存儲引擎偶發的高延遲,針對這種現象,RocketMQ采用了多種優化技術,比如記憶體預配置設定,檔案預熱,mlock系統調用,讀寫分離等,來保證利用Page Cache優點的同時,消除其帶來的延遲。
  • 工具檢視:hcache是基于pcstat,pcstat可以檢視檔案是否被緩存和根據pid來檢視緩存了哪些檔案,hcache是pcstat的增強版本,增加了檢視整個系統Cache和根據Cache大小排序的功能:
  • 檢視使用Cache最多的3個程序 $ hcache --top 3 +----------------------------------+--------------+-------+--------+---------+ | Name | Size (bytes) | Pages | Cached | Percent | |----------------------------------+--------------+-------+--------+---------| | /usr/share/atom/atom | 81137776 | 19810 | 19785 | 099.874 | | /usr/bin/dockerd | 68608880 | 16751 | 14321 | 085.493 | | /usr/share/atom/snapshot_blob.bin| 54619240 | 13335 | 13335 | 100.000 | +----------------------------------+--------------+-------+--------+---------+ 複制代碼

1.2 CommitLog

  • RocketMQ Broker單個執行個體下所有的Topic都使用同一個日志資料檔案(CommitLog)來存儲(即單個執行個體消息整體有序),這點與kafka不同(kafka采用每個分區一個日志檔案存儲)
  • CommitLog單個檔案大小預設1G,檔案檔案名是起始偏移量,總共20位,左邊補零,起始偏移量是0。假設檔案按照預設大小1G來算
    • 第一個檔案的檔案名為00000000000000000000 ,當第一個檔案被寫滿之後,開始寫入第二個檔案
    • 第二個檔案的檔案名為00000000001073741824 ,1G=1073741824=1024*1024*1024
    • 第三個檔案的檔案名是00000000002147483648,(檔案名相差1G=1073741824=1024*1024*1024)
  • CommitLog按照上述命名的好處是給出任意一個消息的實體偏移量,可以通過二分法進行查找,快速定位這個檔案的位置,然後用消息實體偏移量減去所在檔案的名稱,得到的內插補點就是在該檔案中的絕對位址

1.3 ConsumeQueue

ConsumeQueue是消息消費隊列,它是一個邏輯隊列,相當于CommitLog的索引檔案。因為RocketMQ的隊列不存儲任何實際資料,它隻存儲CommitLog中的【起始實體位置偏移量,消息的内容大小,消息Tag的哈希值】,每一個ConsumeQueue存儲的格式如下,總共20B。存tag是為了在消費者取到消息offset後先根據tag做一次過濾,剩下的才需要到CommitLog中取消息詳情:

RocketMQ源碼8-broker存儲檔案組織和記憶體映射

每個ConsumeQueue都有一個queueId,queueId 的值為0到TopicConfig配置的隊列數量。比如某個Topic的消費隊列數量為4,那麼四個ConsumeQueue的queueId就分别為0、1、2、3。

消費者消費時會先從ConsumeQueue中查找消息在CommitLog中的offset,再去CommitLog中找原始消息資料。如果某個消息隻在CommitLog中有資料,沒在ConsumeQueue中, 則消費者無法消費

ConsumeQueue類對應的是每個topic和queuId下面的所有檔案。預設存儲路徑是$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每個檔案由30w條資料組成,單個檔案的大小是30w x 20Byte,即每個檔案為600w位元組,單個消費隊列的檔案大小約為5.722M=(600w/(1024*1024))

1.4 Index檔案

IndexFile:索引檔案,實體存儲上,檔案名為建立時間的時間戳命名,固定的單個IndexFile檔案大小約為400M,一個IndexFile可以儲存2000W個索引

IndexFile(索引檔案)由IndexHeader(索引檔案頭), Slot(槽位)和Index(消息的索引内容)三部分構成。對于每個IndexFile來說IndexHeader是固定大小的,Slot是索引的目錄,用于定位Index在IndexFile中存儲的實體位置。存儲圖:

RocketMQ源碼8-broker存儲檔案組織和記憶體映射

1.5 checkpoint檔案

checkpoint檢查點檔案的作用是記錄CommitLog、ConsumeQueue、Index檔案的刷盤時間點,檔案固定長度為4kb,隻用該檔案的前24個位元組

  • physicMsgTimestamp:CommitLog檔案刷盤時間點
  • logicsMsgTimestamp:ConsumeQueue檔案刷盤時間點
  • indexMsgTimestamp:Index檔案刷盤時間點

1.6 TransientStorePool機制

RocketMQ為了降低PageCache的使用壓力,引入了transientStorePoolEnable機制,即記憶體級别的讀寫分離機制

預設情況,RocketMQ将消息寫入PageCache,消費時從PageCache中讀取消息。但是這樣在高并發下PageCache壓力會比較大,容易出現瞬時broker busy異常。RocketMQ通過開啟transientStorePoolEnable=true,将消息寫入堆外記憶體并立即傳回,然後異步将堆外記憶體中的資料批量送出到PageCache,再異步刷盤到磁盤中。這樣的好處就是形成記憶體級别的讀寫分離,發送寫入消息是向堆外記憶體,消費讀取消息是從PageCache

該機制的缺點就是如果意外導緻broker程序異常退出,已經放入到PageCache中的資料不會丢失,而存儲在堆外記憶體的資料會丢失

2. MappedFileQueue

RocketMQ使用MappedFile、MappedFileQueue來封裝存儲檔案。MappedFileQueue是MappedFile的管理容器,使用CopyOnWriteArrayList來管理所有的MappedFile。MappedFileQueue提供查找目錄下MappedFile的方法。MappedFileQueue核心屬性:

/**
 * 1.MappedFile組成的隊列
 *  * 2.包括CommitLog(消息主題以及中繼資料) ConsumerQueue邏輯隊列
 */
public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    // 存儲目錄
    private final String storePath;

    // 單個檔案的存儲大小
    private final int mappedFileSize;

    // MappedFile集合
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    // 建立MappedFile服務類
    private final AllocateMappedFileService allocateMappedFileService;

    // 目前刷盤指針,表示該指針之前的所有資料全部持久化到磁盤
    private long flushedWhere = 0;
    // 目前資料送出指針,記憶體中ByteBuffer目前的寫指針,該值大于、等于flushedWhere
    // (write >= commit >= flush位置)
    private long committedWhere = 0;
    //目前已刷盤的最後一條消息存儲的時間戳
    private volatile long storeTimestamp = 0;

    public MappedFileQueue(final String storePath, int mappedFileSize,
        AllocateMappedFileService allocateMappedFileService) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.allocateMappedFileService = allocateMappedFileService;
    }
    ...
}
複制代碼           

2.1 擷取第一和最後一個MappedFile

擷取第一個MappedFile

/**
 * 傳回隊列中第一個MappedFile,這裡忽略索引越界異常,可能一個都沒有,傳回null
 * 先判斷mappedFiles是否為空,然後get(0),因為存在并發,是以需要即使判斷為空,還是可能索引越界
 * @return
 */
public MappedFile getFirstMappedFile() {
    MappedFile mappedFileFirst = null;

    if (!this.mappedFiles.isEmpty()) {
        try {
            mappedFileFirst = this.mappedFiles.get(0);
        } catch (IndexOutOfBoundsException e) {
            //ignore
        } catch (Exception e) {
            log.error("getFirstMappedFile has exception.", e);
        }
    }

    return mappedFileFirst;
}
複制代碼           

擷取最後一個MappedFile

public MappedFile getLastMappedFile() {
    MappedFile mappedFileLast = null;

    while (!this.mappedFiles.isEmpty()) {
        try {
            //由于get和size沒有加鎖
            // size擷取的值可能是舊的,是以可能出現錯誤的大小,導緻索引越界
            // get擷取的值可能是舊的數組,是以可能出現索引越界
            mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
            break;
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getLastMappedFile has exception.", e);
            break;
        }
    }

    return mappedFileLast;
}
複制代碼           

通過起始偏移量,擷取最後一個MappedFile,如果不存在,可自動建立

public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    //最後一個映射檔案
    MappedFile mappedFileLast = getLastMappedFile();

    if (mappedFileLast == null) {
        //如果沒有映射檔案就 建立開始的offset
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }

    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    //建立新的MappedFile
    if (createOffset != -1 && needCreate) {
        //檔案名
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
            + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;

        if (this.allocateMappedFileService != null) {
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
        } else {
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

        //添加到隊列
        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                //辨別第一個檔案
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }

        return mappedFile;
    }

    return mappedFileLast;
}
複制代碼           

2.2 擷取最小最大偏移量

  • 最小:擷取第一個MappedFile,然後擷取其起始偏移量
  • 最大:擷取最後一個MappedFile,然後【起始偏移量】+【可讀位置】
/**
 * 擷取存儲檔案最小偏移量。從這裡也可以看出,并不是直接傳回
 * 0,而是傳回MappedFile的getFileFormOffset()方法
 */
public long getMinOffset() {

    if (!this.mappedFiles.isEmpty()) {
        try {
            return this.mappedFiles.get(0).getFileFromOffset();
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getMinOffset has exception.", e);
        }
    }
    return -1;
}

/**
 * 擷取存儲檔案的最大偏移量。傳回最後一個MappedFile的
 * fileFromOffset,加上MappedFile目前的讀指針
 */
public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

/**
 * 傳回存儲檔案目前的寫指針。傳回最後一個檔案的
 * fileFromOffset,加上目前寫指針位置
 */
public long getMaxWrotePosition() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }
    return 0;
}

/**
 *還有多少位元組等待commit的(wrote與commit位置之差)
 */
public long remainHowManyDataToCommit() {
    return getMaxWrotePosition() - committedWhere;
}
複制代碼           

2.3 根據時間戳查詢MappedFile

根據【消息存儲的時間戳】查詢。如果this.mappedFiles為空,則直接傳回null。如果不為空,則從第一個檔案開始查找,找到第一個最後一次更新時間大于查找時間戳的檔案,如果不存在,則傳回最後一個MappedFile檔案。

/**
 * 根據消息存儲時間戳查找MappdFile
 *
 * 從MappedFile清單中第一個
 * 檔案開始查找,找到第一個最後一次更新時間大于待查找時間戳的文
 * 件,如果不存在,則傳回最後一個MappedFile
 */
public MappedFile getMappedFileByTime(final long timestamp) {
    Object[] mfs = this.copyMappedFiles(0);

    if (null == mfs)
        return null;

    for (int i = 0; i < mfs.length; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];
        if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
            return mappedFile;
        }
    }

    return (MappedFile) mfs[mfs.length - 1];
}

private Object[] copyMappedFiles(final int reservedMappedFiles) {
    Object[] mfs;

    if (this.mappedFiles.size() <= reservedMappedFiles) {
        return null;
    }

    mfs = this.mappedFiles.toArray();
    return mfs;
}
複制代碼           

2.4 根據消息存儲的偏移量查詢MappedFile

根據【消息存儲的偏移量】查詢

/**
 * Finds a mapped file by offset.
 *
 * @param offset Offset.
 * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
 * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
 *
 * 根據消息偏移量offset查找MappedFile,但是不能直接使用
 * offset%mappedFileSize。這是因為使用了記憶體映射,隻要是存在于存
 * 儲目錄下的檔案,都需要對應建立記憶體映射檔案,如果不定時将已消
 * 費的消息從存儲檔案中删除,會造成極大的記憶體壓力與資源浪費,所
 * 以RocketMQ采取定時删除存儲檔案的政策。也就是說,在存儲檔案
 * 中,第一個檔案不一定是00000000000000000000,因為該檔案在某一
 * 時刻會被删除,是以根據offset定位MappedFile的算法為
 * (int)((offset/this.mappedFileSize)-(mappedFile.getFileFromOffset()/this.MappedFileSize))
 */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    firstMappedFile.getFileFromOffset(),
                    lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            } else {
                // todo
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }

                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }

                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }

            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }

    return null;
}
複制代碼           

2.5 重置offset

将offset以後的MappedFile都清除掉,在目前4.3.1版本中存在bug,貌似也沒有使用

/**
 * 将offset以後的MappedFile都清除掉
 */
public boolean resetOffset(long offset) {
    MappedFile mappedFileLast = getLastMappedFile();

    if (mappedFileLast != null) {
        // 最後一個MappedFile的【起始偏移量】+ 【寫入PageCache的位置】
        long lastOffset = mappedFileLast.getFileFromOffset() +
            mappedFileLast.getWrotePosition();
        // 最後的寫入位置與offset的內插補點,如果大于2個MappedFile大小,就不做重置
        long diff = lastOffset - offset;

        final int maxDiff = this.mappedFileSize * 2;
        if (diff > maxDiff)
            return false;
    }

    ListIterator<MappedFile> iterator = this.mappedFiles.listIterator();

    while (iterator.hasPrevious()) {
        mappedFileLast = iterator.previous();
        if (offset >= mappedFileLast.getFileFromOffset()) {
            // 定位到offset在第幾個MappedFile中
            int where = (int) (offset % mappedFileLast.getFileSize());
            // 重置最後一個MappedFile的位置
            mappedFileLast.setFlushedPosition(where);
            mappedFileLast.setWrotePosition(where);
            mappedFileLast.setCommittedPosition(where);
            break;
        } else {
            // 如果offset小于目前的MappedFile的起始偏移量,則直接删除MappedFile
            iterator.remove();
        }
    }
    return true;
}
複制代碼           

3. MappedFile

MappedFile是RocketMQ記憶體映射檔案的具體實作。将消息位元組寫入Page Cache緩沖區中(commit方法),或者将消息刷入磁盤(flush)。CommitLog consumerQueue、index三類檔案磁盤的讀寫都是通過MappedFile。

MappedFile的核心屬性:

  • wrotePosition:儲存目前檔案所映射到的消息寫入page cache的位置
  • flushedPosition:儲存刷盤的最新位置
  • wrotePosition和flushedPosition的初始化值為0,一條1k大小的消息送達,當消息commit也就是寫入page cache以後,wrotePosition的值為1024 * 1024;如果消息刷盤以後,則flushedPosition也是1024 * 1024;另外一條1k大小的消息送達,當消息commit時,wrotePosition的值為1024 * 1024 + 1024 * 1024,同樣,消息刷盤後,flushedPosition的值為1024 * 1024 + 1024 * 1024。
public class MappedFile extends ReferenceResource {
    // 作業系統每頁大小,預設4KB
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    // 目前JVM執行個體中MappedFile的虛拟記憶體
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    // 目前JVM執行個體中
    // MappedFile對象個數
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    // 目前檔案的寫指針,從0開始(記憶體映射檔案中的寫指針)
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 目前檔案的送出指針,如果開啟transientStore-PoolEnable,則資料會存儲在
    //TransientStorePool中,然後送出到記憶體映射ByteBuffer中,再寫入磁盤
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 将該指針之前的資料持久化存儲到磁盤中
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // 檔案大小
    protected int fileSize;
    // 檔案通道
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    // 堆外記憶體ByteBuffer,如果不為空,資料首先将存儲在該Buffer中,然後送出到MappedFile建立的
    //FileChannel中。transientStorePoolEnable為true時不為空
    protected ByteBuffer writeBuffer = null;
    // 堆外記憶體池,該記憶體池中的記憶體會提供記憶體鎖機制。transientStorePoolEnable為true時啟用
    protected TransientStorePool transientStorePool = null;
    // 檔案名稱
    private String fileName;
    // 該檔案的初始偏移量
    private long fileFromOffset;
    // 實體檔案
    private File file;
    // 實體檔案對應的記憶體映射Buffer
    private MappedByteBuffer mappedByteBuffer;
    // 檔案最後一次寫入内容的時間
    private volatile long storeTimestamp = 0;
    // 是否是MappedFileQueue隊列中第一個檔案。
    private boolean firstCreateInQueue = false;
}
複制代碼           

3.1 構造方法

根據transientStorePoolEnable是否為true調用不同的構造方法。

  • transientStorePoolEnable=true(隻在異步刷盤情況下生效)表示将内容先儲存在堆外記憶體中。TransientStorePool會通過ByteBuffer.allocateDirect調用直接申請堆外記憶體,消息資料在寫入記憶體的時候是寫入預申請的記憶體中
  • 通過Commit線程将資料送出到FileChannel中
  • 在異步刷盤的時候,再由刷盤線程(Flush線程)将資料持久化到磁盤檔案。

構造方法源碼:

/**
 * 如果設定transientStorePoolEnable為false則調用此方法,參見
 * org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation()
 */
public MappedFile(final String fileName, final int fileSize) throws IOException {
    init(fileName, fileSize);
}
/**
 * 如果設定transientStorePoolEnable為true則調用此方法,參見
 *org.apache.rocketmq.store.config.MessageStoreConfig#isTransientStorePoolEnable()
 * org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation()
 */
public MappedFile(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize, transientStorePool);
}

public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    //如果transientStorePoolEnable為true,則初始化MappedFile的
    //writeBuffer,該buffer從transientStorePool中擷取
    this.writeBuffer = transientStorePool.borrowBuffer();
    this.transientStorePool = transientStorePool;
}
複制代碼           

FileChannel提供了map()方法把檔案映射到虛拟記憶體,通常情況可以映射整個檔案,如果檔案比較大,可以進行分段映射,RocketMQ這裡映射大小為(0,fileSize)。當通過map()方法建立映射關系之後,就不依賴于用于建立映射的FileChannel。特别是,關閉通道(Channel)對映射的有效性沒有影響。MappedFile的初始化(init)方法,初始化MappedByteBuffer,模式為MapMode.READ_WRITE(讀/寫),此模式對緩沖區的更改最終将寫入檔案;但該更改對映射到同一檔案的其他程式不一定是可見的。

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    //通過檔案名擷取起始偏移量
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    //確定父目錄存在
    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel " + this.fileName + " Failed. ", e);
        throw e;
    } catch (IOException e) {
        log.error("map file " + this.fileName + " Failed. ", e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
複制代碼           

修改MappedByteBuffer實際會将資料寫入檔案對應的Page Cache中,而TransientStorePool方案下寫入的則為純粹的記憶體。是以在消息寫入操作上會更快,是以能更少的占用CommitLog.putMessageLock鎖,進而能夠提升消息處理量。使用TransientStorePool方案的缺陷主要在于在異常崩潰的情況下會丢失更多的消息。

3.2 追加内容

追加就是将消息内容追加到映射檔案中,并且記錄更新時間和寫的位置

public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
    // todo
    return appendMessagesInner(msg, cb);
}

public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
    return appendMessagesInner(messageExtBatch, cb);
}

/**
 * 将消息追加到MappedFile檔案中
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    // 擷取MappedFile目前檔案寫指針
    int currentPos = this.wrotePosition.get();

    // 如果currentPos小于檔案大小
    if (currentPos < this.fileSize) {
        /**
         * RocketMQ提供兩種資料落盤的方式:
         * 1. 直接将資料寫到mappedByteBuffer, 然後flush;
         * 2. 先寫到writeBuffer, 再從writeBuffer送出到fileChannel, 最後flush.
         */
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // 單個消息
        if (messageExt instanceof MessageExtBrokerInner) {
            // todo 追加消息
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        // 批量消息
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    // 如果currentPos大于或等于檔案大小,表明檔案已寫滿,抛出異常
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
複制代碼           

3.3 送出(commit)

commit()方法:記憶體映射的送出,commit操作主要針對異步刷盤模式

commitLeastPages 為本次送出最小的頁數,如果待送出資料不滿commitLeastPages(預設4*4kb),則不執行本次送出操作,待下次送出。commit的作用就是将writeBuffer 中的資料送出到FileChannel中。

/**
 * commitLeastPages 為本次送出最小的頁面,預設4頁(4*4KB),可參見
 * org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run()
 */
public int commit(final int commitLeastPages) {
    /**
     * 1.writeBuffer 為空就不送出,而writeBuffer隻有開啟
     * transientStorePoolEnable為true并且是異步刷盤模式才會不為空
     * 是以commit是針對異步刷盤使用的
     */
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        //清理工作,歸還到堆外記憶體池中,并且釋放目前writeBuffer
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            //建立writeBuffer的共享緩存區,slice共享記憶體,其實就是切片
            //但是position、mark、limit單獨維護
            //新緩沖區的position=0,其capacity和limit将是緩沖區中剩餘的位元組數,其mark=undefined
            ByteBuffer byteBuffer = writeBuffer.slice();
            //上一次的送出指針作為position
            byteBuffer.position(lastCommittedPosition);
            //目前最大的寫指針作為limit
            byteBuffer.limit(writePos);
            //把commitedPosition到wrotePosition的寫入FileChannel中
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            //更新送出指針
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

/**
 * 是否能夠flush
 *  1. 檔案已經寫滿
 *  2. flushLeastPages > 0 && 未flush部分超過flushLeastPages
 *  3. flushLeastPages==0&&有新寫入的部分
 * @param flushLeastPages flush最小分頁
 *      mmap映射後的記憶體一般是記憶體頁大小的倍數,而記憶體頁大小一般為4K,是以寫入到映射記憶體的資料大小可以以4K進行分頁,
 *      而flushLeastPages這個參數隻是訓示寫了多少頁後才可以強制将映射記憶體區域的資料強行寫入到磁盤檔案
 * @return
 */
protected boolean isAbleToCommit(final int commitLeastPages) {
    int flush = this.committedPosition.get();
    int write = this.wrotePosition.get();
		// 如果檔案滿了(檔案大小與寫入位置一樣),則傳回true
    if (this.isFull()) {
        return true;
    }

    if (commitLeastPages > 0) {
        //總共寫入的頁大小-已經送出的頁大小>=最少一次寫入的頁大小,OS_PAGE_SIZE預設4kb
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
    }

    return write > flush;
}

public synchronized boolean hold() {
    if (this.isAvailable()) {
        if (this.refCount.getAndIncrement() > 0) {
            return true;
        } else {
            this.refCount.getAndDecrement();
        }
    }

    return false;
}

public void release() {
    long value = this.refCount.decrementAndGet();
    if (value > 0)
        return;

    synchronized (this) {
        //如果引用計數等于0,則執行清理堆外記憶體
        this.cleanupOver = this.cleanup(value);
    }
}
複制代碼           

3.4 刷盤(flush)

flush操作是将記憶體中的資料永久的寫入磁盤。刷寫磁盤是直接調用MappedByteBuffer或FileChannel的force()将記憶體中的資料持久化到磁盤。

/**
 * @return The current flushed position
 *
 * 刷盤指的是将記憶體中的資料寫入磁盤,永久存儲在磁盤中
 *
 */
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            // todo flushedPosition應該等于MappedByteBuffer中的寫指針
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    // todo 将記憶體中資料持久化到磁盤
                    this.fileChannel.force(false);
                } else {
                    //
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
            // 設定
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
複制代碼           

3.5 預熱(warm)

對目前映射檔案進行預熱:

  • 第一步:對目前映射檔案的每個記憶體頁寫入一個位元組0。當刷盤政策為同步刷盤時,執行強制刷盤,并且是每修改pages(預設是16MB)個分頁刷一次盤
  • 第二步:将目前MappedFile全部的位址空間鎖定在實體存儲中,防止其被交換到swap空間。再調用madvise,傳入 MADV_WILLNEED 政策,将剛剛鎖住的記憶體預熱,其實就是告訴核心, 我馬上就要用(MADV_WILLNEED)這塊記憶體,先做虛拟記憶體到實體記憶體的映射,防止正式使用時産生缺頁中斷。

使用mmap()記憶體配置設定時,隻是建立了程序虛拟位址空間,并沒有配置設定虛拟記憶體對應的實體記憶體。當程序通路這些沒有建立映射關系的虛拟記憶體時,處理器自動觸發一個缺頁異常,進而進入核心空間配置設定實體記憶體、更新程序緩存表,最後傳回使用者空間,恢複程序運作。寫入假值0的意義在于實際配置設定實體記憶體,在消息寫入時防止缺頁異常。

源碼

/**
 * 1. 對目前映射檔案進行預熱
 *   1.1. 先對目前映射檔案的每個記憶體頁寫入一個位元組0.當刷盤政策為同步刷盤時,執行強制刷盤,并且是每修改pages個分頁刷一次盤
 *  再将目前MappedFile全部的位址空間鎖定,防止被swap
 *   1.2. 然後将目前MappedFile全部的位址空間鎖定在實體存儲中,防止其被交換到swap空間。再調用madvise,傳入 WILL_NEED 政策,将剛剛鎖住的記憶體預熱,其實就是告訴核心,我馬上就要用(WILL_NEED)這塊記憶體,先做虛拟記憶體到實體記憶體的映射,防止正式使用時産生缺頁中斷。
 *  2. 隻要啟用緩存預熱,都會通過mappedByteBuffer來寫入假值(位元組0),并且都會對mappedByteBuffer執行mlock和madvise。
 * @param type 刷盤政策
 * @param pages 預熱時一次刷盤的分頁數
 */
public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    //上一次刷盤的位置
    int flush = 0;
    long time = System.currentTimeMillis();
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        // force flush when flush disk type is sync
        if (type == FlushDiskType.SYNC_FLUSH) {
            /**
             *  同步刷盤,每修改pages個分頁強制刷一次盤,預設16MB
             * 參見org.apache.rocketmq.store.config.MessageStoreConfig#flushLeastPagesWhenWarmMapedFile
             */
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                //FIXME 刷入修改的内容,不會有性能問題??
                mappedByteBuffer.force();
            }
        }

        // prevent gc
        if (j % 1000 == 0) {
            log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            try {
                /***
                 * Thread.yield與Thread.sleep(0);相同,jvm底層使用的就是os::yield();
                 * https://www.jianshu.com/p/0964124ae822
                 * openJdk源碼thread.c jvm.cpp
                 */
                Thread.sleep(0);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    }

    // force flush when prepare load finished
    if (type == FlushDiskType.SYNC_FLUSH) {
        log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
            this.getFileName(), System.currentTimeMillis() - beginTime);
        mappedByteBuffer.force();
    }
    log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
        System.currentTimeMillis() - beginTime);

    this.mlock();
}
複制代碼           

3.6 預讀(mlock與munlock)

在讀取CommitLog時,雖然可以通過PageCache提高目标消息直接在實體記憶體中讀取的命中率。但是由于CommitLog存放的是所有Topic的消息,在讀取時是随機通路,是以仍會出現缺頁中斷問題,導緻記憶體被頻繁換入換出。為此,RocketMQ使用了mlock系統調用,将mmap調用後所占用的堆外記憶體鎖定,變為常駐記憶體,進一步讓目标消息更多的在記憶體中讀取。

mlock這個方法是一個Native級别的調用,調用了标準C庫的方法 mlock方法。在标準C中的實作是将鎖住指定的記憶體區域避免被作業系統調到swap空間中,

通過mmap建立的記憶體檔案,在開始時隻是建立一個映射關系,當讀取相應區域的時候,第一次還是會去讀磁盤,後續讀寫基本上與Page Cache互動。當讀相對應頁沒有拿到資料的時候,系統将會産生一個缺頁異常。madvise的作用是一次性先将一段資料讀入到映射記憶體區域,這樣就減少了缺頁異常的産生, 不過mlock和madvise在windows下的C庫沒有

madvise系統調用有兩個參數:位址指針、區間長度。madvise會向核心提供一個針對于程序虛拟位址區間的I/O建議,核心可能會采納這個建議,進行預讀。

RocketMQ使用net.java.dev.jna:jna:4.2.2,自己建立一個 LibC類繼承Library

import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.NativeLong;
import com.sun.jna.Platform;
import com.sun.jna.Pointer;

public interface LibC extends Library {
    LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);

    int MADV_WILLNEED = 3;
    int MADV_DONTNEED = 4;

    int MCL_CURRENT = 1;
    int MCL_FUTURE = 2;
    int MCL_ONFAULT = 4;

    /* sync memory asynchronously */
    int MS_ASYNC = 0x0001;
    /* invalidate mappings & caches */
    int MS_INVALIDATE = 0x0002;
    /* synchronous memory sync */
    int MS_SYNC = 0x0004;

    int mlock(Pointer var1, NativeLong var2);

    int munlock(Pointer var1, NativeLong var2);

    int madvise(Pointer var1, NativeLong var2, int var3);

    Pointer memset(Pointer p, int v, long len);

    int mlockall(int flags);

    int msync(Pointer p, NativeLong length, int flags);
}
複制代碼           

調用mmap()時核心隻是建立了邏輯位址到實體位址的映射表,并沒有映射任何資料到記憶體。 在你要通路資料時核心會檢查資料所在分頁是否在記憶體,如果不在,則發出一次缺頁中斷, linux預設分頁為4K,1G的消息存儲檔案要發生很多次中斷。

解決辦法:将madvise()和mmap()搭配起來使用,在使用資料前告訴核心這一段資料需要使用,将其一次讀入記憶體。 madvise()這個函數可以對映射的記憶體提出使用建議,進而減少在程式運作時的硬碟缺頁中斷。

mlock和munlock源碼:

public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        // 記憶體鎖定
        // 通過mlock可以将程序使用的部分或者全部的位址空間鎖定在實體記憶體中,防止其被交換到swap空間。
        // 對時間敏感的應用會希望全部使用實體記憶體,提高資料通路和操作的效率。
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }

    {
        //檔案預讀
        //madvise 一次性先将一段資料讀入到映射記憶體區域,這樣就減少了缺頁異常的産生。
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }
}

public void munlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize));
    log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
複制代碼           

3.7 清理(cleanup)

JDK預設公開釋放MappedByteBuffer的方法,隻能通過反射的方式

@Override
public boolean cleanup(final long currentRef) {
    if (this.isAvailable()) {
        log.error("this file[REF:" + currentRef + "] " + this.fileName
            + " have not shutdown, stop unmapping.");
        return false;
    }

    if (this.isCleanupOver()) {
        log.error("this file[REF:" + currentRef + "] " + this.fileName
            + " have cleanup, do not do it again.");
        return true;
    }

    clean(this.mappedByteBuffer);
    //加一個fileSize大小的負數值
    TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
    TOTAL_MAPPED_FILES.decrementAndGet();
    log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
    return true;
}
/**
 * 通過反射清理MappedByteBuffer
 * @param buffer
 */
public static void clean(final ByteBuffer buffer) {
    if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
        return;
    /**
     * 嵌套遞歸擷取directByteBuffer的最内部的attachment或者viewedBuffer方法
     * 擷取directByteBuffer的Cleaner對象,然後調用cleaner.clean方法,進行釋放資源
     *
     */
    invoke(invoke(viewed(buffer), "cleaner"), "clean");
}
private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
    return AccessController.doPrivileged(new PrivilegedAction<Object>() {
        public Object run() {
            try {
                Method method = method(target, methodName, args);
                method.setAccessible(true);
                return method.invoke(target);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    });
}
private static ByteBuffer viewed(ByteBuffer buffer) {
    String methodName = "viewedBuffer";

    Method[] methods = buffer.getClass().getMethods();
    for (int i = 0; i < methods.length; i++) {
        if (methods[i].getName().equals("attachment")) {
            methodName = "attachment";
            break;
        }
    }

    ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
    if (viewedBuffer == null)
        return buffer;
    else
        return viewed(viewedBuffer);
}           

繼續閱讀