天天看点

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

(图片来自网络)

通过上图可以看到,RocketMQ的核心组件主要包括4个,分别是NameServer、Broker、Producer和Consumer,下面我们先依次简单说明下这四个核心组件:

NameServer:NameServer充当路由信息的提供者。生产者或消费者能够通过NameServer查找各Topic相应的Broker IP列表。多个Namesrver实例组成集群,但相互独立,没有信息交换。

Broker:消息中转角色,负责存储消息、转发消息。Broker服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

除了上面说的三个核心组件外,还有Topic这个概念下面也会多次提到:

Topic:表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。一个Topic可以分片在多个Broker集群上,每一个Topic分片包含多个queue,具体结构可以参考下图:

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

RocketMQ是基于主题的发布与订阅模式,核心功能包括消息发送、消息存储、消息消费,整体设计追求简单与性能第一,归纳来说主要是下面三种:

NameServer取代ZK充当注册中心,NameServer集群间互不通信,容忍路由信息在集群内分钟级不一致,更加轻量级;

使用内存映射机制实现高效的IO存储,达到高吞吐量;

容忍设计缺陷,通过ACK确保消息至少消费一次,但是如果ACK丢失,可能消息重复消费,这种情况设计上允许,交给使用者自己保证。

本文重点介绍的就是NameServer,我们下面一起来看下NameServer是如何启动以及如何进行路由管理的。

在第一章已经简单介绍了NameServer取代zk作为一种更轻量级的注册中心充当路由信息的提供者。那么具体是如何来实现路由信息管理的呢?我们先看下图:

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

上面的图描述了NameServer进行路由注册、路由剔除和路由发现的核心原理。

路由注册:Broker服务器在启动的时候会想NameServer集群中所有的NameServer发送心跳信号进行注册,并会每隔30秒向nameserver发送心跳,告诉NameServer自己活着。NameServer接收到Broker发送的心跳包之后,会记录该broker信息,并保存最近一次收到心跳包的时间。

路由剔除:NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每个10秒扫描BrokerLiveTable,比较上次收到心跳时间和当前时间比较是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相关信息。

路由发现:路由发现不是实时的,路由变化后,NameServer不主动推给客户端,等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性,当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用(这块内容会在后续介绍producer消息发送时介绍,本文不展开讲解)。

高可用:NameServer通过部署多台NameServer服务器来保证自身的高可用,同时多个NameServer服务器之间不进行通信,这样路由信息发生变化时,各个NameServer服务器之间数据可能不是完全相同的,但是通过发送端的容错机制保证消息发送的高可用。这个也正是NameServer追求简单高效的目的所在。

在整理了解了NameServer的架构设计之后,我们先来看下NameServer到底是如何启动的呢?

既然是源码解读,那么我们先来看下代码入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args),实际调用的是main0()方法,

代码如下:

通过main方法启动NameServer,主要分为两大步,先创建NamesrvController,然后再初始化并启动NamesrvController。我们分别展开来分析。

具体展开阅读代码之前,我们先通过一个序列图对整体流程有个了解,如下图:

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

先来看核心代码,如下:

通过上面对每一行代码的注释,可以看出来,创建NamesrvController的过程主要分为两步:

Step1:通过命令行中获取配置。赋值给NamesrvConfig和NettyServerConfig类。 Step2:根据配置类NamesrvConfig和NettyServerConfig构造一个NamesrvController实例。

可见NamesrvConfig和NettyServerConfig是想当重要的,这两个类分别是NameServer的业务参数和网络参数,我们分别看下这两个类里面有哪些属性:

NamesrvConfig

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

NettyServerConfig

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料
注:Apache Commons CLI是开源的命令行解析工具,它可以帮助开发者快速构建启动命令,并且帮助你组织命令的参数、以及输出列表等。

创建了NamesrvController实例之后,开始初始化并启动NameServer。

首先进行初始化,代码入口是NamesrvController#initialize。

上面的代码是NameServer初始化流程,通过每行代码的注释,可以看出来,主要有5步骤操作:

Step1:加载KV配置,并写入到KVConfigManager的configTable属性中;

Step2:初始化netty服务器;

Step3:初始化处理netty网络交互数据的线程池;

Step4:注册心跳机制线程池,启动5秒后每隔10秒检测一次Broker的存活情况;

Step5:注册打印KV配置的线程池,启动1分钟后,每隔10分钟打印一次KV配置。

RocketMQ的开发团队还使用了一个常用的编程技巧,就是使用JVM钩子函数对NameServer进行优雅停机。这样在JVM进程关闭前,会先执行shutdown操作。

执行start函数,启动NameServer。代码比较简单,就是将第一步中创建的netty server进行启动。其中remotingServer.start()方法不展开详细说明了,需要对netty比较熟悉,不是本篇文章重点,有兴趣的同学可以自行下载源码阅读。

我们在第二章开篇有了解到NameServer作为一个轻量级的注册中心,主要是为消息生产者和消费者提供Topic的路由信息,并对这些路由信息和Broker节点进行管理,主要包括路由注册、路由剔除和路由发现。

本章将会通过源码的角度来具体分析NameServer是如果进行路由信息管理的。核心代码主要都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager中实现。

在了解路由信息管理之前,我们首先需要了解NameServer到底存储了哪些路由元信息,数据结构分别是什么样的。

查看代码我们可以看到主要通过5个属性来维护路由元信息,如下:

我们依次对这5个属性进行展开说明。

说明:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡。

数据结构:HashMap结构,key是Topic名字,value是一个类型是QueueData的队列集合。在第一章就讲过,一个Topic中有多个队列。QueueData的数据结构如下:

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

数据结构示例:

说明:Broker基础信息,包含BrokerName、所属集群名称、主备Broker地址。

数据结构:HashMap结构,key是BrokerName,value是一个类型是BrokerData的对象。BrokerData的数据结构如下(可以结合下面Broker主从结构逻辑图来理解):

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

Broker主从结构逻辑图:

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

说明:Broker集群信息,存储集群中所有Broker名称。

数据结构:HashMap结构,key是ClusterName,value是存储BrokerName的Set结构。

说明:Broker状态信息。NameServer每次收到心跳包时会替换该信息

数据结构:HashMap结构,key是Broker的地址,value是BrokerLiveInfo结构的该Broker信息对象。BrokerLiveInfo的数据结构如下:

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

说明:Broker上的FilterServer列表,消息过滤服务器列表,后续介绍Consumer时会介绍,consumer拉取数据是通过filterServer拉取,consumer向Broker注册。

数据结构:HashMap结构,key是Broker地址,value是记录了filterServer地址的List集合。

路由注册是通过Broker和NameServer之间的心跳功能来实现的。主要分为两步:

Step1: Broker启动时向集群中所有NameServer发送心跳语句,每隔30秒(默认30s,时间间隔在10秒到60秒之间)再发一次。 Step2: NameServer收到心跳包更新topicQueueTable,brokerAddrTable,brokerLiveTable,clusterAddrTable,filterServerTable。

我们分别展开分析这两步。

发送心跳包的核心逻辑是在Broker启动逻辑里,代码入口是org.apache.rocketmq.broker.BrokerController#start,本篇文章重点关注的是发送心跳包的逻辑实现,只列出发送心跳包的核心代码,如下:

1)创建了一个线程池注册Broker,程序启动10秒后执行,每隔30秒(默认30s,时间间隔在10秒到60秒之间,BrokerConfig.getRegisterNameServerPeriod()的默认值是30秒)执行一次。

2)封装Topic配置和版本号之后,进行实际的路由注册(注:封装Topic配置不是本篇重点,会在介绍Broker源码时重点讲解)。实际路由注册是在org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll中实现,核心代码如下:

从上面代码来看,也比较简单,首先需要封装请求包头和requestBody,然后开启多线程到每个NameServer服务器去注册。

请求包头类型为RegisterBrokerRequestHeader,主要包括如下字段:

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

requestBody类型是RegisterBrokerBody,主要包括如下字段:

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

1)实际的路由注册是通过registerBroker方法实现,核心代码如下:

borker和NameServer之间通过netty进行网络传输,Broker向NameServer发起注册时会在请求中添加注册码RequestCode.REGISTER_BROKER。这是一种网络跟踪方法,RocketMQ的每个请求都会定义一个requestCode,服务端的网络处理器会根据不同的requestCode进行影响的业务处理。

Broker发出路由注册的心跳包之后,NameServer会根据心跳包中的requestCode进行处理。NameServer的默认网络处理器是DefaultRequestProcessor,具体代码如下:

判断requestCode,如果是RequestCode.REGISTER_BROKER,那么确定业务处理逻辑是注册Broker。根据Broker版本号选择不同的方法,我们已V3_0_11以上为例,调用registerBrokerWithFilterServer方法进行注册主要步骤分为三步:

解析requestHeader并验签(基于crc32),判断数据是否正确; 解析Topic信息; Step3: 调用RouteInfoManager#registerBroker来进行Broker注册;

核心注册逻辑是由RouteInfoManager#registerBroker来实现,核心代码如下:

通过上面的源码分析,可以分解出一个Broker的注册主要分7步:

Step1:加写锁,防止并发写RoutInfoManager中的路由表信息;

Step2:判断Broker所属集群是否存在,不存在需要创建,并将Broker名加入到集群Broker集合中;

Step3:维护BrokerData;

Step4:如过Broker是Master,并且Broker的Topic配置信息发生变化或者是首次注册,需要创建或更新Topic路由元数据,填充TopicQueueTable;

Step5:更新BrokerLivelnfo;

Step6:注册Broker的filterServer地址列表;

Step7:如果此Broker为从节点,则需要查找Broker Master的节点信息,并更新对应masterAddr属性,并返回给Broker端。

路由剔除的触发条件主要有两个:

NameServer每隔10s扫描BrokerLiveTable,连续120s没收到心跳包,则移除该Broker并关闭socket连接; Broker正常关闭时触发路由删除。

上面描述的触发点最终删除路由的逻辑是一样的,统一在RouteInfoManager#onChannelDestroy

中实现,核心代码如下:

路由删除整体逻辑主要分为6步:

Step1:加readlock,通过channel从BrokerLiveTable中找出对应的Broker地址,释放readlock,若该Broker已经从存活的Broker地址列表中被清除,则直接使用remoteAddr。

Step2:申请写锁,根据BrokerAddress从BrokerLiveTable、filterServerTable移除。

Step3:遍历BrokerAddrTable,根据BrokerAddress找到对应的brokerData,并将brokerData中对应的brokerAddress移除,如果移除后,整个brokerData的brokerAddress空了,那么将整个brokerData移除。

Step4:遍历clusterAddrTable,根据第三步中获取的需要移除的BrokerName,将对应的brokerName移除了。如果移除后,该集合为空,那么将整个集群从clusterAddrTable中移除。

Step5:遍历TopicQueueTable,根据BrokerName,将Topic下对应的Broker移除掉,如果该Topic下只有一个待移除的Broker,那么该Topic也从table中移除。

Step6:释放写锁。

从上面可以看出,路由剔除的整体逻辑比较简单,就是单纯地针对路由元信息的数据结构进行操作。为了大家能够更好地理解这块代码,建议大家对照4.1中介绍的路由元信息的数据结构来进行代码走读。

当路由信息发生变化之后,NameServer不会主动推送给客户端,而是等待客户端定期到nameserver主动拉取最新路由信息。这种设计方式降低了NameServer实现的复杂性。

producer在启动后会开启一系列定时任务,其中有一个任务就是定期从NameServer获取Topic路由信息。代码入口是MQClientInstance#start-ScheduledTask(),核心代码如下:

producer和NameServer之间通过netty进行网络传输,producer向NameServer发起的请求中添加注册码

RequestCode.GET_ROUTEINFO_BY_TOPIC。

NameServer收到producer发送的请求后,会根据请求中的requestCode进行处理。处理requestCode同样是在默认的网络处理器DefaultRequestProcessor中进行处理,最终通过RouteInfoManager#pickupTopicRouteData来实现。

TopicRouteData结构

在正式解析源码前,我们先看下NameServer返回给producer的数据结构。通过代码可以看到,返回的是一个TopicRouteData对象,具体结构如下:

深入剖析RocketMQ源码-NameServer一、RocketMQ架构简介二、NameServer架构设计三、 启动流程四、路由管理五、小结六、参考资料

其中QueueData,BrokerData,filterServerTable在4.1章节介绍路由元信息时有介绍。

源码分析

在了解了返回给producer的TopicRouteData结构后,我们进入RouteInfoManager#pickupTopicRouteData方法来看下具体如何实现。

上面代码封装了TopicRouteData的queueDatas、BrokerDatas和filterServerTable,还有orderTopicConf字段没封装,我们再看下这个字段是在什么时候封装的,我们向上看RouteInfoManager#pickupTopicRouteData的调用方法DefaultRequestProcessor#getRouteInfoByTopic如下:

结合这两个方法,我们可以总结出查找Topic路由主要分为3个步骤:

调用RouteInfoManager#pickupTopicRouteData,从topicQueueTable,brokerAddrTabl,filterServerTable中获取信息,分别填充queue-Datas、BrokerDatas、filterServerTable。 如果topic为顺序消息,那么从KVconfig中获取关于顺序消息先关的配置填充到orderTopicConf中。 如果找不到路由信息,那么返回code为ResponseCode.TOPIC_NOT_EXIST。

本篇文章主要是从源码的角度给大家介绍了RocketMQ的NameServer,包括NameServer的启动流程、路由注册、路由剔除和路由发现。我们在了解了NameServer的设计原理之后,也可以回过头思考下在设计过程中一些值得我们学习的小技巧,在此我抛砖引玉提出两点:

启动流程注册JVM钩子用于优雅停机。这是一个编程技巧,我们在实际开发过程中,如果有使用线程池或者一些常驻线程任务时,可以考虑通过注册JVM钩子的方式,在JVM关闭前释放资源或者完成一些事情来保证优雅停机。

更新路由表时需要通过加锁防止并发操作,这里使用的是锁粒度较少的读写锁,允许多个消息发送者并发读,保证消息发送时的高并发,但同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行,这也是读写锁经典使用场景。

1、《RocketMQ技术内幕》

2、《RocketMQ核心原理和实践》

3、Apache RocketMQ开发者指南

作者:vivo互联网服务器团队-Ye Wenhao

继续阅读