天天看点

RocketMQ-笔记 1:namesrv1. namesrv的来源2. 解析namesrv的启动流程3. 两主两从的部署结构图,下面的解析都是按照这个图进行解析的4. 解析路由元数据的数据结构4. 路由信息的注册5. 路由信息的删除6. 路由信息的发现7.总结

1. namesrv的来源

  • namesrv整体来说和zookeeper的类似,为了保证消息的高可用,我们通常会布置多台broker,那么客户端在发送消息的时候怎么去选择,或者某一台宕机了,客户端怎么去感知到,namesrv就是为了解决这种问题,负责管理集群,broker,topic等元数据,动态的维护broker列表,剔除已经宕机的broker,根据负载均衡算法可以选择到一台broker。

2. 解析namesrv的启动流程

  • 这个我们需要参考一下源码看一下启动方法和重要的参数信息:.NamesrvStartup
    • 1,解析配置文件填充属性到NamesrvConfig和NettyServerConfig两个对象中,配置文件的来源是主要是在启动的时候通过-c 指定文件的。
    • 2,根据启动的属性创建namesrvController对象,并且进行初始化
    • 3,加载kv配置,并且创建netty网络传输对象,然后启动两个定时任务。
      • namesrv每隔10s中扫描一次broker,根据最后一次的心跳检测时间剔除已经挂掉的broker;
      • namesrv每隔10分钟打印一次kv配置信息
    • 4,注册JVM 钩子函数并启动服务器,以便监听Broker、消息生产者的网络请求。

3. 两主两从的部署结构图,下面的解析都是按照这个图进行解析的

RocketMQ-笔记 1:namesrv1. namesrv的来源2. 解析namesrv的启动流程3. 两主两从的部署结构图,下面的解析都是按照这个图进行解析的4. 解析路由元数据的数据结构4. 路由信息的注册5. 路由信息的删除6. 路由信息的发现7.总结

4. 解析路由元数据的数据结构

  • 下面主要是namesrv主要记录了哪些元数据,其主要是在RouteInfoManager中的5个hashMap中
    • HashMap> topicQueueTable : topic 消息队列路由信息
{
    "topicTest": [
        {
            "brokerName": "brokerName-a",
            "readQueueNums": 4,
            "writeQueueNums": 4,
            "perm": 6,
            "topicSynFlag": 0
        }
    ]
}           
  • HashMap brokerAddrTable:broker的集群信息,包含集群,name,地址
{
    "broker-a": {
        "cluster": "c1",
        "brokerName": "broker-a",
        "brokerAddrs": [
            "0:192.168.8.101:10000",
            "1:192.168.8.102:10000"
        ]
    },

    "broker-b": {
        "cluster": "c1",
        "brokerName": "broker-b",
        "brokerAddrs": [
            "0:192.168.8.103:10000",
            "1:192.168.8.104:10000"
        ]
    }
}           
  • HashMap> clusterAddrTable : Broker 集群信息
{
    "c1": [
        "broker-a",
        "broker-b"
    ]
}           
  • HashMap brokerLiveTable : broker状态信息
{
    "192.168.8.101:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": "192.168.8.102:10000"
    },

    "192.168.8.102:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": ""
    },
    "192.168.8.103:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": "192.168.8.104:10000"
    },

    "192.168.8.104:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": ""
    }
}           
  • HashMap> filterServerTable : broker上的FilterServer列表。
{
    "192.168.8.101:10000": ["192.168.9.101:10000"],

    "192.168.8.102:10000": ["192.168.9.102:10000"]
}           

4. 路由信息的注册

  • RocketMQ的路由注册主要是broker与nameSrv的心跳功能检测,当broker启动的时候会向集群中所有的namesrv发送心跳语句,每隔30s向namesrv发送心跳包,namesrv收到心跳包之后会更新brokerLiveTable缓存中的lastUpdateTimestamp时间,然后nameSrv的定时任务会每隔10s扫描brokerLiveTable,如果超过120s没有收到心跳包则将其剔除并且关闭socket连接。
  • 源码层次解析步骤
    • 1,brokerController的start方法中发送心跳包注册registerBrokerAll,
    • 2,调用BrokerOuterAPI中的registerBrokerAll中主要遍历所有的nameSrv列表,BrokerSrv依次向nameSrv列表发送心跳包
    • 3,调用BrokerOuterAPI的registerBroker发送request信息,RocketMQ是基于netty传输的,如果我们需要网络跟踪,rocketMQ会为每个消息生成一个requestCode,然后服务端会有对应的网络处理器(processs包中),只需整库搜索 questCode 即可找到相应的处理逻辑
      • 心跳包request的信息
        • brokerAddr:地址信息
        • brokerId:0代表master,大于0代表slave
        • brokerName:名称
        • clusterName:集群名称
        • haServerAddr:master地址,初次请求时为空,slave向nameSrv注册之后返回
        • requestBody:
          • filterServerList:消息过滤列表
          • topicConfigWrapper:主体配置
        • body:消息体
    • nameSrv接收到网络请求之后开始进行处理,DefaultRequestProcessor默认的处理器,如果请求类型为 RequestCode REGISTER_BROKER,那么最终会被转发到RoutelnfoManager#registerBroker。
      • 1,首先在方法开始加入一个写锁,防止并发的修改RouteInfoManager中的路由表,然后开始维护clusterAddrTable信息,首先判断broker所属的集群是不是已经存在,如果不存在则创建并且将broker名称放入到集合中。
      • 2,维护BrokerData,首先从brokerAddrTable中根据brokerName获取broker信息,如果获取不到则将新建的brokerData信息放入并且将registerFirst设置为true,表示第一次注册,如果已经存在则将其替换并且将registerFirst设置为false,
      • 3,如果broker为master,并且Broker Topic配置信息发生变化或者是初次注册,那么则需要创建或者更新topicQueueTable的信息,如果在发送消息的时候topic是不存在的,那么如果设置了brokerConfig中的autoCreateTopicEnable为true,就会返回一个rocketMQ的默认的路由信息。
      • 4,更新brokerLiveInfo的最后更新时间,这个是路由删除的最重要的列表
      • 5,注册Broker上的filter过滤列表
      • 6,完成路由的注册

5. 路由信息的删除

  • 路由删除的两种方式
    • broker主动方式:当broker正常关机的时候,会发送一个unregisterBroker指令
    • nameSrv主动方式:nameSrv在启动的时候,会开启两个定时任务,其中有一个是每隔10s扫描一次brokerLiveInfo列表,然后获取列表中每一个元素的lastupdateTime,如果超过120s没有收到broker的心跳包则会将其剔除列表关闭channel,
      • 判断时间超过120s
      • 申请写锁
      • 根据brokerAddress删除brokerLiveTable和FilterServerTable
      • 维护brokerAddrTable,遍历brokerAddrTable获取BrokeData,然后从brokeData中的属性brokerAddr(map集合)中剔除,如果剔除之后集合为空则从brokerAddrTable一处brokerName
      • 根据 BrokerName,从 clusterAddrTable 中找到 Broker并从集群中移除,如果移

        除后,集群中不包含任何 Broker,则将改集群从clusterAddrTable 中一处

      • 根据 brokerName遍历所有主题的队列,如果队列中包含了当前 Broker的队列则移除,如果 topic 只包含待移除 Broker 的队列的话,从路由表中删除该 topic
      • 释放写锁

        然后申请一个写锁(防止并发更新)删除brokerLiveTable,FilterServerTable信息,然后维护brokerAddrTable,clusterAddrTable。

6. 路由信息的发现

  • RocketMQ路由并非是实时的,当topic路由发生变化的时候,不会主动推送给客户端,而是由客户端定时的拉取最新的路由信息。
  • 返回给客户端的对象是TopicRouteData,其结构是
orderTopicConf:顺序消息配置内容,来自于kvConfig
List<QueueData> queueDatas:topic队列元数据
List<BrokerData> brokerDatas:topic 分布的 broker 元数据
HashMap< String/ * brokerAdress*/,List<String> /* filt rServer* /> : broker 上过滤服务器地址列表           
  • 路由发现实现类DefaultRequestProcessor的getRoutelnfoByTopic
    • 调用 RouterlnfoManager 的方法,从路由 topicQueueTable brokerAddrTable
  1. terServerTable 中分别填充 TopicRouteData 中的 List

filterServer 地址表

  • 如果找到主题对应的路由信息并且该主题为顺序消息,则从 NameServer

    KVconfig 中获取关于顺序消息相 的配置填充路由信息

  • 如果找不到路由信息 CODE 则使用 TOPIC NOT_EXISTS ,表示没有找到对应的路由

7.总结

RocketMQ-笔记 1:namesrv1. namesrv的来源2. 解析namesrv的启动流程3. 两主两从的部署结构图,下面的解析都是按照这个图进行解析的4. 解析路由元数据的数据结构4. 路由信息的注册5. 路由信息的删除6. 路由信息的发现7.总结