RocketMQ Source Code Analysis (4): NameServer

Contents

  • Version Notes
  • NameServer Overview
  • NameServer Source Code Analysis
    • KVConfigManager
    • RouteInfoManager
    • BrokerHousekeepingService
    • DefaultRequestProcessor
    • NamesrvStartup

Version Notes

  1. This analysis is based on rocketmq-all-4.3.1.
  2. If you find any mistakes in the analysis, corrections are welcome. Thanks!

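NameServer Overview

  1. NameServer achieves high availability by deploying multiple NameServer instances. The instances are independent and never communicate with each other (so the data held by different NameServers is not strongly consistent), and the failure of any one instance does not affect the others.
  2. Responsibilities
    • Maintain the address list of active Brokers, both Master and Slave
    • Maintain all Topics and the address list of the queues belonging to each Topic
    • Maintain the Filter list of all Brokers
  3. Relationship between Broker and NameServer
    • Each Broker keeps a long-lived connection to every NameServer
    • A Broker sends a heartbeat to every NameServer every 30 seconds; the heartbeat carries the Broker's own topic information
    • NameServer scans all live Broker connections every 10 seconds and closes any connection that has not sent a heartbeat within 2 minutes
    • After starting, a Broker registers with every NameServer. Before sending a message, a Producer fetches the Broker address list from a NameServer, then picks one Broker from the list with a load-balancing algorithm and sends the message to it
  4. Stability
    • NameServers are independent of each other and stateless
    • NameServers do not see frequent reads or writes, so they are relatively stable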
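
To make the Producer side of item 3 concrete, here is a minimal sketch of choosing a Broker from the address list. The article does not say which load-balancing algorithm is used, so round-robin is an assumption made for illustration, and `RoundRobinBrokerSelector` is a hypothetical helper, not a RocketMQ class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of RocketMQ): picks brokers in round-robin order.
final class RoundRobinBrokerSelector {
    private final AtomicInteger counter = new AtomicInteger();

    // Returns the next broker address, cycling through the list.
    String select(List<String> brokerAddrs) {
        if (brokerAddrs == null || brokerAddrs.isEmpty()) {
            throw new IllegalArgumentException("no broker address available");
        }
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), brokerAddrs.size());
        return brokerAddrs.get(index);
    }

    public static void main(String[] args) {
        // Sample address list, as a Producer might receive it from a NameServer
        List<String> brokers = Arrays.asList("172.19.0.4:10911", "172.19.0.5:10912");
        RoundRobinBrokerSelector selector = new RoundRobinBrokerSelector();
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.select(brokers));
        }
    }
}
```

Because the selection happens entirely on the client, the NameServer only has to hand out the list, which fits the point above that it sees little read/write pressure.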

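NameServer Source Code Analysis

KVConfigManager

  1. An in-memory KV store that supports create, read, update, and delete operations and can persist its data. At its core it is a two-level HashMap (namespace -> key -> value) guarded by a read-write lock.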
    public class KVConfigManager {
         
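            // Logger used by load()
            private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

            private final NamesrvController namesrvController;

            // Guards all access to configTable
            private final ReadWriteLock lock = new ReentrantReadWriteLock();
            // namespace -> (key -> value)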
            private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
                new HashMap<String, HashMap<String, String>>();
        
            public KVConfigManager(NamesrvController namesrvController) {
                this.namesrvController = namesrvController;
            }
        
            public void load() {
                String content = null;
                try {
                    content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
                } catch (IOException e) {
                    log.warn("Load KV config table exception", e);
                }
                if (content != null) {
                    KVConfigSerializeWrapper kvConfigSerializeWrapper =
                        KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
                    if (null != kvConfigSerializeWrapper) {
                        this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                        log.info("load KV config table OK");
                    }
                }
            }  
        }  
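
The excerpt only shows `load()`, so the following is a minimal sketch of how a namespaced KV table like `configTable` can be read and written under a `ReadWriteLock`. The class `NamespacedKvStore` and its method names are hypothetical and are not the real `KVConfigManager` API; persistence is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a namespaced in-memory KV store (no persistence).
final class NamespacedKvStore {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // namespace -> (key -> value)
    private final Map<String, Map<String, String>> table = new HashMap<>();

    // Writers take the exclusive write lock.
    void put(String namespace, String key, String value) {
        lock.writeLock().lock();
        try {
            table.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock, so concurrent lookups do not block each other.
    String get(String namespace, String key) {
        lock.readLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            return kv == null ? null : kv.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Removes a key and returns its previous value, or null if it was absent.
    String delete(String namespace, String key) {
        lock.writeLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            return kv == null ? null : kv.remove(key);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        NamespacedKvStore store = new NamespacedKvStore();
        store.put("demoNamespace", "demoKey", "demoValue");
        System.out.println(store.get("demoNamespace", "demoKey"));
        store.delete("demoNamespace", "demoKey");
        System.out.println(store.get("demoNamespace", "demoKey"));
    }
}
```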
               

RouteInfoManager

  1. Route information is the data NameServer stores after a Broker registers with it. RouteInfoManager holds all Topic and Broker information.
    public class RouteInfoManager {
            private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
            private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
            private final ReadWriteLock lock = new ReentrantReadWriteLock();
            // Queue information for each topic
            private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
            // Broker address information
            private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
            // Broker cluster information
            private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
            // Brokers currently considered alive (not real-time)
            private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
            // Filter servers of each Broker
            private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

            public RouteInfoManager() {
                this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
                this.brokerAddrTable = new HashMap<String, BrokerData>(128);
                this.clusterAddrTable = new HashMap<String, Set<String>>(32);
                this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
                this.filterServerTable = new HashMap<String, List<String>>(256);
            }
            // ... omitted ...
       } 
    
    
               
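  2. Member variables
    • topicQueueTable: routing information for each Topic's message queues, including the name of the Broker hosting the topic, the number of read queues, the number of write queues, and the sync flag. RocketMQ uses the information in topicQueueTable to load-balance message sending.
    • brokerAddrTable: Broker node information, including the brokerName, the name of the cluster it belongs to, and the addresses of its master and slave nodes.
    • clusterAddrTable: Broker cluster information; stores the names of all Brokers in each cluster.
    • brokerLiveTable: Broker liveness information; NameServer updates it every time it receives a heartbeat from a Broker.
    • filterServerTable: the Filter Server list registered for each Broker address.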
  3. Inspecting the actual contents through remote debugging (two masters, two slaves, two NameServers)
    • IP address list
      • rocketmq-slave2 172.19.0.7
      • rocketmq-slave1 172.19.0.6
      • rocketmq-master2 172.19.0.5
      • rocketmq-master1 172.19.0.4
      • rocketmq-nameserver2 172.19.0.3
      • rocketmq-nameserver1 172.19.0.2
    • topicQueueTable contents:
      
      {
          "RMQ_SYS_TRANS_HALF_TOPIC": [
              {
                  "brokerName": "rocketmq-master1",
                  "perm": 6,
                  "readQueueNums": 1,
                  "topicSynFlag": 0,
                  "writeQueueNums": 1
              },
              {
                  "brokerName": "rocketmq-master2",
                  "perm": 6,
                  "readQueueNums": 1,
                  "topicSynFlag": 0,
                  "writeQueueNums": 1
              }
          ],
          "rocketmq-master1": [
              {
                  "brokerName": "rocketmq-master1",
                  "perm": 7,
                  "readQueueNums": 1,
                  "topicSynFlag": 0,
                  "writeQueueNums": 1
              }
          ],
          "rocketmq-master2": [
              {
                  "brokerName": "rocketmq-master2",
                  "perm": 7,
                  "readQueueNums": 1,
                  "topicSynFlag": 0,
                  "writeQueueNums": 1
              }
          ],
          "SELF_TEST_TOPIC": [
              {
                  "brokerName": "rocketmq-master1",
                  "perm": 6,
                  "readQueueNums": 1,
                  "topicSynFlag": 0,
                  "writeQueueNums": 1
              },
              {
                  "brokerName": "rocketmq-master2",
                  "perm": 6,
                  "readQueueNums": 1,
                  "topicSynFlag": 0,
                  "writeQueueNums": 1
              }
          ],
          "TBW102": [
              {
                  "brokerName": "rocketmq-master1",
                  "perm": 7,
                  "readQueueNums": 4,
                  "topicSynFlag": 0,
                  "writeQueueNums": 4
              },
              {
                  "brokerName": "rocketmq-master2",
                  "perm": 7,
                  "readQueueNums": 4,
                  "topicSynFlag": 0,
                  "writeQueueNums": 4
              }
          ],
          "testTopic": [
              {
                  "brokerName": "rocketmq-master1",
                  "perm": 6,
                  "readQueueNums": 16,
                  "topicSynFlag": 0,
                  "writeQueueNums": 16
              },
              {
                  "brokerName": "rocketmq-master2",
                  "perm": 6,
                  "readQueueNums": 16,
                  "topicSynFlag": 0,
                  "writeQueueNums": 16
              }
          ],
          "BenchmarkTest": [
              {
                  "brokerName": "rocketmq-master1",
                  "perm": 6,
                  "readQueueNums": 1024,
                  "topicSynFlag": 0,
                  "writeQueueNums": 1024
              },
              {
                  "brokerName": "rocketmq-master2",
                  "perm": 6,
                  "readQueueNums": 1024,
                  "topicSynFlag": 0,
                  "writeQueueNums": 1024
              }
          ],
          "DefaultCluster": [
              {
                  "brokerName": "rocketmq-master1",
                  "perm": 7,
                  "readQueueNums": 16,
                  "topicSynFlag": 0,
                  "writeQueueNums": 16
              },
              {
                  "brokerName": "rocketmq-master2",
                  "perm": 7,
                  "readQueueNums": 16,
                  "topicSynFlag": 0,
                  "writeQueueNums": 16
              }
          ],
          "OFFSET_MOVED_EVENT": [
              {
                  "brokerName": "rocketmq-master1",
                  "perm": 6,
                  "readQueueNums": 1,
                  "topicSynFlag": 0,
                  "writeQueueNums": 1
              },
              {
                  "brokerName": "rocketmq-master2",
                  "perm": 6,
                  "readQueueNums": 1,
                  "topicSynFlag": 0,
                  "writeQueueNums": 1
              }
          ]
      }
                 
    • brokerAddrTable contents:
      
      {
          "rocketmq-master1": {
              "brokerAddrs": {
                  0: "172.19.0.4:10911",
                  1: "172.19.0.6:10921"
              },
              "brokerName": "rocketmq-master1",
              "cluster": "DefaultCluster"
          },
          "rocketmq-master2": {
              "brokerAddrs": {
                  0: "172.19.0.5:10912",
                  1: "172.19.0.7:10922"
              },
              "brokerName": "rocketmq-master2",
              "cluster": "DefaultCluster"
          }
      }
                 
    • clusterAddrTable contents:
      
      {
          "DefaultCluster": [
              "rocketmq-master1",
              "rocketmq-master2"
          ]
      }
                 
    • brokerLiveTable contents:
      {
          "172.19.0.7:10922": {
              "channel": {
                  "active": true,
                  "inputShutdown": false,
                  "open": true,
                  "outputShutdown": false,
                  "registered": true,
                  "writable": true
              },
              "dataVersion": {
                  "counter": 3,
                  "timestamp": 1562476312530
              },
              "haServerAddr": "172.19.0.7:10923",
              "lastUpdateTimestamp": 1562476361146
          },
          "172.19.0.5:10912": {
              "channel": {
                  "active": true,
                  "inputShutdown": false,
                  "open": true,
                  "outputShutdown": false,
                  "registered": true,
                  "writable": true
              },
              "dataVersion": {
                  "counter": 3,
                  "timestamp": 1562476312530
              },
              "haServerAddr": "172.19.0.5:10913",
              "lastUpdateTimestamp": 1562476360402
          },
          "172.19.0.4:10911": {
              "channel": {
                  "active": true,
                  "inputShutdown": false,
                  "open": true,
                  "outputShutdown": false,
                  "registered": true,
                  "writable": true
              },
              "dataVersion": {
                  "counter": 3,
                  "timestamp": 1562476312525
              },
              "haServerAddr": "172.19.0.4:10912",
              "lastUpdateTimestamp": 1562476359516
          },
          "172.19.0.6:10921": {
              "channel": {
                  "active": true,
                  "inputShutdown": false,
                  "open": true,
                  "outputShutdown": false,
                  "registered": true,
                  "writable": true
              },
              "dataVersion": {
                  "counter": 3,
                  "timestamp": 1562476312525
              },
              "haServerAddr": "172.19.0.6:10922",
              "lastUpdateTimestamp": 1562476360541
          }
      }
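
The dumps above suggest how a topic route can be resolved: look up the Broker names for a topic in topicQueueTable, then look up each name's addresses in brokerAddrTable, where brokerId 0 is the master. The sketch below does this join with plain maps. `RouteLookupSketch` and its simplified map shapes are assumptions for illustration, not the real `RouteInfoManager` code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of joining the two tables; not RocketMQ code.
final class RouteLookupSketch {
    static final long MASTER_ID = 0L; // brokerId 0 marks the master in the dump above

    // topicToBrokerNames: simplified topicQueueTable (topic -> broker names)
    // brokerNameToAddrs: simplified brokerAddrTable (brokerName -> brokerId -> address)
    static List<String> masterAddrsForTopic(Map<String, List<String>> topicToBrokerNames,
                                            Map<String, Map<Long, String>> brokerNameToAddrs,
                                            String topic) {
        List<String> result = new ArrayList<>();
        List<String> names = topicToBrokerNames.get(topic);
        if (names == null) {
            return result; // unknown topic: no route
        }
        for (String name : names) {
            Map<Long, String> addrs = brokerNameToAddrs.get(name);
            if (addrs != null && addrs.containsKey(MASTER_ID)) {
                result.add(addrs.get(MASTER_ID));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> topics = new HashMap<>();
        topics.put("testTopic", Arrays.asList("rocketmq-master1", "rocketmq-master2"));

        Map<String, Map<Long, String>> brokers = new HashMap<>();
        Map<Long, String> b1 = new HashMap<>();
        b1.put(0L, "172.19.0.4:10911");
        b1.put(1L, "172.19.0.6:10921");
        brokers.put("rocketmq-master1", b1);
        Map<Long, String> b2 = new HashMap<>();
        b2.put(0L, "172.19.0.5:10912");
        b2.put(1L, "172.19.0.7:10922");
        brokers.put("rocketmq-master2", b2);

        System.out.println(masterAddrsForTopic(topics, brokers, "testTopic"));
    }
}
```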
                 

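BrokerHousekeepingService

  1. BrokerHousekeepingService implements the ChannelEventListener interface and handles Broker connection state events. When a Broker's channel goes idle, hits an exception, or is closed, the Broker is removed from RouteInfoManager.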
    public class BrokerHousekeepingService implements ChannelEventListener {
            private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
            private final NamesrvController namesrvController;
        
            public BrokerHousekeepingService(NamesrvController namesrvController) {
                this.namesrvController = namesrvController;
            }
        
            @Override
            public void onChannelConnect(String remoteAddr, Channel channel) {
            }
        
            @Override
            public void onChannelClose(String remoteAddr, Channel channel) {
                // Channel closed: remove the Broker from RouteInfoManager
                this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
            }

            @Override
            public void onChannelException(String remoteAddr, Channel channel) {
                // Channel exception: remove the Broker from RouteInfoManager
                this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
            }

            @Override
            public void onChannelIdle(String remoteAddr, Channel channel) {
                // Channel idle: remove the Broker from RouteInfoManager
                this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
            }
        }
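
All three callbacks funnel into `onChannelDestroy`, whose body is not shown in this article. As a rough sketch of the kind of cleanup it implies, the code below removes a Broker address from a live table and from the address table once its channel is gone. `ChannelDestroySketch`, the string channel ids, and the simplified tables are assumptions for illustration; the real method works on Netty `Channel` objects and more tables.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical, simplified cleanup; not the actual RouteInfoManager.onChannelDestroy.
final class ChannelDestroySketch {
    // brokerAddr -> channel id (a String stands in for a Netty Channel)
    final Map<String, String> liveChannels = new HashMap<>();
    // brokerName -> (brokerId -> brokerAddr)
    final Map<String, Map<Long, String>> brokerAddrs = new HashMap<>();

    void register(String brokerName, long brokerId, String addr, String channelId) {
        liveChannels.put(addr, channelId);
        brokerAddrs.computeIfAbsent(brokerName, n -> new HashMap<>()).put(brokerId, addr);
    }

    void onChannelDestroy(String channelId) {
        // Step 1: find which broker address was using this channel
        String found = null;
        for (Map.Entry<String, String> e : liveChannels.entrySet()) {
            if (e.getValue().equals(channelId)) {
                found = e.getKey();
                break;
            }
        }
        if (found == null) {
            return; // not a known broker channel
        }
        final String addr = found;
        // Step 2: drop it from the live table
        liveChannels.remove(addr);
        // Step 3: drop it from the address table; remove the broker name if no address is left
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrs.entrySet().iterator();
        while (it.hasNext()) {
            Map<Long, String> addrs = it.next().getValue();
            addrs.values().removeIf(addr::equals);
            if (addrs.isEmpty()) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        ChannelDestroySketch s = new ChannelDestroySketch();
        s.register("rocketmq-master1", 0L, "172.19.0.4:10911", "ch-1");
        s.register("rocketmq-master1", 1L, "172.19.0.6:10921", "ch-2");
        s.onChannelDestroy("ch-2");
        System.out.println(s.brokerAddrs);
    }
}
```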
        
    
               

DefaultRequestProcessor

  1. DefaultRequestProcessor is the default request processor. It dispatches each request to the matching handler according to its RequestCode; the core method is processRequest().


    @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
    
            if (ctx != null) {
                log.debug("receive request, {} {} {}",
                    request.getCode(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    request);
            }
    
    
            switch (request.getCode()) {
                // Add a KV config entry to NameServer
                case RequestCode.PUT_KV_CONFIG:
                    return this.putKVConfig(ctx, request);
                // Get a KV config entry from NameServer
                case RequestCode.GET_KV_CONFIG:
                    return this.getKVConfig(ctx, request);
                // Delete a KV config entry from NameServer
                case RequestCode.DELETE_KV_CONFIG:
                    return this.deleteKVConfig(ctx, request);
                // Query the data version
                case RequestCode.QUERY_DATA_VERSION:
                    return queryBrokerTopicConfig(ctx, request);
                // Register a Broker; if it is already registered, its entry is overwritten
                case RequestCode.REGISTER_BROKER:
                    Version brokerVersion = MQVersion.value2Version(request.getVersion());
                    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                        return this.registerBrokerWithFilterServer(ctx, request);
                    } else {
                        return this.registerBroker(ctx, request);
                    }
                // Unregister a Broker
                case RequestCode.UNREGISTER_BROKER:
                    return this.unregisterBroker(ctx, request);
                // Get the Broker names and topic config (route info) for a Topic
                case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                    return this.getRouteInfoByTopic(ctx, request);
                // Get cluster info for all Brokers registered with NameServer
                case RequestCode.GET_BROKER_CLUSTER_INFO:
                    return this.getBrokerClusterInfo(ctx, request);
                // Remove write permission from a Broker (by brokerName)
                case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                    return this.wipeWritePermOfBroker(ctx, request);
                // Get the full Topic list from NameServer
                case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                    return getAllTopicListFromNameserver(ctx, request);
                // Delete a Topic's config from NameServer
                case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                    return deleteTopicInNamesrv(ctx, request);
                // Get all KV entries in a Namespace
                case RequestCode.GET_KVLIST_BY_NAMESPACE:
                    return this.getKVListByNamespace(ctx, request);
                // Get all topics in the specified cluster
                case RequestCode.GET_TOPICS_BY_CLUSTER:
                    return this.getTopicsByCluster(ctx, request);
                // Get the list of built-in system Topics
                case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                    return this.getSystemTopicListFromNs(ctx, request);
                // Get the list of unit Topics
                case RequestCode.GET_UNIT_TOPIC_LIST:
                    return this.getUnitTopicList(ctx, request);
                // Get the Topics that have unit subscription groups
                case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                    return this.getHasUnitSubTopicList(ctx, request);
                // Get the non-unit Topics that have unit subscription groups
                case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                    return this.getHasUnitSubUnUnitTopicList(ctx, request);
                // Update NameServer config
                case RequestCode.UPDATE_NAMESRV_CONFIG:
                    return this.updateConfig(ctx, request);
                // Get NameServer config
                case RequestCode.GET_NAMESRV_CONFIG:
                    return this.getConfig(ctx, request);
                default:
                    break;
            }
            return null;
        }
    
    
               

NamesrvStartup

  1. NamesrvStartup is the startup entry point of NameServer. The core of startup is the call to NamesrvController's initialize() method.
    public boolean initialize() {
        
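                // Load KV config from file into memory; the default file is ${user.home}/namesrv/kvConfig.json
                this.kvConfigManager.load();
                // Create the remoting server, passing in the ChannelEventListener that handles connection events
                this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

                // Default thread pool for request processors; each RequestCode can be given its own pool, otherwise this default pool is used
                this.remotingExecutor =
                    Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

                // Register the default processor, which dispatches on requestCode
                this.registerProcessor();

                // Starts 5 seconds after startup and runs every 10 seconds: closes connections of Brokers that have been inactive for two minutes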
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        
                    @Override
                    public void run() {
                        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                    }
                }, 5, 10, TimeUnit.SECONDS);
        
                // Starts 1 minute after startup and prints configTable every 10 minutes
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        
                    @Override
                    public void run() {
                        NamesrvController.this.kvConfigManager.printAllPeriodically();
                    }
                }, 1, 10, TimeUnit.MINUTES);
        
                if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                    // Register a listener to reload SslContext
                    try {
                        fileWatchService = new FileWatchService(
                            new String[] {
                                TlsSystemConfig.tlsServerCertPath,
                                TlsSystemConfig.tlsServerKeyPath,
                                TlsSystemConfig.tlsServerTrustCertPath
                            },
                            new FileWatchService.Listener() {
                                boolean certChanged, keyChanged = false;
                                @Override
                                public void onChanged(String path) {
                                    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                        log.info("The trust certificate changed, reload the ssl context");
                                        reloadServerSslContext();
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                        certChanged = true;
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                        keyChanged = true;
                                    }
                                    if (certChanged && keyChanged) {
                                        log.info("The certificate and private key changed, reload the ssl context");
                                        certChanged = keyChanged = false;
                                        reloadServerSslContext();
                                    }
                                }
                                private void reloadServerSslContext() {
                                    ((NettyRemotingServer) remotingServer).loadSslContext();
                                }
                            });
                    } catch (Exception e) {
                        log.warn("FileWatchService created error, can't load the certificate dynamically");
                    }
                }
        
                return true;
            }
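
The scheduled task above calls `scanNotActiveBroker()`, whose body is not shown here. Based on the 2-minute rule and the `BROKER_CHANNEL_EXPIRED_TIME` constant seen in RouteInfoManager, a plausible reading is that each scan compares the last heartbeat timestamp with the current time. The sketch below illustrates that check. `BrokerExpiryScanSketch` is hypothetical, takes the clock as a parameter so it can be tested, and does not close any real connection.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of an expiry scan; not the actual scanNotActiveBroker().
final class BrokerExpiryScanSketch {
    // Same value as BROKER_CHANNEL_EXPIRED_TIME in the RouteInfoManager excerpt: 2 minutes
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000L * 60 * 2;

    // brokerAddr -> timestamp (ms) of the last heartbeat
    private final Map<String, Long> lastUpdate = new HashMap<>();

    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastUpdate.put(brokerAddr, nowMillis);
    }

    boolean isLive(String brokerAddr) {
        return lastUpdate.containsKey(brokerAddr);
    }

    // Removes every broker whose last heartbeat is older than the expiry window
    // and returns how many were removed.
    int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + BROKER_CHANNEL_EXPIRED_TIME < nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        BrokerExpiryScanSketch scanner = new BrokerExpiryScanSketch();
        long start = System.currentTimeMillis();
        scanner.onHeartbeat("172.19.0.4:10911", start);
        // Pretend three minutes have passed without another heartbeat
        int removed = scanner.scanNotActive(start + 3 * 60 * 1000L);
        System.out.println("expired brokers: " + removed);
    }
}
```

With a 10-second scan period and a 2-minute window, a dead Broker can remain in the table for a little over two minutes, which matches the "not real-time" remark on brokerLiveTable.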
        
    
               