天天看点

RocketMQ之NameServer启动流程

作者:搬山道猿
RocketMQ之NameServer启动流程

NameServerController启动流程总览

启动类:org.apache.rocketmq.namesrv.NamesrvStartup#main

java复制代码public static void main(String[] args) {
    main0(args);
}

           
java复制代码public static NamesrvController main0(String[] args) {
    try {
        //创建NamesrvController
        NamesrvController controller = createNamesrvController(args);
        //初始化并启动NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" +   
                        RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}
           

1.创建NamesrvController

java复制代码public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    //设置MQ版本号
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    //解析启动命令 start mqnamesrv.cmd
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine
            ("mqnamesrv",
             args, 
             buildCommandlineOptions(options), 
             new PosixParser());

    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    //创建NamesrvConfig
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //创建NettyServerConfig
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    //设置启动端口号9876
    nettyServerConfig.setListenPort(9876);
    //解析启动-c参数
    if (commandLine.hasOption('c')) {
        //-c指定配置文件
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            //加载配置文件到流
            InputStream in = new BufferedInputStream
                                    (new FileInputStream(file));
            //加载属性到InputStream
            properties = new Properties();
            properties.load(in);
            //分别设置属性到namesrvConfig 和 nettyServerConfig  
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            //设置配置文件存储地址
            namesrvConfig.setConfigStorePath(file);
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    //-p来指定是否打印配置项,在指定该选项时,直接退出。
    if (commandLine.hasOption('p')) {
        InternalLogger console 
                = InternalLoggerFactory.getLogger
                        (LoggerName.NAMESRV_CONSOLE_NAME);
        //打印namesrvConfig属性
        MixAll.printObjectProperties(console, namesrvConfig);
        //打印nettyServerConfig 属性
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    //将启动参数填充到namesrvConfig,nettyServerConfig
    MixAll.properties2Object
                (ServerUtil.commandLine2Properties(commandLine), 
                                                        namesrvConfig);

    //创建NameServerController
    final NamesrvController controller 
            = new NamesrvController(namesrvConfig, nettyServerConfig);

    controller.getConfiguration().registerConfig(properties);
    return controller;
}
           

2.初始化NamesrvController

3.启动NamesrvController

java复制代码 public static NamesrvController start(final NamesrvController controller) 
     															throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        //注册JVM钩子函数代码
     	//在JVM进程关闭之前,先将线程池关闭,及时释放资源
     	//可以借鉴的地方
        Runtime.getRuntime()
            .addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                //释放资源
                controller.shutdown();
                return null;
            }
        }));
        controller.start();
        return controller;
    }

 public void shutdown() {
     	//关闭nettyServer
        this.remotingServer.shutdown();
       	//关闭线程池
        this.remotingExecutor.shutdown();
     	//关闭定时任务
        this.scheduledExecutorService.shutdown();
		//功能实现当文件内容发生变化时,重新加载文件,可用于读取配置类的文件。
     	//原理:注册一个listener,然后新开个线程,定期去扫描文件
     	//通过对文件内容进行hash来判断文件内容是否发生变化
     	//如果变化了,则回调监听器的onChange方法。
     	//看源码主要是监听证书
     	//关闭fileWatchService
        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
    }
           

大致流程如下图

RocketMQ之NameServer启动流程

上面对源码做了概览,大致知道了NameServerController启动的流程分为3步,创建-初始化-启动。

下面一步一步看吧。

1.创建nameServerController

1.1:解析配置文件,创建NameSrvController

解析配置文件,填充NameServerConfig、NettyServerConfig属性值,并创建NamesrvController

注意NameServer创建的是是NettyServerConfig,Broker创建的是NettyClientConfig

NamesrvStartup#createNamesrvController

java复制代码public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    //设置MQ版本号
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    //解析启动命令 start mqnamesrv.cmd
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine
            ("mqnamesrv",
             args, 
             buildCommandlineOptions(options), 
             new PosixParser());

    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    //创建NamesrvConfig
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //创建NettyServerConfig
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    //设置启动端口号9876
    nettyServerConfig.setListenPort(9876);
    //解析启动-c参数
    if (commandLine.hasOption('c')) {
        //-c指定配置文件
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            //加载配置文件到流
            InputStream in = new BufferedInputStream
                                    (new FileInputStream(file));
            //加载属性到InputStream
            properties = new Properties();
            properties.load(in);
            //分别设置属性到namesrvConfig 和 nettyServerConfig  
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            //设置配置文件存储地址
            namesrvConfig.setConfigStorePath(file);
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    //-p来指定是否打印配置项,在指定该选项时,直接退出。
    if (commandLine.hasOption('p')) {
        InternalLogger console 
                = InternalLoggerFactory.getLogger
                        (LoggerName.NAMESRV_CONSOLE_NAME);
        //打印namesrvConfig属性
        MixAll.printObjectProperties(console, namesrvConfig);
        //打印nettyServerConfig 属性
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    //将启动参数填充到namesrvConfig,nettyServerConfig
    MixAll.properties2Object
                (ServerUtil.commandLine2Properties(commandLine), 
                                                        namesrvConfig);

    //创建NameServerController
    final NamesrvController controller 
            = new NamesrvController(namesrvConfig, nettyServerConfig);

    controller.getConfiguration().registerConfig(properties);
    return controller;
}
           

2.初始化nameServerController

2.1:初始化NamesrvController

根据启动属性创建NamesrvController实例,并初始化该实例。

NameServerController实例为NameServer核心控制器。

NamesrvController#initialize

java复制代码public boolean initialize() {

    //加载KV配置
    this.kvConfigManager.load();

    /***
        这里需要看NettyRemotingServer构造方法
        会把netty的启动辅助类serverBootstrap创建好,这个是重点
        保存了channelEventListener。
        新建了netty的boss线程。
        创建publicExecutor线程池。
    ***/
    this.remotingServer = new NettyRemotingServer
                            (this.nettyServerConfig, this.brokerHousekeepingService);

    //创建线程池 默认是8个
    this.remotingExecutor =
        Executors.newFixedThreadPool
        (nettyServerConfig.getServerWorkerThreads(),  
                            new ThreadFactoryImpl("RemotingExecutorThread_"));

    /***
    创建DefaultRequestProcessor 作为netty server 请求处理器。
    org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest 
    处理所有已知request code类型的请求
    **/
    this.registerProcessor();

    //开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker
    //如果在2分钟都没有发送心跳 移除不活跃的Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    //开启定时任务:每隔10min打印一次KV配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    return true;
}
           

2.2:启动定时任务:每10秒扫描一次所有broker

Broker30秒向NameServer发送一次心跳。

NamesrvController会开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker。

移除broker是根据broker的lastUpdateStamp+2分钟是否小于当前时间,如果小于就移除。

如果某个broker在2分钟内都没有发送心跳 那么就移除该broker 即连续4次没有发送心跳就移除

RouteInfoManager#scanNotActiveBroker

java复制代码//扫描不活跃的broker
public void scanNotActiveBroker() {
//2分钟
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

    //HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    Iterator<Entry<String, BrokerLiveInfo>> it 
                            = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {

        Entry<String, BrokerLiveInfo> next = it.next();
        //BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。
        long last = next.getValue().getLastUpdateTimestamp();
        //BrokerLiveInfo中的 
       //lastUpdateTimestamp+2分钟小于当前时间说明 已经2分钟没有心跳了
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            //关闭并移除channel
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            //销毁channel工作
            this.onChannelDestroy(next.getKey(), 
                                    next.getValue().getChannel());
        }
    }
}
           

移除2分钟没心跳的broker的路由元信息:RouteInfoManager#onChannelDestroy

java复制代码    //路由元信息
    //类:RouteInfoManager
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
           

topicQueueTable: Topic消息队列路由信息,消息发送时根据路由表进行负载均衡

brokerAddrTable: Broker基础信息,包括brokerName、所属集群名称、主备Broker地址

clusterAddrTable: Broker集群信息,存储集群中所有Broker名称

brokerLiveTable: Broker状态信息,NameServer每次收到心跳包是会替换该信息

filterServerTable: Broker上的FilterServer列表,用于类模式消息过滤。

一个Topic拥有多个消息队列,一个Broker为每一个主题创建8个读队列和8个写队列。

多个Broker组成一个集群,集群由相同的多台Broker组成Master-Slave架构。

brokerId为0代表Master,大于0为Slave。

BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。

RocketMQ之NameServer启动流程
RocketMQ之NameServer启动流程
ini复制代码//主要就是移除路由信息表相关信息
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                //申请写锁,根据brokerAddress
                //从brokerLiveTable和filterServerTable移除
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry
                                        = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed" + 
                            "clean it's data structure at once");
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;

                //维护<String/* brokerName */, BrokerData> brokerAddrTable
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();

                //遍历brokerAddrTable
              while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    //获取brokerData
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();
                    //遍历该broker的所有地址 即主从
                    Iterator<Entry<Long, String>> it
                            = brokerData.getBrokerAddrs().entrySet().iterator();
                    //循环遍历主从
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                         //根据broker地址移除brokerAddr
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            break;
                        }
                    }
                    //如果移除以后没有其他的BrokerAddr 相当于这个broker已经没有实例了
                    //那么把brokerData也从BrokerAddrTable 移除
                   // <String/* brokerName */, BrokerData> brokerAddrTable
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                    }
                }

                /***
                   维护集群信息: key = clusterName value对应的set是 brokerName 
                   <String, Set<String>> clusterAddrTable
                   这里移除的条件是 removeBrokerName=true
                   removeBrokerName 是在移除brokerAddr时 当braokerData中的addrs为空
                   即该broker的主从都不存在 这个broker已经没有实例了
                   设置removeBrokerName=true
               ***/
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> 
                            it = this.clusterAddrTable.entrySet().iterator();
                    //遍历clusterAddrTable
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                          //获得集群名称
                        String clusterName = entry.getKey();
                         //获得集群中brokerName集合
                        Set<String> brokerNames = entry.getValue();
                        //从brokerNames中移除brokerNameFound
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            if (brokerNames.isEmpty()) {
                                //如果集群中不包含任何broker,则移除该集群
                                it.remove();
                            }
                            break;
                        }
                    }
                }

                //<String/* topic */, List<QueueData>> topicQueueTable队列
                //这里移除的条件是 removeBrokerName=true
                //removeBrokerName 是在移除brokerAddr时 当brokerData中的addrs为空
                //即该broker的主从都不存在,这个broker已经没有实例了
                //设置removeBrokerName=true
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    //遍历topicQueueTable
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry
                                            = itTopicQueueTable.next();
                        //主题名称
                        String topic = entry.getKey();
                        //队列集合
                        List<QueueData> queueDataList = entry.getValue();
                        //遍历该主题队列
                        Iterator<QueueData> itQueueData
                                        = queueDataList.iterator();

                        while (itQueueData.hasNext()) {
                            //获取queueData
                            QueueData queueData = itQueueData.next();
                            //如果queueData中的brokerName等于本次移除的brokerName
                            //那么从队列中移除该queue
                            if (queueData.getBrokerName()
                                        .equals(brokerNameFound)) {
                                itQueueData.remove();
                            }
                        }
                        //如果该topic的队列为空,则移除该topic
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
           

3.注册jvm钩子函数,启动NameServerController

3.1注册jvm钩子函数,启动NameSrvCtr

在JVM进程关闭之前,先将线程池关闭,及时释放资源

NamesrvStartup#start

java复制代码 public static NamesrvController start(final NamesrvController controller) 
                                                                throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        //注册JVM钩子函数代码
        //在JVM进程关闭之前,先将线程池关闭,及时释放资源
        //可以借鉴的地方
        Runtime.getRuntime()
            .addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                //释放资源
                controller.shutdown();
                return null;
            }
        }));
        controller.start();
        return controller;
    }

 public void shutdown() {
        //关闭nettyServer
        this.remotingServer.shutdown();
        //关闭线程池
        this.remotingExecutor.shutdown();
        //关闭定时任务
        this.scheduledExecutorService.shutdown();
        //功能实现当文件内容发生变化时,重新加载文件,可用于读取配置类的文件。
        //原理:注册一个listener,然后新开个线程,定期去扫描文件
        //通过对文件内容进行hash来判断文件内容是否发生变化
        //如果变化了,则回调监听器的onChange方法。
        //看源码主要是监听证书
        //关闭fileWatchService
        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
    }           

继续阅读