天天看点

RocketMQ(非商业版)组件介绍及基本原理

作者:码农小头条

RocketMQ基本组成

RocketMQ架构由以下这几个组件组成(可跟图中对应):

NameServer : 提供轻量级的Broker路由服务,主要包括两个功能:Broker管理,Na meServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提 供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关 于Broker集群的整个路由信息和用于客户端查询的队列信息。各个NameServer之间不 相互通信。

Broker:实际处理消息存储、转发等服务的核心组件。

Producer:消息生产者集群。通常是业务系统中的一个功能模块。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递。

Consumer:消息消费者集群。通常也是业务系统中的一个功能模块。

RocketMQ(非商业版)组件介绍及基本原理

RocketMQ架构模型图

RocketMQ把示一类消息的集合称之为Topic,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。Topic只是一个逻辑概念,并不实际保存消息。同一个Topic下的消息,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。MessageQueue是一个具有FIFO特性的队列结构,生产者发送消息与消费者消费消息的最小单位。

RocketMQ主要由 Producer、Broker、Consumer 、NameServer四部分组成,其中Producer负责生产消息,Consumer 负责消费消息,Broker 负责存储消息,NameServer负责管理Broker以及给Producer和Consumer提供相互的路由信息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中(如下图所示)。ConsumerGroup由多个Consumer 实例构成。

RocketMQ(非商业版)组件介绍及基本原理

基础概念图

RocketMQ消息类型

基本消息:可以使用消息生产者分别通过三种方式发送消息,同步发送(等待消息返回后再继续进行下面的操作)、异步发送(所有消息回调方法都执行完了再关闭Producer)以及单向发送(没有返回值,也没有回调。就是只管把消息发出去就行了)。

顺序消息:消息有序指的是可以按照消息的发送顺序来消费(FIFO),RocketMQ保证的是消息的局部有序,而不是全局有序。在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。

广播消息:在集群状态下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。

延迟消息:延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。开源版本的RocketMQ中,只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

批量消息:批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。批量消息的使用是有一定限制的(比如消息大小不超过4M,官网建议不超过1M),这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。

过滤消息:可以使用Message的Tag属性来简单快速的过滤信息;可以使用SQL表达式来对消息进行过滤。

事务消息:事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别的。事务消息共有三种状态,提交状态、回滚状态、中间状态(未提交状态),事务消息的机制如下图:

RocketMQ(非商业版)组件介绍及基本原理

事务消息处理机制

读队列与写队列

RocketMQ的MessageQueue分为读写队列(默认是4个,值可以修改),写队列会真实的创建对应的存储文件,负责消息写入。而读队列会记录Consumer的Offset,负责消息读取。在往写队列里写Message时,会同步写入到一个对应的读队列中。如果写队列个数大于读队列,就会有一部分写队列无法写入到读队列中,这一部分的消息就无法被读取,就会造成消息丢失;反之,会造成消费者空转,极大的浪费性能。

只有一种情况下可以考虑将读写队列设置为不一致,就是要对Topic的MessageQueue进行缩减的时候。例如原来四个队列,现在要缩减成两个队列。如果立即缩减读写队列,那么被缩减的MessageQueue上没有被消费的消息,就会丢失。这时,可以先缩减写队列,待空出来的读队列上的消息都被消费完了之后,再来缩减读队列,这样就可以比较平稳的实现队列缩减了。

消息持久化

RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定。

存储日志文件主要分为以下三个部分:

CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。CommitLog文件存储所有消息实体。所有生产者发过来的消息,都会无差别的依次存储到Commitlog文件当中。这样的好处是可以减少查找目标文件的时间,让消息以最快的速度落盘。

ConsumerQueue:存储消息在CommitLog的索引。ConsumeQueue文件主要是加速消费者的消息索引。他的每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ComsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件当中的消费进度,会保存在config/consumerOffset.json文件当中。每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成,数据块的内容包括:msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的长度)+msgTagCode(8byte,消息的tag的Hash值)。

IndexFile:IndexFile文件主要是辅助消息检索。为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程。

RocketMQ(非商业版)组件介绍及基本原理

存储日志文件主要部分流程

还有3个辅助的存储文件:

checkpoint:数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳。

config/*.json:这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。

abort:这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。

过期文件删除

RocketMQ内部有一个定时任务,对文件进行扫描,并且触发文件删除的操作。用户可以指定文件删除操作的执行时间。在broker.conf中deleteWhen属性指定。默认是凌晨四点。另外,RocketMQ还会检查服务器的磁盘空间是否足够,如果磁盘空间的使用率达到一定的阈值,也会触发过期文件删除。所以RocketMQ官方就特别建议,broker的磁盘空间不要少于4G。

高效文件写

零拷贝技术加速文件读写:零拷贝(zero-copy)是操作系统层面提供的一种加速文件读写的操作机制,非常多的开源软件都在大量使用零拷贝,来提升IO操作的性能。对于Java应用层,对应着mmap和sendFile两种方式。原理参考 https://blog.csdn.net/weixin_42096901/article/details/103017044

顺序写加速文件写入磁盘:通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片。所以我们去写一个文件时,也就无法把一个文件写在一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写。这个过程中有大量的寻址操作,会严重影响写数据的性能。而顺序写机制是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。

刷盘机制保证消息不丢失:在操作系统层面,当应用程序写入一个文件时,文件内容并不会直接写入到硬件当中,而是会先写入到操作系统中的一个缓存PageCache中。PageCache缓存以4K大小为单位,缓存文件的具体内容。这些写入到PageCache中的文件,在应用程序看来,是已经完全落盘保存好了的,可以正常修改、复制等等。但是,本质上,PageCache依然是内存状态,所以一断电就会丢失。因此,需要将内存状态的数据写入到磁盘当中,这样数据才能真正完成持久化,断电也不会丢失。这个过程就称为刷盘。RocketMQ对于何时进行刷盘,也设计了两种刷盘机制,同步刷盘和异步刷盘。

消息主从复制

同步复制:同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。在同步复制下,如果Master节点故障,Slave上有全部的数据备份,这样容易恢复数据。但是同步复制会增大数据写入的延迟,降低系统的吞吐量。

异步复制:异步复制是只要master写入消息成功,就反馈给客户端写入成功的状态。然后再异步的将消息复制给Slave节点。在异步复制下,系统拥有较低的延迟和较高的吞吐量。但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失。

负载均衡

Producer负载均衡:Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。同时生产者在发送消息时,可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。

Consumer负载均衡:广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。Consumer也是以MessageQueue为单位来进行负载均衡。在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可,有如下几种模式,

AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者。

AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。

AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。

AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。

AllocateMessageQueueConsitentHashTest:这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在换上分布更为均匀。

消息重试和死信队列

消息重试:重试的消息会进入一个 “%RETRY%”+ConsumeGroup 的队列中,后RocketMQ默认允许每条消息最多重试16次,如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列。

死信队列:死信队列的名称是%DLQ%+ConsumGroup,一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例;如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列;一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic;死信队列中的消息不会再被消费者正常消费;死信队列的有效期跟正常消息相同,默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

备注

本文文字摘抄于图灵学院课堂讲义。

继续阅读