文章目錄
- 版本聲明
- NameServer介紹
- NameServer源碼分析
-
- KVConfigManager
- RouteInfoManager
- BrokerHouseKeepingService
- DefaultRequestProcessor
- NamesrvStartup
版本聲明
- 基于
版本rocketmq-all-4.3.1
- 如有發現分析不正确的地方歡迎指正,謝謝!
NameServer介紹
-
本身的高可用是通過部署多台NameServer
服務。NameServer
互相獨立,彼此之間不會通信(即多台NameServer
的資料并不是強一緻的),任意一台當機并不會影響其他的NameServer
NameServer
- 作用
- 維護活躍的Broker位址清單,包括Master和Slave
- 維護所有
和Topic
對應隊列的位址清單Topic
- 維護所有
的Broker
清單Filter
-
與Broker
關系NameServer
- 單個
與所有Broker
保持長連接配接NameServer
-
每隔30秒向所有Broker
發送心跳,心跳包含了自身的NameServer
資訊topic
-
每隔10秒鐘掃描所有存活的Broker連接配接,若2min内沒有發送心跳資訊,則斷開連接配接NameServer
-
在啟動後向所有NameServer注冊,Broker
在發送消息之前先從Producer
擷取NameServer
伺服器的位址清單,然後根據負載均衡算法從清單中選擇一台Broker
進行消息發送Broker
- 單個
- 穩定性
- nameserver互相獨立,無狀态
- nameserver不會有頻繁的讀寫,穩定性相對高
NameServer源碼分析
KVConfigManager
- 記憶體級的KV存儲,提供增删改查以及持久化資料的能力。本質就是一個HashMap
public class KVConfigManager { private final NamesrvController namesrvController; private final ReadWriteLock lock = new ReentrantReadWriteLock(); // private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>(); public KVConfigManager(NamesrvController namesrvController) { this.namesrvController = namesrvController; } public void load() { String content = null; try { content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath()); } catch (IOException e) { log.warn("Load KV config table exception", e); } if (content != null) { KVConfigSerializeWrapper kvConfigSerializeWrapper = KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class); if (null != kvConfigSerializeWrapper) { this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable()); log.info("load KV config table OK"); } } } }
RouteInfoManager
- 路由資訊即Broker向NameServer注冊後儲存的資訊,
儲存所有的RouteInfoManager
和Topic
資訊Broker
public class RouteInfoManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; private final ReadWriteLock lock = new ReentrantReadWriteLock(); //topic清單對應的隊列資訊 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //Broker位址資訊 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //broker叢集資訊 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; //Broker目前存活的Broker(非實時) private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; //Broker過濾資訊 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; public RouteInfoManager() { this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); this.brokerAddrTable = new HashMap<String, BrokerData>(128); this.clusterAddrTable = new HashMap<String, Set<String>>(32); this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); this.filterServerTable = new HashMap<String, List<String>>(256); } ...省略... }
RocketMQ源碼分析(四)之NameServer版本聲明NameServer介紹NameServer源碼分析 - 成員變量解析
- topicQueueTable:Topic消息隊列路由資訊,包括topic所在的broker名稱,讀隊列數量,寫隊列數量,同步标記等資訊,rocketmq根據topicQueueTable的資訊進行負載均衡消息發送。
- brokerAddrTable:Broker節點資訊,包括brokerName,所在叢集名稱,還有主備節點資訊。
- clusterAddrTable:Broker叢集資訊,存儲了叢集中所有的BrokerName。
- brokerLiveTable:Broker狀态資訊,Nameserver每次收到Broker的心跳包就會更新該資訊。
- 通過遠端調試檢視具體内容(雙主雙從,兩個nameserver)
- ip位址清單
- rocketmq-slave2 172.19.0.7
- rocketmq-slave1 172.19.0.6
- rocketmq-master2 172.19.0.5
- rocketmq-master1 172.19.0.4
- rocketmq-nameserver2 172.19.0.3
- rocketmq-nameserver1 172.19.0.2
-
内容如下topicQueueTable
RocketMQ源碼分析(四)之NameServer版本聲明NameServer介紹NameServer源碼分析 topicQueueTable資訊 { "RMQ_SYS_TRANS_HALF_TOPIC": [ { "brokerName": "rocketmq-master1", "perm": 6, "readQueueNums": 1, "topicSynFlag": 0, "writeQueueNums": 1 }, { "brokerName": "rocketmq-master2", "perm": 6, "readQueueNums": 1, "topicSynFlag": 0, "writeQueueNums": 1 } ], "rocketmq-master1": [ { "brokerName": "rocketmq-master1", "perm": 7, "readQueueNums": 1, "topicSynFlag": 0, "writeQueueNums": 1 } ], "rocketmq-master2": [ { "brokerName": "rocketmq-master2", "perm": 7, "readQueueNums": 1, "topicSynFlag": 0, "writeQueueNums": 1 } ], "SELF_TEST_TOPIC": [ { "brokerName": "rocketmq-master1", "perm": 6, "readQueueNums": 1, "topicSynFlag": 0, "writeQueueNums": 1 }, { "brokerName": "rocketmq-master2", "perm": 6, "readQueueNums": 1, "topicSynFlag": 0, "writeQueueNums": 1 } ], "TBW102": [ { "brokerName": "rocketmq-master1", "perm": 7, "readQueueNums": 4, "topicSynFlag": 0, "writeQueueNums": 4 }, { "brokerName": "rocketmq-master2", "perm": 7, "readQueueNums": 4, "topicSynFlag": 0, "writeQueueNums": 4 } ], "testTopic": [ { "brokerName": "rocketmq-master1", "perm": 6, "readQueueNums": 16, "topicSynFlag": 0, "writeQueueNums": 16 }, { "brokerName": "rocketmq-master2", "perm": 6, "readQueueNums": 16, "topicSynFlag": 0, "writeQueueNums": 16 } ], "BenchmarkTest": [ { "brokerName": "rocketmq-master1", "perm": 6, "readQueueNums": 1024, "topicSynFlag": 0, "writeQueueNums": 1024 }, { "brokerName": "rocketmq-master2", "perm": 6, "readQueueNums": 1024, "topicSynFlag": 0, "writeQueueNums": 1024 } ], "DefaultCluster": [ { "brokerName": "rocketmq-master1", "perm": 7, "readQueueNums": 16, "topicSynFlag": 0, "writeQueueNums": 16 }, { "brokerName": "rocketmq-master2", "perm": 7, "readQueueNums": 16, "topicSynFlag": 0, "writeQueueNums": 16 } ], "OFFSET_MOVED_EVENT": [ { "brokerName": "rocketmq-master1", "perm": 6, "readQueueNums": 1, "topicSynFlag": 0, "writeQueueNums": 1 }, { "brokerName": "rocketmq-master2", "perm": 6, "readQueueNums": 1, "topicSynFlag": 0, "writeQueueNums": 1 } ] }
-
内容如下BrokerAddrTable
RocketMQ源碼分析(四)之NameServer版本聲明NameServer介紹NameServer源碼分析 brokerAddrTable資訊 { "rocketmq-master1": { "brokerAddrs": { 0: "172.19.0.4:10911", 1: "172.19.0.6:10921" }, "brokerName": "rocketmq-master1", "cluster": "DefaultCluster" }, "rocketmq-master2": { "brokerAddrs": { 0: "172.19.0.5:10912", 1: "172.19.0.7:10922" }, "brokerName": "rocketmq-master2", "cluster": "DefaultCluster" } }
-
内容如下clusterAddrTable
RocketMQ源碼分析(四)之NameServer版本聲明NameServer介紹NameServer源碼分析 clusterAddrTable 資訊 { "DefaultCluster": [ "rocketmq-master1", "rocketmq-master2" ] }
-
内容如下brokerLiveTable
RocketMQ源碼分析(四)之NameServer版本聲明NameServer介紹NameServer源碼分析 brokerLiveTable資訊 { "172.19.0.7:10922": { "channel": { "active": true, "inputShutdown": false, "open": true, "outputShutdown": false, "registered": true, "writable": true }, "dataVersion": { "counter": 3, "timestamp": 1562476312530 }, "haServerAddr": "172.19.0.7:10923", "lastUpdateTimestamp": 1562476361146 }, "172.19.0.5:10912": { "channel": { "active": true, "inputShutdown": false, "open": true, "outputShutdown": false, "registered": true, "writable": true }, "dataVersion": { "counter": 3, "timestamp": 1562476312530 }, "haServerAddr": "172.19.0.5:10913", "lastUpdateTimestamp": 1562476360402 }, "172.19.0.4:10911": { "channel": { "active": true, "inputShutdown": false, "open": true, "outputShutdown": false, "registered": true, "writable": true }, "dataVersion": { "counter": 3, "timestamp": 1562476312525 }, "haServerAddr": "172.19.0.4:10912", "lastUpdateTimestamp": 1562476359516 }, "172.19.0.6:10921": { "channel": { "active": true, "inputShutdown": false, "open": true, "outputShutdown": false, "registered": true, "writable": true }, "dataVersion": { "counter": 3, "timestamp": 1562476312525 }, "haServerAddr": "172.19.0.6:10922", "lastUpdateTimestamp": 1562476360541 } }
- ip位址清單
BrokerHouseKeepingService
-
:實作了BrokerHouseKeepingService
接口,用于處理ChannelEventListener
狀态事件,當Broker
失效、異常或者關閉,則将Broker
從Broker
中移除。RouteInfoManager
public class BrokerHousekeepingService implements ChannelEventListener { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final NamesrvController namesrvController; public BrokerHousekeepingService(NamesrvController namesrvController) { this.namesrvController = namesrvController; } @Override public void onChannelConnect(String remoteAddr, Channel channel) { } @Override public void onChannelClose(String remoteAddr, Channel channel) { //通道關閉從RouteInfoManager中移除Broker this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } @Override public void onChannelException(String remoteAddr, Channel channel) { //通道發生異常從RouteInfoManager中移除Broker this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } @Override public void onChannelIdle(String remoteAddr, Channel channel) { //通道失效從RouteInfoManager中移除Broker this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } }
DefaultRequestProcessor
-
預設請求處理器,根據DefaultRequestProcessor
執行相應的處理,核心處理方法RequestCode
processRequest()
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { if (ctx != null) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); } switch (request.getCode()) { //向NameServer追加KV配置 case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); //從NameServer擷取KV配置 case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); //從NameServer擷取KV配置 case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); //擷取版本資訊 case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); //注冊一個Broker,資料都是持久化的,如果存在則覆寫配置 case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } //解除安裝一個Broker,資料都是持久化的 case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); //根據Topic擷取Broker Name、topic配置資訊 case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); //擷取注冊到Name Server的所有Broker叢集資訊 case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); //去掉BrokerName的寫權限 case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); //從Name Server擷取完整Topic清單 case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); //從Namesrv删除Topic配置 case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); //通過Namespace擷取所有的KV List case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); //擷取指定叢集下的所有 topic case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); //擷取所有系統内置 Topic 清單 case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); //單元化相關 topic case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); //擷取含有單元化訂閱組的 Topic 清單 case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); //擷取含有單元化訂閱組的非單元化 case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); //更新Name Server配置 case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null; }
NamesrvStartup
-
是NamesrvStartup
的啟動入口,啟動的核心是調用NameServer
的NamesrvController
方法initialize()
public boolean initialize() { //從檔案中加載資料到記憶體中,預設從${user.home}/namesrv/kvConfig.json檔案加載 this.kvConfigManager.load(); //建立服務Server,傳入處理連接配接的ChannelEventListener this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //預設任務處理器的線程池,每一個RequestCode可以單獨設定一個線程池,如果不設定就使用預設的線程池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注冊預設處理器,根據requestCode執行相應的處理 this.registerProcessor(); //啟動後延遲5秒開始執行,每隔10秒執行一次,對于兩分鐘沒有活躍的broker,關閉連接配接 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //啟動後延遲1min,每隔10分鐘執行列印configTable this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } return true; }
RocketMQ源碼分析(四)之NameServer版本聲明NameServer介紹NameServer源碼分析