NameServerController啟動流程總覽
啟動類:org.apache.rocketmq.namesrv.NamesrvStartup#main
java複制代碼public static void main(String[] args) {
main0(args);
}
java複制代碼public static NamesrvController main0(String[] args) {
try {
//建立NamesrvController
NamesrvController controller = createNamesrvController(args);
//初始化并啟動NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" +
RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
1.建立NamesrvController
java複制代碼public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
//設定MQ版本号
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//解析啟動指令 start mqnamesrv.cmd
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine
("mqnamesrv",
args,
buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
//建立NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//建立NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//設定啟動端口号9876
nettyServerConfig.setListenPort(9876);
//解析啟動-c參數
if (commandLine.hasOption('c')) {
//-c指定配置檔案
String file = commandLine.getOptionValue('c');
if (file != null) {
//加載配置檔案到流
InputStream in = new BufferedInputStream
(new FileInputStream(file));
//加載屬性到InputStream
properties = new Properties();
properties.load(in);
//分别設定屬性到namesrvConfig 和 nettyServerConfig
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
//設定配置檔案存儲位址
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//-p來指定是否列印配置項,在指定該選項時,直接退出。
if (commandLine.hasOption('p')) {
InternalLogger console
= InternalLoggerFactory.getLogger
(LoggerName.NAMESRV_CONSOLE_NAME);
//列印namesrvConfig屬性
MixAll.printObjectProperties(console, namesrvConfig);
//列印nettyServerConfig 屬性
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
//将啟動參數填充到namesrvConfig,nettyServerConfig
MixAll.properties2Object
(ServerUtil.commandLine2Properties(commandLine),
namesrvConfig);
//建立NameServerController
final NamesrvController controller
= new NamesrvController(namesrvConfig, nettyServerConfig);
controller.getConfiguration().registerConfig(properties);
return controller;
}
2.初始化NamesrvController
3.啟動NamesrvController
java複制代碼 public static NamesrvController start(final NamesrvController controller)
throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//注冊JVM鈎子函數代碼
//在JVM程序關閉之前,先将線程池關閉,及時釋放資源
//可以借鑒的地方
Runtime.getRuntime()
.addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
//釋放資源
controller.shutdown();
return null;
}
}));
controller.start();
return controller;
}
public void shutdown() {
//關閉nettyServer
this.remotingServer.shutdown();
//關閉線程池
this.remotingExecutor.shutdown();
//關閉定時任務
this.scheduledExecutorService.shutdown();
//功能實作當檔案内容發生變化時,重新加載檔案,可用于讀取配置類的檔案。
//原理:注冊一個listener,然後新開個線程,定期去掃描檔案
//通過對檔案内容進行hash來判斷檔案内容是否發生變化
//如果變化了,則回調監聽器的onChange方法。
//看源碼主要是監聽證書
//關閉fileWatchService
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}
大緻流程如下圖
上面對源碼做了概覽,大緻知道了NameServerController啟動的流程分為3步,建立-初始化-啟動。
下面一步一步看吧。
1.建立nameServerController
1.1:解析配置檔案,建立NameSrvController
解析配置檔案,填充NameServerConfig、NettyServerConfig屬性值,并建立NamesrvController
注意NameServer建立的是是NettyServerConfig,Broker建立的是NettyClientConfig
NamesrvStartup#createNamesrvController
java複制代碼public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
//設定MQ版本号
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//解析啟動指令 start mqnamesrv.cmd
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine
("mqnamesrv",
args,
buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
//建立NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//建立NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//設定啟動端口号9876
nettyServerConfig.setListenPort(9876);
//解析啟動-c參數
if (commandLine.hasOption('c')) {
//-c指定配置檔案
String file = commandLine.getOptionValue('c');
if (file != null) {
//加載配置檔案到流
InputStream in = new BufferedInputStream
(new FileInputStream(file));
//加載屬性到InputStream
properties = new Properties();
properties.load(in);
//分别設定屬性到namesrvConfig 和 nettyServerConfig
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
//設定配置檔案存儲位址
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//-p來指定是否列印配置項,在指定該選項時,直接退出。
if (commandLine.hasOption('p')) {
InternalLogger console
= InternalLoggerFactory.getLogger
(LoggerName.NAMESRV_CONSOLE_NAME);
//列印namesrvConfig屬性
MixAll.printObjectProperties(console, namesrvConfig);
//列印nettyServerConfig 屬性
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
//将啟動參數填充到namesrvConfig,nettyServerConfig
MixAll.properties2Object
(ServerUtil.commandLine2Properties(commandLine),
namesrvConfig);
//建立NameServerController
final NamesrvController controller
= new NamesrvController(namesrvConfig, nettyServerConfig);
controller.getConfiguration().registerConfig(properties);
return controller;
}
2.初始化nameServerController
2.1:初始化NamesrvController
根據啟動屬性建立NamesrvController執行個體,并初始化該執行個體。
NameServerController執行個體為NameServer核心控制器。
NamesrvController#initialize
java複制代碼public boolean initialize() {
//加載KV配置
this.kvConfigManager.load();
/***
這裡需要看NettyRemotingServer構造方法
會把netty的啟動輔助類serverBootstrap建立好,這個是重點
儲存了channelEventListener。
建立了netty的boss線程。
建立publicExecutor線程池。
***/
this.remotingServer = new NettyRemotingServer
(this.nettyServerConfig, this.brokerHousekeepingService);
//建立線程池 預設是8個
this.remotingExecutor =
Executors.newFixedThreadPool
(nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_"));
/***
建立DefaultRequestProcessor 作為netty server 請求處理器。
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
處理所有已知request code類型的請求
**/
this.registerProcessor();
//開啟定時任務:每隔10s掃描一次Broker,移除不活躍的Broker
//如果在2分鐘都沒有發送心跳 移除不活躍的Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//開啟定時任務:每隔10min列印一次KV配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
return true;
}
2.2:啟動定時任務:每10秒掃描一次所有broker
Broker30秒向NameServer發送一次心跳。
NamesrvController會開啟定時任務:每隔10s掃描一次Broker,移除不活躍的Broker。
移除broker是根據broker的lastUpdateStamp+2分鐘是否小于目前時間,如果小于就移除。
如果某個broker在2分鐘内都沒有發送心跳 那麼就移除該broker 即連續4次沒有發送心跳就移除
RouteInfoManager#scanNotActiveBroker
java複制代碼//掃描不活躍的broker
public void scanNotActiveBroker() {
//2分鐘
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
//HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
Iterator<Entry<String, BrokerLiveInfo>> it
= this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
//BrokerLiveInfo中的lastUpdateTimestamp存儲上次收到Broker心跳包的時間。
long last = next.getValue().getLastUpdateTimestamp();
//BrokerLiveInfo中的
//lastUpdateTimestamp+2分鐘小于目前時間說明 已經2分鐘沒有心跳了
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
//關閉并移除channel
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
//銷毀channel工作
this.onChannelDestroy(next.getKey(),
next.getValue().getChannel());
}
}
}
移除2分鐘沒心跳的broker的路由元資訊:RouteInfoManager#onChannelDestroy
java複制代碼 //路由元資訊
//類:RouteInfoManager
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
topicQueueTable: Topic消息隊列路由資訊,消息發送時根據路由表進行負載均衡
brokerAddrTable: Broker基礎資訊,包括brokerName、所屬叢集名稱、主備Broker位址
clusterAddrTable: Broker叢集資訊,存儲叢集中所有Broker名稱
brokerLiveTable: Broker狀态資訊,NameServer每次收到心跳包是會替換該資訊
filterServerTable: Broker上的FilterServer清單,用于類模式消息過濾。
一個Topic擁有多個消息隊列,一個Broker為每一個主題建立8個讀隊列和8個寫隊列。
多個Broker組成一個叢集,叢集由相同的多台Broker組成Master-Slave架構。
brokerId為0代表Master,大于0為Slave。
BrokerLiveInfo中的lastUpdateTimestamp存儲上次收到Broker心跳包的時間。
ini複制代碼//主要就是移除路由資訊表相關資訊
public void onChannelDestroy(String remoteAddr, Channel channel) {
String brokerAddrFound = null;
if (channel != null) {
try {
try {
//申請寫鎖,根據brokerAddress
//從brokerLiveTable和filterServerTable移除
this.lock.readLock().lockInterruptibly();
Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
this.brokerLiveTable.entrySet().iterator();
while (itBrokerLiveTable.hasNext()) {
Entry<String, BrokerLiveInfo> entry
= itBrokerLiveTable.next();
if (entry.getValue().getChannel() == channel) {
brokerAddrFound = entry.getKey();
break;
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
if (null == brokerAddrFound) {
brokerAddrFound = remoteAddr;
} else {
log.info("the broker's channel destroyed" +
"clean it's data structure at once");
}
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
try {
try {
this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
String brokerNameFound = null;
boolean removeBrokerName = false;
//維護<String/* brokerName */, BrokerData> brokerAddrTable
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
this.brokerAddrTable.entrySet().iterator();
//周遊brokerAddrTable
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
//擷取brokerData
BrokerData brokerData = itBrokerAddrTable.next().getValue();
//周遊該broker的所有位址 即主從
Iterator<Entry<Long, String>> it
= brokerData.getBrokerAddrs().entrySet().iterator();
//循環周遊主從
while (it.hasNext()) {
Entry<Long, String> entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
//根據broker位址移除brokerAddr
if (brokerAddr.equals(brokerAddrFound)) {
brokerNameFound = brokerData.getBrokerName();
it.remove();
break;
}
}
//如果移除以後沒有其他的BrokerAddr 相當于這個broker已經沒有執行個體了
//那麼把brokerData也從BrokerAddrTable 移除
// <String/* brokerName */, BrokerData> brokerAddrTable
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
}
}
/***
維護叢集資訊: key = clusterName value對應的set是 brokerName
<String, Set<String>> clusterAddrTable
這裡移除的條件是 removeBrokerName=true
removeBrokerName 是在移除brokerAddr時 當braokerData中的addrs為空
即該broker的主從都不存在 這個broker已經沒有執行個體了
設定removeBrokerName=true
***/
if (brokerNameFound != null && removeBrokerName) {
Iterator<Entry<String, Set<String>>>
it = this.clusterAddrTable.entrySet().iterator();
//周遊clusterAddrTable
while (it.hasNext()) {
Entry<String, Set<String>> entry = it.next();
//獲得叢集名稱
String clusterName = entry.getKey();
//獲得叢集中brokerName集合
Set<String> brokerNames = entry.getValue();
//從brokerNames中移除brokerNameFound
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
if (brokerNames.isEmpty()) {
//如果叢集中不包含任何broker,則移除該叢集
it.remove();
}
break;
}
}
}
//<String/* topic */, List<QueueData>> topicQueueTable隊列
//這裡移除的條件是 removeBrokerName=true
//removeBrokerName 是在移除brokerAddr時 當brokerData中的addrs為空
//即該broker的主從都不存在,這個broker已經沒有執行個體了
//設定removeBrokerName=true
if (removeBrokerName) {
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
//周遊topicQueueTable
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry
= itTopicQueueTable.next();
//主題名稱
String topic = entry.getKey();
//隊列集合
List<QueueData> queueDataList = entry.getValue();
//周遊該主題隊列
Iterator<QueueData> itQueueData
= queueDataList.iterator();
while (itQueueData.hasNext()) {
//擷取queueData
QueueData queueData = itQueueData.next();
//如果queueData中的brokerName等于本次移除的brokerName
//那麼從隊列中移除該queue
if (queueData.getBrokerName()
.equals(brokerNameFound)) {
itQueueData.remove();
}
}
//如果該topic的隊列為空,則移除該topic
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
}
3.注冊jvm鈎子函數,啟動NameServerController
3.1注冊jvm鈎子函數,啟動NameSrvCtr
在JVM程序關閉之前,先将線程池關閉,及時釋放資源
NamesrvStartup#start
java複制代碼 public static NamesrvController start(final NamesrvController controller)
throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//注冊JVM鈎子函數代碼
//在JVM程序關閉之前,先将線程池關閉,及時釋放資源
//可以借鑒的地方
Runtime.getRuntime()
.addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
//釋放資源
controller.shutdown();
return null;
}
}));
controller.start();
return controller;
}
public void shutdown() {
//關閉nettyServer
this.remotingServer.shutdown();
//關閉線程池
this.remotingExecutor.shutdown();
//關閉定時任務
this.scheduledExecutorService.shutdown();
//功能實作當檔案内容發生變化時,重新加載檔案,可用于讀取配置類的檔案。
//原理:注冊一個listener,然後新開個線程,定期去掃描檔案
//通過對檔案内容進行hash來判斷檔案内容是否發生變化
//如果變化了,則回調監聽器的onChange方法。
//看源碼主要是監聽證書
//關閉fileWatchService
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}