1. namesrv的来源
- namesrv整体来说和zookeeper的类似,为了保证消息的高可用,我们通常会布置多台broker,那么客户端在发送消息的时候怎么去选择,或者某一台宕机了,客户端怎么去感知到,namesrv就是为了解决这种问题,负责管理集群,broker,topic等元数据,动态的维护broker列表,剔除已经宕机的broker,根据负载均衡算法可以选择到一台broker。
2. 解析namesrv的启动流程
- 这个我们需要参考一下源码看一下启动方法和重要的参数信息:.NamesrvStartup
- 1,解析配置文件填充属性到NamesrvConfig和NettyServerConfig两个对象中,配置文件的来源是主要是在启动的时候通过-c 指定文件的。
- 2,根据启动的属性创建namesrvController对象,并且进行初始化
- 3,加载kv配置,并且创建netty网络传输对象,然后启动两个定时任务。
- namesrv每隔10s中扫描一次broker,根据最后一次的心跳检测时间剔除已经挂掉的broker;
- namesrv每隔10分钟打印一次kv配置信息
- 4,注册JVM 钩子函数并启动服务器,以便监听Broker、消息生产者的网络请求。
3. 两主两从的部署结构图,下面的解析都是按照这个图进行解析的

4. 解析路由元数据的数据结构
- 下面主要是namesrv主要记录了哪些元数据,其主要是在RouteInfoManager中的5个hashMap中
- HashMap> topicQueueTable : topic 消息队列路由信息
{
"topicTest": [
{
"brokerName": "brokerName-a",
"readQueueNums": 4,
"writeQueueNums": 4,
"perm": 6,
"topicSynFlag": 0
}
]
}
- HashMap brokerAddrTable:broker的集群信息,包含集群,name,地址
{
"broker-a": {
"cluster": "c1",
"brokerName": "broker-a",
"brokerAddrs": [
"0:192.168.8.101:10000",
"1:192.168.8.102:10000"
]
},
"broker-b": {
"cluster": "c1",
"brokerName": "broker-b",
"brokerAddrs": [
"0:192.168.8.103:10000",
"1:192.168.8.104:10000"
]
}
}
- HashMap> clusterAddrTable : Broker 集群信息
{
"c1": [
"broker-a",
"broker-b"
]
}
- HashMap brokerLiveTable : broker状态信息
{
"192.168.8.101:10000": {
"lastUpdateTimestamp": 20190909099999,
"dataVersion": {
"timestamp": 20190909099999,
"counter": 0
},
"Channel": "channel-a",
"haServerAddr": "192.168.8.102:10000"
},
"192.168.8.102:10000": {
"lastUpdateTimestamp": 20190909099999,
"dataVersion": {
"timestamp": 20190909099999,
"counter": 0
},
"Channel": "channel-a",
"haServerAddr": ""
},
"192.168.8.103:10000": {
"lastUpdateTimestamp": 20190909099999,
"dataVersion": {
"timestamp": 20190909099999,
"counter": 0
},
"Channel": "channel-a",
"haServerAddr": "192.168.8.104:10000"
},
"192.168.8.104:10000": {
"lastUpdateTimestamp": 20190909099999,
"dataVersion": {
"timestamp": 20190909099999,
"counter": 0
},
"Channel": "channel-a",
"haServerAddr": ""
}
}
- HashMap> filterServerTable : broker上的FilterServer列表。
{
"192.168.8.101:10000": ["192.168.9.101:10000"],
"192.168.8.102:10000": ["192.168.9.102:10000"]
}
4. 路由信息的注册
- RocketMQ的路由注册主要是broker与nameSrv的心跳功能检测,当broker启动的时候会向集群中所有的namesrv发送心跳语句,每隔30s向namesrv发送心跳包,namesrv收到心跳包之后会更新brokerLiveTable缓存中的lastUpdateTimestamp时间,然后nameSrv的定时任务会每隔10s扫描brokerLiveTable,如果超过120s没有收到心跳包则将其剔除并且关闭socket连接。
- 源码层次解析步骤
- 1,brokerController的start方法中发送心跳包注册registerBrokerAll,
- 2,调用BrokerOuterAPI中的registerBrokerAll中主要遍历所有的nameSrv列表,BrokerSrv依次向nameSrv列表发送心跳包
- 3,调用BrokerOuterAPI的registerBroker发送request信息,RocketMQ是基于netty传输的,如果我们需要网络跟踪,rocketMQ会为每个消息生成一个requestCode,然后服务端会有对应的网络处理器(processs包中),只需整库搜索 questCode 即可找到相应的处理逻辑
- 心跳包request的信息
- brokerAddr:地址信息
- brokerId:0代表master,大于0代表slave
- brokerName:名称
- clusterName:集群名称
- haServerAddr:master地址,初次请求时为空,slave向nameSrv注册之后返回
- requestBody:
- filterServerList:消息过滤列表
- topicConfigWrapper:主体配置
- body:消息体
- 心跳包request的信息
- nameSrv接收到网络请求之后开始进行处理,DefaultRequestProcessor默认的处理器,如果请求类型为 RequestCode REGISTER_BROKER,那么最终会被转发到RoutelnfoManager#registerBroker。
- 1,首先在方法开始加入一个写锁,防止并发的修改RouteInfoManager中的路由表,然后开始维护clusterAddrTable信息,首先判断broker所属的集群是不是已经存在,如果不存在则创建并且将broker名称放入到集合中。
- 2,维护BrokerData,首先从brokerAddrTable中根据brokerName获取broker信息,如果获取不到则将新建的brokerData信息放入并且将registerFirst设置为true,表示第一次注册,如果已经存在则将其替换并且将registerFirst设置为false,
- 3,如果broker为master,并且Broker Topic配置信息发生变化或者是初次注册,那么则需要创建或者更新topicQueueTable的信息,如果在发送消息的时候topic是不存在的,那么如果设置了brokerConfig中的autoCreateTopicEnable为true,就会返回一个rocketMQ的默认的路由信息。
- 4,更新brokerLiveInfo的最后更新时间,这个是路由删除的最重要的列表
- 5,注册Broker上的filter过滤列表
- 6,完成路由的注册
5. 路由信息的删除
- 路由删除的两种方式
- broker主动方式:当broker正常关机的时候,会发送一个unregisterBroker指令
- nameSrv主动方式:nameSrv在启动的时候,会开启两个定时任务,其中有一个是每隔10s扫描一次brokerLiveInfo列表,然后获取列表中每一个元素的lastupdateTime,如果超过120s没有收到broker的心跳包则会将其剔除列表关闭channel,
- 判断时间超过120s
- 申请写锁
- 根据brokerAddress删除brokerLiveTable和FilterServerTable
- 维护brokerAddrTable,遍历brokerAddrTable获取BrokeData,然后从brokeData中的属性brokerAddr(map集合)中剔除,如果剔除之后集合为空则从brokerAddrTable一处brokerName
-
根据 BrokerName,从 clusterAddrTable 中找到 Broker并从集群中移除,如果移
除后,集群中不包含任何 Broker,则将改集群从clusterAddrTable 中一处
- 根据 brokerName遍历所有主题的队列,如果队列中包含了当前 Broker的队列则移除,如果 topic 只包含待移除 Broker 的队列的话,从路由表中删除该 topic
-
释放写锁
然后申请一个写锁(防止并发更新)删除brokerLiveTable,FilterServerTable信息,然后维护brokerAddrTable,clusterAddrTable。
6. 路由信息的发现
- RocketMQ路由并非是实时的,当topic路由发生变化的时候,不会主动推送给客户端,而是由客户端定时的拉取最新的路由信息。
- 返回给客户端的对象是TopicRouteData,其结构是
orderTopicConf:顺序消息配置内容,来自于kvConfig
List<QueueData> queueDatas:topic队列元数据
List<BrokerData> brokerDatas:topic 分布的 broker 元数据
HashMap< String/ * brokerAdress*/,List<String> /* filt rServer* /> : broker 上过滤服务器地址列表
- 路由发现实现类DefaultRequestProcessor的getRoutelnfoByTopic
- 调用 RouterlnfoManager 的方法,从路由 topicQueueTable brokerAddrTable
- terServerTable 中分别填充 TopicRouteData 中的 List
filterServer 地址表
-
如果找到主题对应的路由信息并且该主题为顺序消息,则从 NameServer
KVconfig 中获取关于顺序消息相 的配置填充路由信息
- 如果找不到路由信息 CODE 则使用 TOPIC NOT_EXISTS ,表示没有找到对应的路由