
RocketMQ Source Code Analysis: NameServer

Contents:

  • RocketMQ Source Code Analysis: Setting Up the Source Environment
  • RocketMQ Source Code Analysis: NameServer

1. Understanding a core RocketMQ component, NameServer:

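Like most message middleware, RocketMQ is designed around a topic-based publish/subscribe mechanism. A message producer (Producer) sends a message to a message server, which persists it. A message consumer (Consumer) subscribes to the topics it is interested in. The message server then either pushes messages to the consumer according to the subscription (routing) information (Push mode), or the consumer actively pulls them from the message server (Pull mode). This decouples producers from consumers.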

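To keep the failure of a single message server from taking down the whole system, several message servers are usually deployed to share message storage. How, then, does a producer know which server to send a message to? And if one server goes down, how does the producer notice without restarting the service?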

NameServer is designed to solve exactly these problems. Its overall design is shown below:

[Figure: overall NameServer design diagram]

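When a Broker (message server) starts, it registers with every NameServer. Before sending a message, a Producer first fetches the list of Broker addresses from a NameServer and then picks one Broker from the list using a load-balancing algorithm. The NameServer keeps a long-lived connection to each Broker and checks at a fixed interval whether the Broker is still alive. If a Broker is found to be down, it is removed from the routing table.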
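
The register / lookup / select cycle described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: `ToyNameServer` and `RoundRobinSelector` are hypothetical names, the registry holds nothing but address strings, and round-robin stands in for whatever load-balancing strategy the real client applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy registry; the real NameServer keeps much richer routing tables.
class ToyNameServer {
    private final List<String> brokerAddrs = new CopyOnWriteArrayList<>();

    // Called by a broker when it starts.
    void register(String brokerAddr) {
        if (!brokerAddrs.contains(brokerAddr)) {
            brokerAddrs.add(brokerAddr);
        }
    }

    // Called when liveness detection decides that a broker is down.
    void unregister(String brokerAddr) {
        brokerAddrs.remove(brokerAddr);
    }

    // Producers fetch a snapshot of the current broker list.
    List<String> lookup() {
        return new ArrayList<>(brokerAddrs);
    }
}

// Hypothetical producer-side selector using plain round-robin.
class RoundRobinSelector {
    private final AtomicInteger counter = new AtomicInteger();

    String select(List<String> brokers) {
        if (brokers.isEmpty()) {
            throw new IllegalStateException("no broker available");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), brokers.size());
        return brokers.get(index);
    }
}
```

Because the producer asks the registry again before selecting, a broker that has been unregistered simply stops appearing in the candidates, and no producer restart is needed.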

2. Sequence diagram:

[Figure: NameServer startup sequence diagram]

3. Source code walkthrough:

Locate the NameServer startup class and its entry point:

public static void main(String[] args) { // NameServer startup entry point
    main0(args);
}
           
public static NamesrvController main0(String[] args) {
    try {
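        // Create the NameServer controller. As noted earlier, the NameServer mainly acts as a registry:
        // it receives register and lookup requests from other components, so it can be seen as a request-handling controller.
        NamesrvController controller = createNamesrvController(args);
        start(controller); // Start the controller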
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1); // Creating or starting the controller failed; stop the JVM
    }
    return null;
}
           
  • Creating the NamesrvController:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); // Set the RocketMQ version property

    Options options = ServerUtil.buildCommandlineOptions(new Options()); // Build the Options object used for command-line parsing
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); // Parse the command-line arguments
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    final NamesrvConfig namesrvConfig = new NamesrvConfig(); // Create the NameServer configuration object
    final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // Create the Netty server configuration object used to receive requests (components communicate mainly over Netty)
    nettyServerConfig.setListenPort(9876); // Default listen port: 9876
    // Handle the -c option of the startup command (load a configuration file)
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    // Handle the -p option of the startup command (print the configuration and exit)
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);

    return controller;
}
           

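The code above is the complete flow for creating the NameServer controller. Note that the controller is built from two configuration objects, NamesrvConfig and NettyServerConfig, so it is worth looking at the most important attributes of each: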

/**
* Key attributes of NamesrvConfig
*/
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); // RocketMQ installation (home) directory
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; // Path where the NameServer persists its KV configuration
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; // Default path of the NameServer configuration file
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false; // Whether ordered messages are supported; disabled by default
           
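/**
* Key attributes of NettyServerConfig
*/
private int listenPort = 8888; // Listen port; the default is 8888, but NameServer startup overrides it with 9876
private int serverWorkerThreads = 8; // Number of threads in the worker thread pool
private int serverCallbackExecutorThreads = 0; // Number of threads in the callback pool (for example, an asynchronous send supplies a callback method)
private int serverSelectorThreads = 3; // Number of I/O threads, which handle network requests and decode request packets
private int serverOnewaySemaphoreValue = 256; // Maximum concurrency of one-way sends
private int serverAsyncSemaphoreValue = 64; // Maximum concurrency of asynchronous sends
private int serverChannelMaxIdleTimeSeconds = 120; // Maximum idle time of a network connection; 120s by default

private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // Size of the socket send buffer
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; // Size of the socket receive buffer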
private boolean serverPooledByteBufAllocatorEnable = true;
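
The configuration flow in createNamesrvController layers three sources: field defaults, the file given with -c, and the remaining command-line options. The sketch below shows one way such a properties-to-object step can work, by matching property keys to setter names through reflection. `ToyServerConfig` and `ToyConfigBinder` are hypothetical names, only `int` and `String` setters are handled, and this is not the actual MixAll.properties2Object implementation.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical config class with just two fields, for illustration only.
class ToyServerConfig {
    private int listenPort = 8888;
    private String homeDir = "/tmp";

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public String getHomeDir() { return homeDir; }
    public void setHomeDir(String homeDir) { this.homeDir = homeDir; }
}

class ToyConfigBinder {
    // Copy every property whose key matches a setXxx method onto the target object.
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // no override supplied, keep the current value
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value));
                } else if (type == String.class) {
                    m.invoke(target, value);
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot apply property " + key, e);
            }
        }
    }
}
```

Calling `bind` once per source, in order, gives the layering: a key missing from a later source leaves the earlier value untouched, while a key that is present overrides it.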
           

The controller is then created:

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
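    this.namesrvConfig = namesrvConfig; // Store the NameServer configuration object
    this.nettyServerConfig = nettyServerConfig; // Store the Netty server configuration object
    this.kvConfigManager = new KVConfigManager(this); // Create the KV configuration manager
    this.routeInfoManager = new RouteInfoManager(); // Create the route manager (the core attribute, used to register and look up routing information)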
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    this.configuration = new Configuration(
        log,
        this.namesrvConfig, this.nettyServerConfig
    );
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
           

Once the controller has been created, it still needs to be initialized:

public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    boolean initResult = controller.initialize(); // Initialize the controller
    if (!initResult) { // Initialization failed
        controller.shutdown(); // Shut the controller down
        System.exit(-3);
    }

    // Register a JVM shutdown hook for the running process
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown(); // When the process exits, the controller is shut down with it
            return null;
        }
    }));

    controller.start(); // Actually start the controller

    return controller;
}
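
The ordering used in start() (initialize, register a shutdown hook, then start) is a general pattern. Below is a minimal sketch of it with a plain `Thread` as the hook. `ToyController` and `ToyStartup` are hypothetical names, and the idempotent `shutdown()` is an assumption added so that an explicit shutdown followed by the hook does no harm; it is not taken from the RocketMQ source.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a controller with an initialize/start/shutdown lifecycle.
class ToyController {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean shutdownDone = new AtomicBoolean(false);

    boolean initialize() {
        return true; // a real controller would load config and create its servers here
    }

    void start() {
        running.set(true);
    }

    // Idempotent: only the first call performs the shutdown work and returns true.
    boolean shutdown() {
        if (!shutdownDone.compareAndSet(false, true)) {
            return false;
        }
        running.set(false);
        return true;
    }

    boolean isRunning() {
        return running.get();
    }
}

class ToyStartup {
    static ToyController boot(ToyController controller) {
        if (!controller.initialize()) {
            controller.shutdown();
            throw new IllegalStateException("initialization failed");
        }
        // Register the hook before start() so resources are released even if the JVM exits early.
        Runtime.getRuntime().addShutdownHook(new Thread(controller::shutdown, "toy-shutdown-hook"));
        controller.start();
        return controller;
    }
}
```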
           
  • Initializing the controller:
public boolean initialize() {
    this.kvConfigManager.load(); // Load the KV configuration
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // Create the Netty-based remoting server
    this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();

    // Key step: schedule a task that first runs after a 5s delay and then every 10s
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker(); // The route manager scans for and removes inactive Brokers
        }
    }, 5, 10, TimeUnit.SECONDS);

    // Schedule a second task: first run after 1 minute, then every 10 minutes
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically(); // Print all KV configuration entries
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }
                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
    return true;
}
           

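That is the entire initialization logic of the controller. The call to focus on is NamesrvController.this.routeInfoManager.scanNotActiveBroker(): through the route manager, the NameServer periodically scans for inactive Brokers and removes their information. This is the most central function of the NameServer as RocketMQ's registry.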
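
The periodic scan relies on `ScheduledExecutorService.scheduleAtFixedRate`, which takes an initial delay and a period. The following is a small sketch of the same scheduling shape. `ToyPeriodicScan` is a hypothetical name, and the try/catch wrapper is an addition that is not in the NameServer code: it is included because an exception escaping a fixed-rate task suppresses all of its later executions.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ToyPeriodicScan {
    // Runs `scan` once after initialDelayMs and then every periodMs.
    // Returns the scheduler so that the caller can shut it down.
    static ScheduledExecutorService schedule(Runnable scan, long initialDelayMs, long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                scan.run();
            } catch (Throwable t) {
                // Swallow and report: an uncaught exception would cancel every later run.
                System.err.println("scan failed: " + t);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

With the values from initialize(), the equivalent call would be `schedule(task, 5_000, 10_000)`.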

First, let's look at how the route manager is created and what its core attributes are:

private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; // Expiry time of the long-lived channel to a Broker (2 minutes)
private final ReadWriteLock lock = new ReentrantReadWriteLock(); // Read-write lock
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; // Queue routing information per topic; used for load balancing when messages are sent
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // Basic Broker information: broker name, owning cluster name, and master/slave addresses
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // Cluster information: the names of all Brokers in each cluster
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // Broker liveness information; refreshed each time a Broker registers (heartbeats) and checked by the periodic scan
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; // FilterServer list of each Broker, used for class-mode message filtering

public RouteInfoManager() { // Constructor: initializes the tables above
    this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
    this.brokerAddrTable = new HashMap<String, BrokerData>(128);
    this.clusterAddrTable = new HashMap<String, Set<String>>(32);
    this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
    this.filterServerTable = new HashMap<String, List<String>>(256);
}
           

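As the code above shows, the core attributes of the route manager are all HashMap objects. To understand the route manager better, let's look at three of the key value types: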

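A topic contains multiple message queues, and by default a Broker creates 4 read queues and 4 write queues for each topic. Each QueueData describes the topic's queues on one Broker, so a topic hosted on several Brokers has several entries. That is why the value type of topicQueueTable is a List.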

/**
* Message queue data
*/
public class QueueData implements Comparable<QueueData> {
    private String brokerName; // Name of the owning Broker
    private int readQueueNums; // Number of read queues
    private int writeQueueNums; // Number of write queues
    private int perm;
    private int topicSysFlag;
	...
}
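
Since one QueueData only records how many queues a Broker holds for a topic, a sender that wants to balance across individual queues first has to expand those counts into concrete queue identifiers. The sketch below shows that expansion under stated assumptions: `ToyQueueData` and `ToyQueueExpander` are hypothetical names, the "brokerName:queueId" string format is invented for illustration, and the `perm` field is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down copy of the two fields this sketch needs.
class ToyQueueData {
    final String brokerName;
    final int writeQueueNums;

    ToyQueueData(String brokerName, int writeQueueNums) {
        this.brokerName = brokerName;
        this.writeQueueNums = writeQueueNums;
    }
}

class ToyQueueExpander {
    // Expands per-broker queue counts into entries such as "broker-a:0" ... "broker-a:3".
    static List<String> writableQueues(List<ToyQueueData> route) {
        List<String> queues = new ArrayList<>();
        for (ToyQueueData qd : route) {
            for (int queueId = 0; queueId < qd.writeQueueNums; queueId++) {
                queues.add(qd.brokerName + ":" + queueId);
            }
        }
        return queues;
    }
}
```

For a topic hosted on two Brokers with the default of 4 write queues each, this yields 8 candidate queues to balance over.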
           

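Multiple Brokers form a cluster. Within a cluster, Brokers that share the same brokerName form a master-slave group: a brokerId of 0 denotes the master, and a brokerId greater than 0 denotes a slave.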

/**
* Broker data
*/
public class BrokerData implements Comparable<BrokerData> {
    private String cluster; // Name of the owning cluster
    private String brokerName; // Broker name
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; // Map from brokerId to address for the nodes sharing this broker name (several nodes can share one name, for high availability)
	...
}
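
The brokerId convention makes it easy to find the master of a group and to tell when a group has no nodes left. Here is a minimal sketch of both operations on a brokerId-to-address map; `ToyBrokerGroup` and its method names are hypothetical and are not part of BrokerData.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the nodes that share one broker name.
class ToyBrokerGroup {
    static final long MASTER_ID = 0L; // brokerId 0 marks the master

    private final Map<Long, String> brokerAddrs = new HashMap<>();

    void put(long brokerId, String addr) {
        brokerAddrs.put(brokerId, addr);
    }

    // Returns the master address, or null when the group currently has no master.
    String masterAddr() {
        return brokerAddrs.get(MASTER_ID);
    }

    // Removes the node with the given address; returns true when the group is now empty.
    boolean removeByAddr(String addr) {
        brokerAddrs.values().removeIf(addr::equals);
        return brokerAddrs.isEmpty();
    }
}
```

The boolean returned by `removeByAddr` corresponds to the "no nodes left under this name" condition that decides whether the broker name itself should be dropped.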
           

BrokerLiveInfo holds the liveness state of a Broker:

/**
* Broker liveness information
*/
class BrokerLiveInfo {
    private long lastUpdateTimestamp; // Timestamp of the most recent update
    private DataVersion dataVersion; // Data version
    private Channel channel; // Long-lived connection channel
    private String haServerAddr;
	...
}
           

With that understanding of the route manager's core attributes, let's return to how the NameServer uses it to periodically scan for and remove inactive Brokers:

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); 
    while (it.hasNext()) { // Iterate over all Broker liveness entries
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // lastUpdateTimestamp + channel expiry time < now means the Broker has not communicated with the
        // NameServer within the allowed window, so it is treated as failed and its information must be removed
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel()); // Close the long-lived connection to this Broker
            it.remove(); // Remove this Broker's liveness entry
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); // Remove this Broker's information from the other tables
        }
        }
    }
}
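
The expiry rule in scanNotActiveBroker is easier to experiment with when the current time is passed in rather than read from the system clock. The sketch below keeps only the timestamp comparison. `ToyLivenessTable` is a hypothetical name, and it stores a bare timestamp per address instead of a full BrokerLiveInfo.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class ToyLivenessTable {
    static final long EXPIRED_MS = 1000 * 60 * 2; // the same 2-minute window as BROKER_CHANNEL_EXPIRED_TIME

    // brokerAddr -> time of the last heartbeat, in milliseconds
    private final Map<String, Long> lastUpdate = new HashMap<>();

    // Record a heartbeat received from a broker at time nowMs.
    void heartbeat(String brokerAddr, long nowMs) {
        lastUpdate.put(brokerAddr, nowMs);
    }

    // Remove and return every broker whose last heartbeat is older than the window.
    List<String> scan(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MS < nowMs) {
                expired.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return expired;
    }
}
```

Note that the comparison is strict: a broker whose last heartbeat was exactly 120000 ms ago is still considered alive. Because the scan itself only runs every 10 seconds, a dead broker may stay in the table for slightly longer than the 2-minute window.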
           
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly(); // Acquire the read lock
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) { // Iterate over all Broker liveness entries
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) { // The liveness entry for this channel still exists (not removed yet)
                        brokerAddrFound = entry.getKey(); // Take that Broker's address
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock(); // Release the read lock
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) { // Null means the Broker's liveness entry has already been removed
        brokerAddrFound = remoteAddr; // Fall back to the Broker address passed in
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly(); // Acquire the write lock so the removals are thread-safe
                this.brokerLiveTable.remove(brokerAddrFound); // Make sure the Broker's liveness entry is removed
                this.filterServerTable.remove(brokerAddrFound); // Remove the Broker's FilterServer addresses
                
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator();
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { // Iterate over all Brokers
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();
                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) { // Iterate over the nodes that share this broker name
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        if (brokerAddr.equals(brokerAddrFound)) { // Found the Broker address to remove
                            brokerNameFound = brokerData.getBrokerName(); // Remember its broker name
                            it.remove(); // Remove it from the nodes sharing that name
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }

                    if (brokerData.getBrokerAddrs().isEmpty()) { // No nodes are left under this broker name, so the name itself no longer needs to exist
                        removeBrokerName = true; // Mark the broker name for removal
                        itBrokerAddrTable.remove(); // Remove the broker name from the address table
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName());
                    }
                }

                if (brokerNameFound != null && removeBrokerName) { // The broker name is being removed, so its other information must go too
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) { // Iterate over all clusters
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound); // Remove the broker name from the cluster
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName);
                            if (brokerNames.isEmpty()) { // The cluster has no Broker left, so the cluster entry no longer needs to exist
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", clusterName);
                                it.remove(); // Remove the cluster entry
                            }
                            break;
                        }
                    }
                }

                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) { // Iterate over all topics
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey(); // Topic name
                        List<QueueData> queueDataList = entry.getValue(); // All queue data of the topic

                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) { // Iterate over the topic's queue data
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) { // The queue data belongs to the Broker being removed
                                itQueueData.remove(); // Remove it from the list
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData);
                            }
                        }

                        if (queueDataList.isEmpty()) { // The topic has no queue data left, so the entry no longer needs to exist
                            itTopicQueueTable.remove(); // Remove the topic entry
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock(); // Release the write lock
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
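
The cascade in onChannelDestroy (remove the address, then the broker name if it has no nodes left, then the cluster and topic entries that become empty) can be reproduced on much smaller tables. The sketch below is a simplified model, not the real RouteInfoManager: `ToyRouteTables` is a hypothetical name, topics map to plain broker names instead of QueueData, and `lock()` is used in place of `lockInterruptibly()` to keep the example short.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ToyRouteTables {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // brokerName -> (brokerId -> address)
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // clusterName -> broker names
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    // topic -> names of the brokers hosting it (the real table stores QueueData)
    final Map<String, Set<String>> topicTable = new HashMap<>();

    void register(String cluster, String brokerName, long brokerId, String addr, String topic) {
        lock.writeLock().lock();
        try {
            brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
            clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
            topicTable.computeIfAbsent(topic, k -> new HashSet<>()).add(brokerName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    void removeByAddr(String addr) {
        lock.writeLock().lock();
        try {
            String emptiedName = null;
            Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Map<Long, String>> entry = it.next();
                if (entry.getValue().values().remove(addr)) { // address found under this broker name
                    if (entry.getValue().isEmpty()) {          // it was the last node with this name
                        emptiedName = entry.getKey();
                        it.remove();
                    }
                    break;
                }
            }
            if (emptiedName == null) {
                return; // other nodes still serve this broker name; keep cluster and topic data
            }
            final String name = emptiedName;
            clusterAddrTable.values().forEach(names -> names.remove(name));
            clusterAddrTable.values().removeIf(Set::isEmpty);
            topicTable.values().forEach(names -> names.remove(name));
            topicTable.values().removeIf(Set::isEmpty);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

As in the original method, losing a slave leaves the cluster and topic tables untouched. Only when the last node under a broker name disappears does the removal propagate to the other tables.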
           

That concludes the walkthrough of the core NameServer source code.