天天看點

rocketMQ之StoreCheckpoint

一、 StoreCheckpoint 1、作用 記錄commitLog、ConsumeQueue、Index檔案的刷盤時間點,當上一次broker是異常結束時,會根據StoreCheckpoint的資料進行恢複 2、參數 private volatile long physicMsgTimestamp =  0; private volatile long logicsMsgTimestamp =  0; private volatile long indexMsgTimestamp = 0; physicMsgTimestamp為刷盤點commitLog最新一條記錄的存儲時間 logicsMsgTimestamp為刷盤點ConsumeQueue最新一條記錄的存儲時間 indexMsgTimestamp為刷盤點最近一個已經寫完的index的最後一條記錄時間   3、flush代碼 public void flush() {     this.mappedByteBuffer.putLong(0,  this.physicMsgTimestamp);     this.mappedByteBuffer.putLong(8,  this.logicsMsgTimestamp);     this.mappedByteBuffer.putLong(16,  this.indexMsgTimestamp);     this.mappedByteBuffer.force(); } 4、flush觸發 1)ConsumeQueue刷盤時會觸發 2)當更換index檔案時觸發     二、檔案刷盤 1、 ConsumeQueue資料進行刷盤(FlushConsumeQueueService線程 ) 頻率參數 int interval =  DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); int flushConsumeQueueLeastPages =  DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); int flushConsumeQueueThoroughInterval =DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval() 每隔interval時間,線程執行一次。判斷資料寫入是否大于flushConsumeQueueLeastPages,如果大于就對ConsumeQueue檔案進行資料刷盤。如果多次執行間隔大于flushConsumeQueueThoroughInterval,就會進行一次StoreCheckpoint資料的刷盤與ConsumeQueue檔案刷盤   2、CommitLog資料刷盤 if (FlushDiskType.SYNC_FLUSH ==  defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {     this.flushCommitLogService = new  GroupCommitService(); } else {     this.flushCommitLogService = new  FlushRealTimeService(); }   this.commitLogService = new  CommitRealTimeService(); 如果broker配置的是同步刷盤,走GroupCommitService 如果broker配置的是異步刷盤,走FlushRealTimeService 如果broker還啟動了TransientStorePool,先把資料送出到TransientStorePool中,然後通過commitLogService線程送出到 mappedByteBuffer,然後通過刷盤線程刷盤 2.1、FlushRealTimeService線程 int interval =  CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); int flushPhysicQueueLeastPages =  CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int  flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); 每隔interval時間,線程執行一次。判斷資料寫入是否大于flushPhysicQueueLeastPages,如果大于就對CommitLog檔案進行資料刷盤。如果多次執行間隔大于flushPhysicQueueThoroughInterval也會對CommitLog檔案刷盤。線程每執行一次都會更新StoreCheckpoint中的PhysicMsgTimestamp值   2.2、GroupCommitService 每次寫入一個檔案,會生成一個GroupCommitRequest對象,GroupCommitService會根據GroupCommitRequest做刷盤操作 2.3、CommitRealTimeService int interval =  CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); int commitDataLeastPages =  CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); int commitDataThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); 将TransientStorePool中的資料刷到fileChannel中                

繼續閱讀