天天看点

rocketmq的broker源码解读五(刷盘)5)刷盘

5)刷盘

在2.1.1.3)小节中,提到了启动刷盘线程,这里进行详述;commitLog默认异步刷盘,用的是FlushRealTimeService实例,同步刷盘用的是GroupCommitService实例;将数据从堆外内存writeBuffer提交至FileChannel的线程是commitRealTimeService;consumeQueue刷盘用的是FlushConsumeQueueService实例;commitLog的刷盘线程和提交线程都有两种醒来方式:超时醒来和存储消息的线程唤醒;而cq的刷盘线程只会超时醒来;但都是循环执行;

5.1)commitLog的异步刷盘

5.1.1)FlushRealTimeService#run方法

先获取刷盘间隔500ms,刷盘脏页最小值4页,以及强制刷盘间隔时间10s;

首先判断若距上一次刷盘已过去了10s,则当下次刷盘时,执行强制刷盘,不考虑脏页最小值;

接着调用GroupCommitService#onWaitEnd方法,其中又会调用countDownLatch的await方法,执行超时阻塞,阻塞时间为500ms,即每隔500ms刷一次盘;也可以被执行存储的业务线程唤醒;

醒来后,会执行commitLog的mappedFileQueue的flush方法,入参为脏页最小值;MappedFileQueue#flush方法分为三步;

a)MappedFileQueue#findMappedFileByOffset,入参为当前mfq的flushedWhere,目的是找到flushedWhere位点所在的mappedFile文件;

b)MappedFile#flush返回更新后的flushedPosition,该值可能没变,因为没数据可刷;

b.1)isAbleToFlush方法中以下三种情况之一可刷盘

b.1.1)mappedFile满了;

b.1.2)(写位点减去已刷盘位点)/OS_PAGE_SIZE大于入参脏页最小值;

b.1.3)入参脏页最小值为0,此时只要写位点wrotePosition大于已刷盘位点flushedPosition就刷盘;

b.2)hold方法, refCount计数加1,保证刷盘过程中不能释放当前mappedFile资源(其实就是判断若refCount大于0,则禁止刷盘);

b.3)执行fireChannel.force或mappedByteBuffer.force方法;

b.4)最后更新flushedPosition为wrotePosition,执行release方法即refCount计数减1释放mappedFile;

c)比较flushedWhere与(flushedPosition+文件名),若一样,则代表本次没有数据刷入磁盘;

退出flush方法后,会更新storeCheckPoint的存储时间戳;当最后退出run方法中的while循环,还会再执行刷盘,入参为0,保证脏页都能刷进磁盘;

5.2)commitLog的同步刷盘

5.2.1)GroupCommitService#run方法

5.2.1.1)阻塞

最开始broker启动后,会启动刷盘线程,生产者并没有发送消息给broker,此时groupCommitService线程的run方法中会调用countDownLatch的await(10)方法陷入超时阻塞,超时时间为10ms,也可以被执行存储的业务线程唤醒;

5.2.1.2)swapRequests方法

醒来后会执行onWaitEnd方法,其中会swapRequests方法,交换List 类型的requestsRead和requestsWrite指向;

5.2.1.3)doCommit方法

由于储存消息的线程往requestsWrite集合中添加了GroupCommitRequest实例,紧接着唤醒了groupCommitService线程,而groupCommitService线程随后交换了requestsRead和requestsWrite,所以requestsWrite集合为空了,requestsRead集合不为空;

a)若requestsRead不为空,则遍历requestsRead,拿到每一个请求实例GroupCommitRequest;若刷盘的位点小于最新的偏移位点,则代表有新加入的数据可刷,接着调用mappedFileQueue.flush(0)将全部脏数据刷至磁盘;若刷盘位点等于最新的偏移位点,则表示数据全部刷完;此时调用req.wakeCustomer方法,将刷盘结果存入CompletableFuture实例中,此时调用CompletableFuture#get方法的写线程拿到结果,从而被唤醒;最后清理requsetsRead集合,方便下一次交换后使用;

b)若requestsRead为空则直接执行mappedFileQueue.flush(0)方法,将脏数据全都刷盘,实际上没有任何数据可刷;

groupCommitService线程接着执行10毫秒的sleep(等待存储消息的线程将request放进队列),再次执行swapRequests方法和doCommit方法,随后再次循环;

5.3)CommitLog#handleDiskFlush

异步刷盘直接调用countDownLatch的countDown方法,唤醒flushRealTimeService线程;

同步刷盘则稍微复杂,执行存储的业务线程分为两步:

a)封装下一次需要被刷盘的数据大小至GroupCommitRequest实例中,并调用GroupCommitService#putRequest方法,将请求提交至requestsWrite集合,执行countDown方法唤醒GroupCommitService线程;

b)存储消息的线程提交请求之后,调用CompletableFuture#get方法,将自己挂起,超时时间是5秒,当刷盘线程处理完返回结果存进同一个CompletableFuture实例中后,存储消息的线程被唤醒;

5.4)commitRealTimeService线程启动

在存储消息时,会判断是否使用堆外内存池,若没有,则直接将消息追加MappedFile对应的MappedByteBuffer中,再刷至磁盘;若使用了堆外内存,则先将数据从writeBuffer提交至FileChannel,再刷至磁盘;提交操作单独有一个线程CommitRealTimeService,默认每200毫秒将writeBuffer中的内容提交至FileChannel中,刷盘线程FlushRealTimeService默认每500毫秒将FileChannel新追加的数据(wrotePosition-flushedPosition)通过调用FileChannel#force方法将数据刷至磁盘。

写线程的handleDiskFlush方法中判断若为异步刷盘,则再判断是否启用了堆外内存,若启用了,则执行commitLogService.wakeup()方法,唤醒commitRealTimeService线程,而让flushRealTimeService线程自动超时醒来;若没启用堆外内存则执行flushCommitLogService.wakeup()方法立即唤醒commitLog异步刷盘线程;

至此,commitLog同步与异步刷盘以及与堆外内存有关的提交线程分析完了,其实这三个线程在循环中都是超时阻塞;

从前面分析可知,GroupCommitService和FlushRealTimeService线程只能有一个可以启动,而CommitRealTimeService线程则一定会启动;

5.5)consumeQueue的刷盘

consumeQueue刷盘用的是FlushConsumeQueueService实例;找到其run方法;

先拿到默认的间隔时间为1s;再调用waitForRunning方法执行countDownLatch的await方法进行超时阻塞,超时时间为1s;醒来后会执行doFlush方法,该方法中先拿到默认的cq最少刷盘页数为2,再拿到默认的刷盘最长间隔60s,若距上次刷盘已超过60s,则接下来将cq上所有脏数据刷完,否则还是按照最少2页刷;遍历consumeQueueTable,执行每个ConsumeQueue实例的flush方法,其中又会调用当前ConsumeQueue实例中的mappedFileQueue的flush方法;