目錄:
- RocketMQ源碼解析——搭建源碼環境
- RocketMQ源碼解析——NameServer
1. 了解RocketMQ核心元件——NameServer:
RocketMQ消息中間件的設計思路一般是基于主題訂閱釋出的機制,消息生産者(Producer)發送一個消息到消息伺服器,消息伺服器負責将消息持久化存儲,消息消費者(Consumer)訂閱該興趣的主題,消息伺服器根據訂閱資訊(路由資訊)将消息推送到消費者(Push模式)或者消費者主動向消息伺服器拉去(Pull模式),進而實作消息生産者與消息消費者解耦。
為了避免消息伺服器的單點故障導緻的整個系統癱瘓,通常會部署多台消息伺服器共同承擔消息的存儲。那消息生産者如何知道消息要發送到哪台消息伺服器呢?如果某一台消息伺服器當機了,那麼消息生産者如何在不重新開機服務情況下感覺呢?
而NameServer就是為了解決以上問題設計的,整體設計圖如下所示:

Broker消息伺服器在啟動的時向所有NameServer注冊,消息生産者(Producer)在發送消息時之前先從NameServer擷取Broker伺服器位址清單,然後根據負載均衡算法從清單中選擇一台伺服器進行發送。NameServer與每台Broker保持長連接配接,并間隔一定時間檢測Broker是否存活,如果檢測到Broker當機,則從路由系統資料庫中删除。
2. 流程時序圖:
3. 源碼解析:
找到NameServer啟動類,及其啟動入口:
public static void main(String[] args) { // NameServer啟動入口
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args); // 建立NameServer控制器,前面說到NameServer的主要作用就是作為資訊注冊中心,需要接收其他元件的注冊/擷取資訊請求。是以可以将其看做是一個接收請求的controller
start(controller); // 啟動controller
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1); // 建立/啟動controller異常,停止JVM
}
return null;
}
- 建立NameServerController:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); // 設定RocketMQ版本屬性資訊
Options options = ServerUtil.buildCommandlineOptions(new Options()); // 建構指令行解析options對象
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); // 建立解析指令行參數對象
if (null == commandLine) {
System.exit(-1);
return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig(); // 建立NameServer配置對象
final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // 建立Netty伺服器端配置對象,用于接收請求(元件之間的通信主要依賴netty)
nettyServerConfig.setListenPort(9876); // 預設監聽端口9876
// 解析啟動指令-c參數
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 解析啟動指令-p參數
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
以上代碼就是建立NameServer控制器的整體流程,其中可以發現控制器的建立依賴于NameServer和NettyService配置對象,是以重點來看下這兩個配置對象當中有哪些重要的屬性:
/**
* NamesrvConfig重點屬性
*/
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); // RocketMQ項目的目錄,也就是它的家目錄
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; // NameServer存儲KV配置屬性的持久化路徑
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; // NameServer預設配置檔案路徑
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false; // 是否支援順序消息,預設不支援
/**
* NettyServerConfig重點屬性
*/
private int listenPort = 8888; // NameServer監聽端口,預設是8888,但是會被修改為9876
private int serverWorkerThreads = 8; // 線程池線程個數
private int serverCallbackExecutorThreads = 0; // 回調函數處理池線程個數(比如說異步發送消息,會提供一個回調方法)
private int serverSelectorThreads = 3; // IO線程池線程個數,主要是處理網絡請求,解析請求包
private int serverOnewaySemaphoreValue = 256; // 單向消息發送最大并發數
private int serverAsyncSemaphoreValue = 64; // 異步消息發送最大并發數
private int serverChannelMaxIdleTimeSeconds = 120; // 網絡連接配接最大空閑時間,預設120s
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // 網絡發送緩沖區大小
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; // 網絡接收緩沖區大小
private boolean serverPooledByteBufAllocatorEnable = true;
然後建立控制器:
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig; // 指定NameServer配置對象
this.nettyServerConfig = nettyServerConfig; //指定NettyServer配置對象
this.kvConfigManager = new KVConfigManager(this); // 建立KV配置資訊對象
this.routeInfoManager = new RouteInfoManager(); // 建立路由管理器(核心屬性,用于路由資訊注冊、查找)
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
完成控制器的建立後,還需要對其進行初始化:
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
boolean initResult = controller.initialize(); // 初始化控制器
if (!initResult) { // 初始化失敗
controller.shutdown(); // 控制器關閉
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { // 給目前運作程式添加關閉時的鈎子函數
@Override
public Void call() throws Exception {
controller.shutdown(); // 程式關閉,控制器跟着關閉
return null;
}
}));
controller.start(); // 正式啟動控制器
return controller;
}
- 控制器初始化:
public boolean initialize() {
this.kvConfigManager.load(); // 加載KV配置
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // 建立NettyServer網絡處理對象
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // 重點方法,定義定時任務(先延遲5s,後執行指定任務,此後每隔10s,執行指定任務)
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker(); // 由路由控制器掃描并排除不活躍的Broker
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // 定義定時任務
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically(); // 列印所有KV配置資訊
}
}, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
以上代碼便是控制器初始化的全部邏輯,其中我們需要重點關注的方法是 NamesrvController.this.routeInfoManager.scanNotActiveBroker(); 這個方法是NameServer依賴路由管理器定時掃描不活躍的Broker,并将其資訊移除。是NameServer作為RocketMQ的注冊資訊中心最為核心的功能。
首先,來了解下路由管理器的建立,及其核心屬性:
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; // 與Broker通信的長連接配接通道的過期時間
private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 讀寫鎖
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; // Topic消息隊列路由資訊,消息發送時根據路由表進行負載均衡
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // Broker基礎資訊,包括broker名、所屬叢集名稱、主備Broker位址
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // Broker叢集資訊,存儲叢集中所有的Broker名稱
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // Broker狀态資訊,NameServer每次檢測完各個Broker資訊後會更新該Table
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; // Broker上的FilterServer清單,用于類模式消息過濾
public RouteInfoManager() { // 構造函數,對以上屬性進行初始化
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
通過以上代碼可以看到,路由管理器的核心屬性都是HashMap類型對象,為了對路由管理器有更深入的了解,下面來關注其中三個重點value對象:
同一個Topic中會包含多個消息隊列,一個Broker會為每一個主題建立4個讀隊列,和四個寫隊列,是以topicQueueTable的value是list清單
/**
* 消息隊列
*/
public class QueueData implements Comparable<QueueData> {
private String brokerName; // 所屬的Broker名
private int readQueueNums; // 讀隊列數量
private int writeQueueNums; // 寫隊列數量
private int perm;
private int topicSysFlag;
...
}
多個Broker組成一個叢集,叢集由相同的多台Broker組成Master-Slave架構,brokerId為0代表master,大于0代表Salve
/**
* Broker
*/
public class BrokerData implements Comparable<BrokerData> {
private String cluster; // 所屬的叢集名
private String brokerName; // Broker名
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; // 叢集中同名Broker結點的id和位址的映射表(會存在同名多結點的情況,保證高可用)
...
}
表示Broker相關狀态資訊
/**
* Broker狀态資訊
*/
class BrokerLiveInfo {
private long lastUpdateTimestamp; // 資訊最新更新時間戳
private DataVersion dataVersion; // 版本
private Channel channel; // 長連接配接通道
private String haServerAddr;
...
}
到此,有了以上對路由管理器核心屬性的了解,再來回顧NameServer是如何通過路由管理器,定時掃描并排除不活躍的Broker:
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) { // 周遊所有Broker狀态資訊
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { // Broker資訊最新更新時間戳 + 長連接配接通道過期時間 < 目前時間戳 意味着該Broker已經超過指定時間未與NameServer進行通信,是以被視為故障,需要從删除該Broker對應的資訊
RemotingUtil.closeChannel(next.getValue().getChannel()); // 關閉與該Broker通信的長連接配接
it.remove(); // 删除該Broker的狀态資訊
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); // 删除其他table有關該Broker的資訊
}
}
}
public void onChannelDestroy(String remoteAddr, Channel channel) {
String brokerAddrFound = null;
if (channel != null) {
try {
try {
this.lock.readLock().lockInterruptibly(); // 加讀鎖
Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = this.brokerLiveTable.entrySet().iterator();
while (itBrokerLiveTable.hasNext()) { // 周遊所有Broker狀态資訊
Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
if (entry.getValue().getChannel() == channel) { // 要删除的Broker狀态資訊還存在(還沒删除)
brokerAddrFound = entry.getKey(); // 則找到該Broker的位址
break;
}
}
} finally {
this.lock.readLock().unlock(); // 釋放讀鎖
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
if (null == brokerAddrFound) { // 為空,說明對應Broker狀态資訊已經删除
brokerAddrFound = remoteAddr; // 使用傳進來的Broker位址參數
} else {
log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
}
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
try {
try {
this.lock.writeLock().lockInterruptibly(); // 加寫鎖,保證删除操作的線程安全
this.brokerLiveTable.remove(brokerAddrFound); // 再次确認删除指定Broker的狀态資訊
this.filterServerTable.remove(brokerAddrFound); // 删除有關指定Broker的過濾位址資訊
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator();
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { // 周遊所有Broker
BrokerData brokerData = itBrokerAddrTable.next().getValue();
Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) { // 周遊每個Broker中所包含的同名結點的Broker資訊
Entry<Long, String> entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
if (brokerAddr.equals(brokerAddrFound)) { // 找到指定删除的Broker位址
brokerNameFound = brokerData.getBrokerName(); // 擷取該Broker名
it.remove(); // 将其從同名Broker集合中删除
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
if (brokerData.getBrokerAddrs().isEmpty()) { // 同名Broker集合為空,說明以該Broker命名的結點已經全部挂掉,則這個命名的Broker也沒有存在的必要了
removeBrokerName = true; // 辨別需要删除該Broker
itBrokerAddrTable.remove(); // 從位址資訊中删除該Broker
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName());
}
}
if (brokerNameFound != null && removeBrokerName) { // 滿足删除指定名稱的Broker條件,需要删除有關該Broker的其他資訊
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
while (it.hasNext()) { // 周遊所有叢集
Entry<String, Set<String>> entry = it.next();
String clusterName = entry.getKey();
Set<String> brokerNames = entry.getValue();
boolean removed = brokerNames.remove(brokerNameFound); // 從叢集中删除指定名稱的Broker名稱資訊
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName);
if (brokerNames.isEmpty()) { // Broker名稱資訊集合為空,說明這個叢集已經不存在任何Broker結點了,則這個叢集也沒有存在的必要了
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", clusterName);
it.remove(); // 删除該叢集的資訊
}
break;
}
}
}
if (removeBrokerName) {
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) { // 周遊所有Topic主題
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
String topic = entry.getKey(); // 擷取topic名稱
List<QueueData> queueDataList = entry.getValue(); // 擷取Topic下的所有隊列
Iterator<QueueData> itQueueData = queueDataList.iterator();
while (itQueueData.hasNext()) { // 周遊該Topic下的所有隊列
QueueData queueData = itQueueData.next();
if (queueData.getBrokerName().equals(brokerNameFound)) { // 隊列屬于指定删除的Broker
itQueueData.remove(); // 将該隊列從集合中删除
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData);
}
}
if (queueDataList.isEmpty()) { // 隊列集合為空,則沒有存在的必要了
itTopicQueueTable.remove(); // 删除該隊列集合
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic);
}
}
}
} finally {
this.lock.writeLock().unlock(); // 釋放寫鎖
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
}
到此,NameServer的核心源碼解析結束。