天天看點

RocketMQ之NameServer啟動流程

作者:搬山道猿
RocketMQ之NameServer啟動流程

NameServerController啟動流程總覽

啟動類:org.apache.rocketmq.namesrv.NamesrvStartup#main

java複制代碼public static void main(String[] args) {
    main0(args);
}

           
java複制代碼public static NamesrvController main0(String[] args) {
    try {
        //建立NamesrvController
        NamesrvController controller = createNamesrvController(args);
        //初始化并啟動NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" +   
                        RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}
           

1.建立NamesrvController

java複制代碼public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    //設定MQ版本号
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    //解析啟動指令 start mqnamesrv.cmd
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine
            ("mqnamesrv",
             args, 
             buildCommandlineOptions(options), 
             new PosixParser());

    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    //建立NamesrvConfig
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //建立NettyServerConfig
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    //設定啟動端口号9876
    nettyServerConfig.setListenPort(9876);
    //解析啟動-c參數
    if (commandLine.hasOption('c')) {
        //-c指定配置檔案
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            //加載配置檔案到流
            InputStream in = new BufferedInputStream
                                    (new FileInputStream(file));
            //加載屬性到InputStream
            properties = new Properties();
            properties.load(in);
            //分别設定屬性到namesrvConfig 和 nettyServerConfig  
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            //設定配置檔案存儲位址
            namesrvConfig.setConfigStorePath(file);
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    //-p來指定是否列印配置項,在指定該選項時,直接退出。
    if (commandLine.hasOption('p')) {
        InternalLogger console 
                = InternalLoggerFactory.getLogger
                        (LoggerName.NAMESRV_CONSOLE_NAME);
        //列印namesrvConfig屬性
        MixAll.printObjectProperties(console, namesrvConfig);
        //列印nettyServerConfig 屬性
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    //将啟動參數填充到namesrvConfig,nettyServerConfig
    MixAll.properties2Object
                (ServerUtil.commandLine2Properties(commandLine), 
                                                        namesrvConfig);

    //建立NameServerController
    final NamesrvController controller 
            = new NamesrvController(namesrvConfig, nettyServerConfig);

    controller.getConfiguration().registerConfig(properties);
    return controller;
}
           

2.初始化NamesrvController

3.啟動NamesrvController

java複制代碼 public static NamesrvController start(final NamesrvController controller) 
     															throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        //注冊JVM鈎子函數代碼
     	//在JVM程序關閉之前,先将線程池關閉,及時釋放資源
     	//可以借鑒的地方
        Runtime.getRuntime()
            .addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                //釋放資源
                controller.shutdown();
                return null;
            }
        }));
        controller.start();
        return controller;
    }

 public void shutdown() {
     	//關閉nettyServer
        this.remotingServer.shutdown();
       	//關閉線程池
        this.remotingExecutor.shutdown();
     	//關閉定時任務
        this.scheduledExecutorService.shutdown();
		//功能實作當檔案内容發生變化時,重新加載檔案,可用于讀取配置類的檔案。
     	//原理:注冊一個listener,然後新開個線程,定期去掃描檔案
     	//通過對檔案内容進行hash來判斷檔案内容是否發生變化
     	//如果變化了,則回調監聽器的onChange方法。
     	//看源碼主要是監聽證書
     	//關閉fileWatchService
        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
    }
           

大緻流程如下圖

RocketMQ之NameServer啟動流程

上面對源碼做了概覽,大緻知道了NameServerController啟動的流程分為3步,建立-初始化-啟動。

下面一步一步看吧。

1.建立nameServerController

1.1:解析配置檔案,建立NameSrvController

解析配置檔案,填充NameServerConfig、NettyServerConfig屬性值,并建立NamesrvController

注意NameServer建立的是是NettyServerConfig,Broker建立的是NettyClientConfig

NamesrvStartup#createNamesrvController

java複制代碼public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    //設定MQ版本号
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    //解析啟動指令 start mqnamesrv.cmd
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine
            ("mqnamesrv",
             args, 
             buildCommandlineOptions(options), 
             new PosixParser());

    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    //建立NamesrvConfig
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //建立NettyServerConfig
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    //設定啟動端口号9876
    nettyServerConfig.setListenPort(9876);
    //解析啟動-c參數
    if (commandLine.hasOption('c')) {
        //-c指定配置檔案
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            //加載配置檔案到流
            InputStream in = new BufferedInputStream
                                    (new FileInputStream(file));
            //加載屬性到InputStream
            properties = new Properties();
            properties.load(in);
            //分别設定屬性到namesrvConfig 和 nettyServerConfig  
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            //設定配置檔案存儲位址
            namesrvConfig.setConfigStorePath(file);
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    //-p來指定是否列印配置項,在指定該選項時,直接退出。
    if (commandLine.hasOption('p')) {
        InternalLogger console 
                = InternalLoggerFactory.getLogger
                        (LoggerName.NAMESRV_CONSOLE_NAME);
        //列印namesrvConfig屬性
        MixAll.printObjectProperties(console, namesrvConfig);
        //列印nettyServerConfig 屬性
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    //将啟動參數填充到namesrvConfig,nettyServerConfig
    MixAll.properties2Object
                (ServerUtil.commandLine2Properties(commandLine), 
                                                        namesrvConfig);

    //建立NameServerController
    final NamesrvController controller 
            = new NamesrvController(namesrvConfig, nettyServerConfig);

    controller.getConfiguration().registerConfig(properties);
    return controller;
}
           

2.初始化nameServerController

2.1:初始化NamesrvController

根據啟動屬性建立NamesrvController執行個體,并初始化該執行個體。

NameServerController執行個體為NameServer核心控制器。

NamesrvController#initialize

java複制代碼public boolean initialize() {

    //加載KV配置
    this.kvConfigManager.load();

    /***
        這裡需要看NettyRemotingServer構造方法
        會把netty的啟動輔助類serverBootstrap建立好,這個是重點
        儲存了channelEventListener。
        建立了netty的boss線程。
        建立publicExecutor線程池。
    ***/
    this.remotingServer = new NettyRemotingServer
                            (this.nettyServerConfig, this.brokerHousekeepingService);

    //建立線程池 預設是8個
    this.remotingExecutor =
        Executors.newFixedThreadPool
        (nettyServerConfig.getServerWorkerThreads(),  
                            new ThreadFactoryImpl("RemotingExecutorThread_"));

    /***
    建立DefaultRequestProcessor 作為netty server 請求處理器。
    org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest 
    處理所有已知request code類型的請求
    **/
    this.registerProcessor();

    //開啟定時任務:每隔10s掃描一次Broker,移除不活躍的Broker
    //如果在2分鐘都沒有發送心跳 移除不活躍的Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    //開啟定時任務:每隔10min列印一次KV配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    return true;
}
           

2.2:啟動定時任務:每10秒掃描一次所有broker

Broker30秒向NameServer發送一次心跳。

NamesrvController會開啟定時任務:每隔10s掃描一次Broker,移除不活躍的Broker。

移除broker是根據broker的lastUpdateStamp+2分鐘是否小于目前時間,如果小于就移除。

如果某個broker在2分鐘内都沒有發送心跳 那麼就移除該broker 即連續4次沒有發送心跳就移除

RouteInfoManager#scanNotActiveBroker

java複制代碼//掃描不活躍的broker
public void scanNotActiveBroker() {
//2分鐘
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

    //HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    Iterator<Entry<String, BrokerLiveInfo>> it 
                            = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {

        Entry<String, BrokerLiveInfo> next = it.next();
        //BrokerLiveInfo中的lastUpdateTimestamp存儲上次收到Broker心跳包的時間。
        long last = next.getValue().getLastUpdateTimestamp();
        //BrokerLiveInfo中的 
       //lastUpdateTimestamp+2分鐘小于目前時間說明 已經2分鐘沒有心跳了
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            //關閉并移除channel
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            //銷毀channel工作
            this.onChannelDestroy(next.getKey(), 
                                    next.getValue().getChannel());
        }
    }
}
           

移除2分鐘沒心跳的broker的路由元資訊:RouteInfoManager#onChannelDestroy

java複制代碼    //路由元資訊
    //類:RouteInfoManager
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
           

topicQueueTable: Topic消息隊列路由資訊,消息發送時根據路由表進行負載均衡

brokerAddrTable: Broker基礎資訊,包括brokerName、所屬叢集名稱、主備Broker位址

clusterAddrTable: Broker叢集資訊,存儲叢集中所有Broker名稱

brokerLiveTable: Broker狀态資訊,NameServer每次收到心跳包是會替換該資訊

filterServerTable: Broker上的FilterServer清單,用于類模式消息過濾。

一個Topic擁有多個消息隊列,一個Broker為每一個主題建立8個讀隊列和8個寫隊列。

多個Broker組成一個叢集,叢集由相同的多台Broker組成Master-Slave架構。

brokerId為0代表Master,大于0為Slave。

BrokerLiveInfo中的lastUpdateTimestamp存儲上次收到Broker心跳包的時間。

RocketMQ之NameServer啟動流程
RocketMQ之NameServer啟動流程
ini複制代碼//主要就是移除路由資訊表相關資訊
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                //申請寫鎖,根據brokerAddress
                //從brokerLiveTable和filterServerTable移除
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry
                                        = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed" + 
                            "clean it's data structure at once");
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;

                //維護<String/* brokerName */, BrokerData> brokerAddrTable
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();

                //周遊brokerAddrTable
              while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    //擷取brokerData
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();
                    //周遊該broker的所有位址 即主從
                    Iterator<Entry<Long, String>> it
                            = brokerData.getBrokerAddrs().entrySet().iterator();
                    //循環周遊主從
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                         //根據broker位址移除brokerAddr
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            break;
                        }
                    }
                    //如果移除以後沒有其他的BrokerAddr 相當于這個broker已經沒有執行個體了
                    //那麼把brokerData也從BrokerAddrTable 移除
                   // <String/* brokerName */, BrokerData> brokerAddrTable
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                    }
                }

                /***
                   維護叢集資訊: key = clusterName value對應的set是 brokerName 
                   <String, Set<String>> clusterAddrTable
                   這裡移除的條件是 removeBrokerName=true
                   removeBrokerName 是在移除brokerAddr時 當braokerData中的addrs為空
                   即該broker的主從都不存在 這個broker已經沒有執行個體了
                   設定removeBrokerName=true
               ***/
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> 
                            it = this.clusterAddrTable.entrySet().iterator();
                    //周遊clusterAddrTable
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                          //獲得叢集名稱
                        String clusterName = entry.getKey();
                         //獲得叢集中brokerName集合
                        Set<String> brokerNames = entry.getValue();
                        //從brokerNames中移除brokerNameFound
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            if (brokerNames.isEmpty()) {
                                //如果叢集中不包含任何broker,則移除該叢集
                                it.remove();
                            }
                            break;
                        }
                    }
                }

                //<String/* topic */, List<QueueData>> topicQueueTable隊列
                //這裡移除的條件是 removeBrokerName=true
                //removeBrokerName 是在移除brokerAddr時 當brokerData中的addrs為空
                //即該broker的主從都不存在,這個broker已經沒有執行個體了
                //設定removeBrokerName=true
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    //周遊topicQueueTable
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry
                                            = itTopicQueueTable.next();
                        //主題名稱
                        String topic = entry.getKey();
                        //隊列集合
                        List<QueueData> queueDataList = entry.getValue();
                        //周遊該主題隊列
                        Iterator<QueueData> itQueueData
                                        = queueDataList.iterator();

                        while (itQueueData.hasNext()) {
                            //擷取queueData
                            QueueData queueData = itQueueData.next();
                            //如果queueData中的brokerName等于本次移除的brokerName
                            //那麼從隊列中移除該queue
                            if (queueData.getBrokerName()
                                        .equals(brokerNameFound)) {
                                itQueueData.remove();
                            }
                        }
                        //如果該topic的隊列為空,則移除該topic
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
           

3.注冊jvm鈎子函數,啟動NameServerController

3.1注冊jvm鈎子函數,啟動NameSrvCtr

在JVM程序關閉之前,先将線程池關閉,及時釋放資源

NamesrvStartup#start

java複制代碼 public static NamesrvController start(final NamesrvController controller) 
                                                                throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        //注冊JVM鈎子函數代碼
        //在JVM程序關閉之前,先将線程池關閉,及時釋放資源
        //可以借鑒的地方
        Runtime.getRuntime()
            .addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                //釋放資源
                controller.shutdown();
                return null;
            }
        }));
        controller.start();
        return controller;
    }

 public void shutdown() {
        //關閉nettyServer
        this.remotingServer.shutdown();
        //關閉線程池
        this.remotingExecutor.shutdown();
        //關閉定時任務
        this.scheduledExecutorService.shutdown();
        //功能實作當檔案内容發生變化時,重新加載檔案,可用于讀取配置類的檔案。
        //原理:注冊一個listener,然後新開個線程,定期去掃描檔案
        //通過對檔案内容進行hash來判斷檔案内容是否發生變化
        //如果變化了,則回調監聽器的onChange方法。
        //看源碼主要是監聽證書
        //關閉fileWatchService
        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
    }           

繼續閱讀