文章目錄
-
- 源碼目錄
- 子產品入口代碼的功能 `NamesrvStartup`
-
- 入口函數:
- 解析指令行參數
- 初始化NameServer 的Controller
- NameServer 的總控邏輯
-
- 初始化執行線程池
- 啟動通信服務
- 核心業務邏輯處理
- 叢集狀态存儲
-
- 具體結構
- 控制通路這些結構的鎖機制
- 事件監聽 `BrokerHousekeepingService`
- 參考
源碼目錄
- 整個功能很簡單,一共就 8 個類
- KVConfigManager
- KVConfigSerializeWrapper
- ClusterTestRequestProcessor
- DefaultRequestProcessor
- BrokerHousekeepingService
- RouteInfoManager.java
- NamesrvController
- NamesrvStartup
- 依賴核心子產品:
rocketmq-remoting
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiIzYhdjYzcTN3YWNmdDOzkzN0YWM2IGZ5EGNjRDZxUjN9kXZLVmchh2cmQWYvxmb39GZ9Q2boRXZt9jN1ITO2M0NERTMGZkMGFkQ5QUM0QkQwATQwIzNzE0ME9CXlxWam9CXsFmbvNnclB3LclGch9CXzdXevwVbvNmLvFGZ19WeuUGdv52Lc9CX6MHc0RHaiojIsJye.jpg)
子產品入口代碼的功能 NamesrvStartup
NamesrvStartup
入口函數:
-
是NamesrvStartup
子產品的啟動入口,NameServer
是用來協塊各個調模功能的代碼NamesrvController
-
類的入口函數NamesrvStartup.java
,發現把邏輯轉到main
函數main0()
-
函數的主要功能:main0()
- 是解析指令行參數,重點是解析
參數-c 和-p
- 初始化
的NameServer
,即初始化了Controller
類NamesrvController.java
解析指令行參數
- 解析指令行參數源碼
# 解析指令行參數
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
# 判斷參數 -c
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
# 判斷 -p 後退出
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
- 簡單分析
-
指令行參數用來指定配置檔案的位置-c
-
指令行參數用來列印所有配置項的值-p
- 注意,用
參數列印配置項的值之後程式就退出了,這是一個幫助調試的選項-p
初始化NameServer 的Controller
-
函數的另外一個功能是初始化main0
,源碼如下NamesrvController
- 根據解析出的配置參數, 調用
來初始化,然後調用controller.initialize()
讓controller.start()
開始服務NameServer
- 還有一個邏輯是注冊
,當程式退出的時候會調用ShutdownHookThread
來做退出前的清理工作controller.shutdown
# createNamesrvController() 方法中,在解析參數後面
# 解析出配置參數
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
# start(controller) 方法中
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
# 初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
# 注冊 `ShutdownHookThread`, 當程式退出的時候會調用`controller.shutdown` 來做退出前的清理工作
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
# 啟動開始服務
controller.start();
return controller;
}
NameServer 的總控邏輯
NameServer
的總控邏輯在
NamesrvController.java
代碼中。
NameServer
是叢集的協調者,它隻是簡單地接收其他角色報上來的狀态,然後根據請求傳回相應的狀态
初始化執行線程池
-
函數完成,源碼如下:NameserverController.initialize()
- 一共3個線程池
- 啟動了一個預設是8 個線程的線程池
- 1個定時線程池,負責掃描失效的Broker(scanNotActiveBroker)
- 1個定時線程池,列印配置資訊(printAllPeriodically)
# 啟動了一個預設是8 個線程的線程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
# 定時線程池,負責掃描失效的Broker(scanNotActiveBroker)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
# 定時線程池,列印配置資訊(printAllPeriodically)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
啟動通信服務
- 啟動負責通信的服務
remotingServer
-
監昕一些端口,收到remotingServer
、Broker
等發過來的請求後,根據請求的指令,調用不同的Client
來處理。這些不同的處理邏輯被放到上面初始化的線程池中執行Processor
-
是基于Netty 封裝的一個網絡通信服務remotingServer
# NameserverController.initialize() 方法中
# 初始化 remotingServer
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
# 注冊 Processor
this.registerProcessor();
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
核心業務邏輯處理
- NameServer 的核心業務邏輯, 在
的DefaultRequestProcessor.java
方法中 中,網絡通信服務子產品收到請求後,就調用這個processRequest
Processor
來處理
2.邏輯主體是個
語句,根據switch
調用不同的函數來處理,從RequestCode
可以了解到RequestCode
的主要功能,比如:Name Server
-
是在叢集中新加入一個Broker 機器REGISTER_BROKER
-
是請求擷取一個Topic 的路由資訊GET_ROUTEINTO_BY_TOPIC
-
是删除一個Broker 的寫權限WIPE_WRITE_PERM_OF_BROKER
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
叢集狀态存儲
具體結構
-
作為叢集的協調者,需要儲存和維護叢集的各種中繼資料, 這是通過NameServer
類來實作的RoutelnfoManager
- 每個結構存儲着一類叢集資訊
屬性 | 描述 |
---|---|
HashMap<String, List> topicQueueTable | topicQueueTable 這個結構的Key 是Topic 的名稱,它存儲了所有Topic的屬性資訊。Value 是個QueueData 隊列, 隊裡的長度等于這個Topic資料存儲的Master Broker 的個數, QueueData 裡存儲着Broker 的名稱、讀寫queue 的數量、同步辨別等 |
HashMap<String, BrokerData> brokerAddrTable; | BrokerAddrTable以BrokerName 為索引,相同名稱的Broker 可能存在多台機器, 一個Master 和多個Slave 。這個結構存儲着一個BrokerName 對應的屬性資訊,包括所屬的Cluster 名稱, 一個Master Broker 和多個Slave Broker的位址資訊 |
HashMap<String>> clusterAddrTable; | ClusterAddrTable存儲的是叢集中C luster 的資訊,結果很簡單,就是一個Cluster 名稱對應一個由BrokerName 組成的集合 |
HashMap<String, BrokerLiveInfo> brokerLiveTable; | 這個結構和BrokerAddrTable 有關系,但是内容完全不同,這個結構的Key 是BrokerAddr ,也就是對應着一台機器, BrokerAddrTable 中的Key是BrokerName , 多個機器的BrokerName 可以相同。BrokerLiveTable存儲的内容是這台Broker 機器的實時狀态,包括上次更新狀态的時間戳, NameServer 會定期檢查這個時間戳,逾時沒有更新就認為這個Broker 無效了,将其從Broker 清單裡清除 |
HashMap<String> filterServerTable; | Filter Server 是過濾伺服器,是RocketMQ 的一種服務端過濾方式,一個Broker 可以有一個或多個F ilter Server 。這個結構的Key 是Broker的位址, Value 是和這個Broker 關聯的多個Filter Server 的位址 |
- 源碼如下
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
控制通路這些結構的鎖機制
- 在
的場景中,讀取操作多,更改操作少(讀多寫少),是以選擇讀寫鎖能大大提高效率NameServer
private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
中使用的是可重人的讀寫鎖,以RoutelnfoManager
函數為例:deleteTopic
public void deleteTopic(final String topic) {
try {
try {
this.lock.writeLock().lockInterruptibly();
this.topicQueueTable.remove(topic);
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("deleteTopic Exception", e);
}
}
事件監聽 BrokerHousekeepingService
BrokerHousekeepingService
- 繼承
類,主要負責連接配接斷開的回調與監聽ChannelEventListener
- 監聽的事件如:
、onChannelConnect
、onChannelClose
、onChannelException
onChannelIdle
- 在監聽的回調方法中,會調用
中NamesrvController
中的方法,更新狀态資訊等RouteInfoManager
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
參考
源碼位址