前文回顾:RocketMQ知识盘点【壹】_Producer和NameServer。
1.Broker
一个topic拥有多个消息队列,一个Broker默认为每个topic创建4个读队列和4个写队列。多个Broker组成集群,brokerName相同的多个Broker组成master-slave架构。brokerId为0的为master,大于0为slave。
对于一个Broker,它的存储目录是这样的
下面做逐个解释。
2.消息存储
rocketMq消息存储文件分为CommitLog文件、ConsumeQueue文件和IndexFile文件等。其中:
CommitLog存储所有topic的消息(类似mysql的redo log);
ConsumeQueue为消息消费队列,消息到达CommitLog后会异步转发到这里供消费者消费;
IndexFile消息索引文件,存储消息key和offset的关系;
事务状态服务:存储每条消息事务状态;
定时消息服务:因每个延迟队列对应一个消息消费队列,这里存储每个延迟队列的拉取进度。
2.1 CommitLog
储存地址为/store/commitlog目录,每个文件默认1G,文件写满则创建新文件,以该文件第一个消息的全局物理偏移量为文件名,偏移量小于20用0补齐。
CommitLog文件存储格式为,每条消息前4个字节存储该消息的总长度,之后存储该消息的其他信息。通过这样的设计,可以根据偏移量和消息长度查找消息。首先根据偏移量找到所在的物理偏移量文件,然后用offset和文件长度取余得到文件中的偏移量,从该位置读取size长度内容即可得到消息。
2.2 ConsumeQueue
基于topic模式实现消息消费,因同一topic消息不连续地保存在CommitLog文件中,因此设计这个ConsumeQueue文件,可理解为索引。单个ConsumeQueue文件默认存储30W个条目。每个条目的长度是固定的,因此也通过物理偏移量来获取多个条目。
文件结构:
还可以根据时间戳来查找,基于文件修改时间。
2.3 Index文件
IndexHead
- beginTimestamp:落broker开始时间
- endTimestamp:落broker截止时间
- beginPhyoffset:开始偏移量
- endPhyoffset:截止偏移量
- hashSlotcount:槽数量
- indexcount:当前索引总数
slotTable数组
槽位置 = key的hashCode % 槽数量,每个槽记录的是当前索引数
indexLinkedList链表
- hashCode:key的哈希值
- pyhoffset:物理偏移地址
- timedif:落盘时间
- preIndexNo:hash冲突后上一个索引地址
通过messageId查找,可以直接解析出broker和物理偏移地址,直接查commitLog即可;
通过messageKey查找,则用index文件通过key定位slot,从索引最大值倒序查找,对比hash值和落盘时间返回物理偏移地址,再取commitLog查找。
2.4 保存策略
简单来说分为同步和异步。
同步是指消息追加到内存映射文件的内存中后,立即刷盘并返回结果,默认超时时间为5s。
异步是会申请一个和commitLog同样大小的堆外内存,消息先追加到堆外内存,然后再提交到内存映射文件的内存,最后刷盘。默认10s强制刷一次,每1000条消息强制刷一次。
再多说一句,ConsumeQueue文件和Index文件都是消息保存到CommitLog文件后,通过ReputMessageService线程池异步保存的。
2.5 过期文件删除
3种情况触发:
1.磁盘空间不足;
2.凌晨4点(deleteWhen)删除过期文件(默认过期时间为72小时,fileReservedTime配置);
3.手动触发。executeDeleteFilesManualy
RocketMQ知识盘点【壹】_Producer和NameServer
RocketMQ知识盘点【贰】_Broker和消息存储
RocketMQ知识盘点【叁】_Consumer
RocketMQ知识盘点【肆】_最佳实践