天天看点

rocketMQ之StoreCheckpoint

一、 StoreCheckpoint 1、作用 记录commitLog、ConsumeQueue、Index文件的刷盘时间点,当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复 2、参数 private volatile long physicMsgTimestamp =  0; private volatile long logicsMsgTimestamp =  0; private volatile long indexMsgTimestamp = 0; physicMsgTimestamp为刷盘点commitLog最新一条记录的存储时间 logicsMsgTimestamp为刷盘点ConsumeQueue最新一条记录的存储时间 indexMsgTimestamp为刷盘点最近一个已经写完的index的最后一条记录时间   3、flush代码 public void flush() {     this.mappedByteBuffer.putLong(0,  this.physicMsgTimestamp);     this.mappedByteBuffer.putLong(8,  this.logicsMsgTimestamp);     this.mappedByteBuffer.putLong(16,  this.indexMsgTimestamp);     this.mappedByteBuffer.force(); } 4、flush触发 1)ConsumeQueue刷盘时会触发 2)当更换index文件时触发     二、文件刷盘 1、 ConsumeQueue数据进行刷盘(FlushConsumeQueueService线程 ) 频率参数 int interval =  DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); int flushConsumeQueueLeastPages =  DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); int flushConsumeQueueThoroughInterval =DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval() 每隔interval时间,线程执行一次。判断数据写入是否大于flushConsumeQueueLeastPages,如果大于就对ConsumeQueue文件进行数据刷盘。如果多次执行间隔大于flushConsumeQueueThoroughInterval,就会进行一次StoreCheckpoint数据的刷盘与ConsumeQueue文件刷盘   2、CommitLog数据刷盘 if (FlushDiskType.SYNC_FLUSH ==  defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {     this.flushCommitLogService = new  GroupCommitService(); } else {     this.flushCommitLogService = new  FlushRealTimeService(); }   this.commitLogService = new  CommitRealTimeService(); 如果broker配置的是同步刷盘,走GroupCommitService 如果broker配置的是异步刷盘,走FlushRealTimeService 如果broker还启动了TransientStorePool,先把数据提交到TransientStorePool中,然后通过commitLogService线程提交到 mappedByteBuffer,然后通过刷盘线程刷盘 2.1、FlushRealTimeService线程 int interval =  CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); int flushPhysicQueueLeastPages =  CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int  flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); 每隔interval时间,线程执行一次。判断数据写入是否大于flushPhysicQueueLeastPages,如果大于就对CommitLog文件进行数据刷盘。如果多次执行间隔大于flushPhysicQueueThoroughInterval也会对CommitLog文件刷盘。线程每执行一次都会更新StoreCheckpoint中的PhysicMsgTimestamp值   2.2、GroupCommitService 每次写入一个文件,会生成一个GroupCommitRequest对象,GroupCommitService会根据GroupCommitRequest做刷盘操作 2.3、CommitRealTimeService int interval =  CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); int commitDataLeastPages =  CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); int commitDataThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); 将TransientStorePool中的数据刷到fileChannel中                

继续阅读