天天看點

RocketMQ4.3.X筆記(11):NameServer 源碼解析

文章目錄

    • 源碼目錄
    • 子產品入口代碼的功能 `NamesrvStartup`
      • 入口函數:
      • 解析指令行參數
      • 初始化NameServer 的Controller
    • NameServer 的總控邏輯
      • 初始化執行線程池
      • 啟動通信服務
    • 核心業務邏輯處理
    • 叢集狀态存儲
      • 具體結構
      • 控制通路這些結構的鎖機制
    • 事件監聽 `BrokerHousekeepingService`
    • 參考

源碼目錄

  1. 整個功能很簡單,一共就 8 個類
  • KVConfigManager
  • KVConfigSerializeWrapper
  • ClusterTestRequestProcessor
  • DefaultRequestProcessor
  • BrokerHousekeepingService
  • RouteInfoManager.java
  • NamesrvController
  • NamesrvStartup
  1. 依賴核心子產品:

    rocketmq-remoting

RocketMQ4.3.X筆記(11):NameServer 源碼解析

子產品入口代碼的功能

NamesrvStartup

入口函數:

  1. NamesrvStartup

    NameServer

    子產品的啟動入口,

    NamesrvController

    是用來協塊各個調模功能的代碼
  2. NamesrvStartup.java

    類的入口函數

    main

    ,發現把邏輯轉到

    main0()

    函數
  3. main0()

    函數的主要功能:
  • 是解析指令行參數,重點是解析

    -c 和-p

    參數
  • 初始化

    NameServer

    Controller

    ,即初始化了

    NamesrvController.java

解析指令行參數

  1. 解析指令行參數源碼
# 解析指令行參數
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
    System.exit(-1);
    return null;
}

# 判斷參數 -c
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);

        namesrvConfig.setConfigStorePath(file);

        System.out.printf("load config properties file OK, %s%n", file);
        in.close();
    }
}

# 判斷 -p 後退出
if (commandLine.hasOption('p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
    MixAll.printObjectProperties(console, namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    System.exit(0);
}
           
  1. 簡單分析
  • -c

    指令行參數用來指定配置檔案的位置
  • -p

    指令行參數用來列印所有配置項的值
  • 注意,用

    -p

    參數列印配置項的值之後程式就退出了,這是一個幫助調試的選項

初始化NameServer 的Controller

  1. main0

    函數的另外一個功能是初始化

    NamesrvController

    ,源碼如下
  • 根據解析出的配置參數, 調用

    controller.initialize()

    來初始化,然後調用

    controller.start()

    NameServer

    開始服務
  • 還有一個邏輯是注冊

    ShutdownHookThread

    ,當程式退出的時候會調用

    controller.shutdown

    來做退出前的清理工作
# createNamesrvController() 方法中,在解析參數後面
# 解析出配置參數
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

           
# start(controller) 方法中
public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    # 初始化
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    # 注冊 `ShutdownHookThread`, 當程式退出的時候會調用`controller.shutdown` 來做退出前的清理工作
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));
    # 啟動開始服務
    controller.start();

    return controller;
}
           

NameServer 的總控邏輯

NameServer

的總控邏輯在

NamesrvController.java

代碼中。

NameServer

是叢集的協調者,它隻是簡單地接收其他角色報上來的狀态,然後根據請求傳回相應的狀态

初始化執行線程池

  1. NameserverController.initialize()

    函數完成,源碼如下:
  2. 一共3個線程池
  • 啟動了一個預設是8 個線程的線程池
  • 1個定時線程池,負責掃描失效的Broker(scanNotActiveBroker)
  • 1個定時線程池,列印配置資訊(printAllPeriodically)
# 啟動了一個預設是8 個線程的線程池
this.remotingExecutor =
    Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

# 定時線程池,負責掃描失效的Broker(scanNotActiveBroker)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

# 定時線程池,列印配置資訊(printAllPeriodically)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.kvConfigManager.printAllPeriodically();
    }
}, 1, 10, TimeUnit.MINUTES);

           

啟動通信服務

  1. 啟動負責通信的服務

    remotingServer

  2. remotingServer

    監昕一些端口,收到

    Broker

    Client

    等發過來的請求後,根據請求的指令,調用不同的

    Processor

    來處理。這些不同的處理邏輯被放到上面初始化的線程池中執行
  3. remotingServer

    是基于Netty 封裝的一個網絡通信服務
# NameserverController.initialize() 方法中
# 初始化 remotingServer
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

# 注冊 Processor
this.registerProcessor();
           
private void registerProcessor() {
    if (namesrvConfig.isClusterTest()) {
        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
            this.remotingExecutor);
    } else {
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }
}
           

核心業務邏輯處理

  1. NameServer 的核心業務邏輯, 在

    DefaultRequestProcessor.java

    processRequest

    方法中 中,網絡通信服務子產品收到請求後,就調用這個

    Processor

    來處理

    2.邏輯主體是個

    switch

    語句,根據

    RequestCode

    調用不同的函數來處理,從

    RequestCode

    可以了解到

    Name Server

    的主要功能,比如:
  • REGISTER_BROKER

    是在叢集中新加入一個Broker 機器
  • GET_ROUTEINTO_BY_TOPIC

    是請求擷取一個Topic 的路由資訊
  • WIPE_WRITE_PERM_OF_BROKER

    是删除一個Broker 的寫權限
switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:
        return this.putKVConfig(ctx, request);
    case RequestCode.GET_KV_CONFIG:
        return this.getKVConfig(ctx, request);
    case RequestCode.DELETE_KV_CONFIG:
        return this.deleteKVConfig(ctx, request);
    case RequestCode.QUERY_DATA_VERSION:
        return queryBrokerTopicConfig(ctx, request);
    case RequestCode.REGISTER_BROKER:
        Version brokerVersion = MQVersion.value2Version(request.getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            return this.registerBroker(ctx, request);
        }
    case RequestCode.UNREGISTER_BROKER:
        return this.unregisterBroker(ctx, request);
    case RequestCode.GET_ROUTEINTO_BY_TOPIC:
        return this.getRouteInfoByTopic(ctx, request);
    case RequestCode.GET_BROKER_CLUSTER_INFO:
        return this.getBrokerClusterInfo(ctx, request);
    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
        return this.wipeWritePermOfBroker(ctx, request);
    case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
        return getAllTopicListFromNameserver(ctx, request);
    case RequestCode.DELETE_TOPIC_IN_NAMESRV:
        return deleteTopicInNamesrv(ctx, request);
    case RequestCode.GET_KVLIST_BY_NAMESPACE:
        return this.getKVListByNamespace(ctx, request);
    case RequestCode.GET_TOPICS_BY_CLUSTER:
        return this.getTopicsByCluster(ctx, request);
    case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
        return this.getSystemTopicListFromNs(ctx, request);
    case RequestCode.GET_UNIT_TOPIC_LIST:
        return this.getUnitTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
        return this.getHasUnitSubTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
        return this.getHasUnitSubUnUnitTopicList(ctx, request);
    case RequestCode.UPDATE_NAMESRV_CONFIG:
        return this.updateConfig(ctx, request);
    case RequestCode.GET_NAMESRV_CONFIG:
        return this.getConfig(ctx, request);
    default:
        break;
}
           

叢集狀态存儲

具體結構

  1. NameServer

    作為叢集的協調者,需要儲存和維護叢集的各種中繼資料, 這是通過

    RoutelnfoManager

    類來實作的
  2. 每個結構存儲着一類叢集資訊
屬性 描述
HashMap<String, List> topicQueueTable topicQueueTable 這個結構的Key 是Topic 的名稱,它存儲了所有Topic的屬性資訊。Value 是個QueueData 隊列, 隊裡的長度等于這個Topic資料存儲的Master Broker 的個數, QueueData 裡存儲着Broker 的名稱、讀寫queue 的數量、同步辨別等
HashMap<String, BrokerData> brokerAddrTable; BrokerAddrTable以BrokerName 為索引,相同名稱的Broker 可能存在多台機器, 一個Master 和多個Slave 。這個結構存儲着一個BrokerName 對應的屬性資訊,包括所屬的Cluster 名稱, 一個Master Broker 和多個Slave Broker的位址資訊
HashMap<String>> clusterAddrTable; ClusterAddrTable存儲的是叢集中C luster 的資訊,結果很簡單,就是一個Cluster 名稱對應一個由BrokerName 組成的集合
HashMap<String, BrokerLiveInfo> brokerLiveTable; 這個結構和BrokerAddrTable 有關系,但是内容完全不同,這個結構的Key 是BrokerAddr ,也就是對應着一台機器, BrokerAddrTable 中的Key是BrokerName , 多個機器的BrokerName 可以相同。BrokerLiveTable存儲的内容是這台Broker 機器的實時狀态,包括上次更新狀态的時間戳, NameServer 會定期檢查這個時間戳,逾時沒有更新就認為這個Broker 無效了,将其從Broker 清單裡清除
HashMap<String> filterServerTable; Filter Server 是過濾伺服器,是RocketMQ 的一種服務端過濾方式,一個Broker 可以有一個或多個F ilter Server 。這個結構的Key 是Broker的位址, Value 是和這個Broker 關聯的多個Filter Server 的位址
  1. 源碼如下
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

public RouteInfoManager() {
    this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
    this.brokerAddrTable = new HashMap<String, BrokerData>(128);
    this.clusterAddrTable = new HashMap<String, Set<String>>(32);
    this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
    this.filterServerTable = new HashMap<String, List<String>>(256);
}
           

控制通路這些結構的鎖機制

  1. NameServer

    的場景中,讀取操作多,更改操作少(讀多寫少),是以選擇讀寫鎖能大大提高效率
private final ReadWriteLock lock = new ReentrantReadWriteLock();
           
  1. RoutelnfoManager

    中使用的是可重人的讀寫鎖,以

    deleteTopic

    函數為例:
public void deleteTopic(final String topic) {
    try {
        try {
            this.lock.writeLock().lockInterruptibly();
            this.topicQueueTable.remove(topic);
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("deleteTopic Exception", e);
    }
}
           

事件監聽

BrokerHousekeepingService

  1. 繼承

    ChannelEventListener

    類,主要負責連接配接斷開的回調與監聽
  2. 監聽的事件如:

    onChannelConnect

    onChannelClose

    onChannelException

    onChannelIdle

  3. 在監聽的回調方法中,會調用

    NamesrvController

    RouteInfoManager

    中的方法,更新狀态資訊等
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelException(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
           

參考

源碼位址

繼續閱讀