天天看點

RocketMq源碼分析(一)NameServer

在講NameServer之前先了解一個概念——服務發現,可以把一個服務查找另一個服務的過程叫做服務發現。但是如何查找呢?又要說到另一個詞注冊中心,由于服務的雙方都是不固定的,我不在乎過程中是誰服務,我隻在乎最後拿到的結果是我想要的。注冊中心就是幹這個的,首先一個服務想要調用另一個服務第一步需要拿到可擷取的服務清單,然後根據一定的負載均衡算法,指定給一個服務。注冊中心就是這樣一個地方,服務提供者們将自己在注冊中心注冊,說我們可以提供服務,然後呢,消費者就去挑了一個,了解成一個菜市場也不為過。好了,說正題,最常用的注冊中心飛zookeeper莫屬了,常見的中間件比如dubbo,kafka等都是支援zookeeper作為注冊中心的。那RocketMq是不是也是支援的呢?當然不是,如果是就不用說NameServer了(早期的RocketMq看網友說是支援的,後面移除了)。

為什麼自己編寫NameServer而不用Zookeeper?

我覺得原因可能是追求簡單,高效。Topic路由資訊無需在叢集中保持強一緻,隻追求最終一緻性。并且能容忍分鐘級别的不一緻。也正因為如此NameServer叢集之間互不通信!确實是,這樣也很大程度的降低了它實作的複雜度,降低了對網絡的依賴。性能比zookeeper提升了很多。當然具體的原因肯定不止這些,大家可以自行了解。

源碼解析:

先看一下NameSrv啟動過程,在源碼中找到org.apache.rocketmq.namesrv.NamesrvStartup,這個是NameServer的啟動類,如下源碼可知:

public static void main(String[] args) {
        main0(args);
    }
public static NamesrvController main0(String[] args) {
    try {
    		// 建立一個controller類
            NamesrvController controller = createNamesrvController(args);
            //開始啟動
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
           

從代碼上看大緻有兩個過程,建立NamesrvController,以及start它,建立NameSrvController的主要就是加載配置

// nameSrv配置類
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 初始化監聽端口
nettyServerConfig.setListenPort(9876);
//  擷取啟動nameSrv時 -c 指定的配置檔案 
if (commandLine.hasOption('c')) {
       String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                // 根據配置檔案初始化對應config的Object, 沒有配置的使用預設值
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }
           

MixAll.properties2Object方法主要作用就是将properties檔案轉換成Object類,方法中比較核心的代碼就是将所有set屬性方法周遊擷取屬性名,然後從配置檔案裡面取出對應的值進行設定,代碼如下

// 擷取屬性名字setName => 先擷取ame,然後擷取N,最後将N->變n 最後生成key->name
  String tmp = mn.substring(4);
  String first = mn.substring(3, 4);
  String key = first.toLowerCase() + tmp;
           

再看一下start的過程,代碼如下,三個步驟-初始化、注冊的jvm鈎子,啟動。

public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        // 添加jvm鈎子,優雅停機
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }
           

初始化中,主要是加載kv配置,建立NettyServer對象,然後開啟兩個定時任務——每隔10秒掃描一下不活躍的broker和每隔10分鐘列印一下kv配置。其次是注冊jvm鈎子,使其能夠在虛拟機關閉之前進行資源的釋放。最後進行啟動過程,可以看一下start方法

public void start() throws Exception {
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }
           

首先啟動的是網絡服務,之前在初始化階段有一行代碼就是初始化網絡服務的執行個體,remotingServer.

start方法就是對相應的設定相應配置的參數并且開始監聽網絡端口,NameServer預設的端口是8888。

this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
           

兩外則是啟動了一個檔案監聽服務,一旦發生變動,就會通知相應的監聽器進行處理。

public void start() throws Exception {
	    // 啟動網絡服務
        this.remotingServer.start();
		// 檔案監聽
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }
           

路由注冊及故障剔除

上面簡單的說到了NameServer的一個啟動過程,那麼路由注冊是如何實作的呢?Broker在啟動時會向叢集中所有的NameServer發送心跳語句,然後每隔30秒發送一次心跳包。NameServer在收到心跳包時會更新brokerLiveTable中緩存的BrokerLiveInfo的lastUpdateTimestamp。下面是路由注冊的源碼:

BrokerController的start方法
// enableDLegerCommitLog 預設為false
  if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        // 處理主從資料同步,該方法會将master設定為null,然後啟動一個每隔10秒的定時任務
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        // 将broker注冊到nameSrv,真正處理邏輯的是doRegisterBrokerAll, 在其内會重新設定master位址
        // 當master位址為null時不會進行資料同步
        this.registerBrokerAll(true, false, true);
     }
     // isForceRegister預設是設定為true的
     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                	// 發送心跳包
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
            // getRegisterNameServerPeriod預設值是30秒
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

           

那麼broker發生故障時是如何剔除的呢?之前NameSrvController的initialize方法啟動了一個每隔10秒掃描不活躍的broker的任務,那麼這個不活躍又是如何定義的呢?大體就是周遊brokerLiveTable,上一次收到心跳包的加上broker的有效時間如果小于目前時間那麼就定義為發生了故障,從緩存表中剔除。

public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // 上一次更新時間加上兩分鐘小于目前時間說明該broker當機,将其連接配接關閉
            // BROKER_CHANNEL_EXPIRED_TIME值為1000*60*2也就是120秒
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
           

以上就是路由的注冊與剔除。

RocketMq源碼分析(一)NameServer

NameServer的源碼很少,大家有興趣可以深入學習一下。