部落格從RocketMQ我們學到了什麼之NameServer以郵局的功能作為類比,通俗易懂地介紹了RocketMQ中的NameServer在整個架構中的作用。而本篇文章,是以源碼閱讀筆記的形式,記錄學習RocketMQ的過程。
啟動流程
首先,NameServer的啟動類為org.apache.rocketmq.namesrv.NamesrvStartup,方法的流程很簡單:
1. 讀取配置。從啟動指令中讀取配置檔案,如果配置檔案中有修改NamesrvConfig或NettyServerConfig設定的預設值,會将配置檔案中的值覆寫NamesrvConfig或NettyServerConfig中的選項。NamesrvConfig、NettyServerConfig的配置項如下:
public class NamesrvConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;
}
public class NettyServerConfig implements Cloneable {
private int listenPort = 8888; // 服務端監聽端口
// 執行ChannelHandler的方法(例如channelRead()等方法)的線程組
private int serverWorkerThreads = 8;
// 執行請求回調方法的線程組線程數
private int serverCallbackExecutorThreads = 0;
// netty worker線程的數量(另外,Boss線程組的數量為1)
private int serverSelectorThreads = 3;
// 單向請求、異步請求的最大并發量,超過預設數值,會阻塞
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
// 監聽用戶端心跳的最大空閑時間,機關s
private int serverChannelMaxIdleTimeSeconds = 120;
// netty發送/接收資料的緩沖區大小, 預設值65535,可通過設定系統屬性修改
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// netty是否使用PooledByteBufAllocator也就是記憶體池,預設使用
private boolean serverPooledByteBufAllocatorEnable = true;
// netty使用EpollEventLoopGroup還是NioEventLoopGroup
private boolean useEpollNativeSelector = false;
}
2. 建立NamesrvController。使用NamesrvConfig、NettyServerConfig建立執行個體。
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
3. 啟動NamesrvController。代碼如下:
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
return controller;
}
首先,初始化NamesrvController的各個元件,如果失敗,直接停止;然後,添加關閉鈎子(所謂關閉鈎子,是指程序在停止時執行的一個任務),在程序關閉時停止NameServer服務。最後,真正啟動NamesrvController。
下面主要關心initialize()以及start()方法:
public boolean initialize() {
// KVConfigManager裝載寫入檔案中的key和value
this.kvConfigManager.load();
// 啟動netty伺服器
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// NameServer接收到RemotingCommand請求後,在此線程組中執行請求,并傳回結果
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注冊NameServer的請求處理器DefaultRequestProcessor
this.registerProcessor();
// 設定定時任務,掃描不活躍的Broker,5秒後開始執行,以後每10秒掃描一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 1分鐘後每隔10分鐘,列印一次KVConfigManager的屬性
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 初始化TLS配置監聽線程FileWatchService,當TLS證書或密碼檔案有修改時,通過HASH碼校驗重載配置
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
public void start() throws Exception {
// 啟動Netty伺服器,具體内容參考介紹NettyRemotingServer的文章
this.remotingServer.start();
// 啟動監聽TLS配置檔案的線程
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
到此為止,NameServer的啟動流程就結束了。相對而言,NameServer元件較少,功能比較簡單,就像官網上說的NameServer是一種無狀态的服務,它僅僅記錄所有的Broker的注冊位址,topic的分布情況等。暫時對KVConfigManager的功能未知,看代碼也是作為一種集中式的存儲key-value,像是一個簡單的配置中心。
目前為止,RouteInfoManager元件沒有介紹到,這就是NameServer用來存儲Broker的注冊位址和topic的位置的元件。
之前在NettyRemotingServer的文章中介紹到,所有的請求封裝為RemotingCommand後,交給對應的NettyRequestProcessor處理。在NameServer中,設定了一個DefaultRequestProcessor,在其processRequest()方法中有請求的處理方法,其中注冊Broker請求如下:
switch (request.getCode()) {
// 其他請求略過
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
// 其他請求略過...
default:
break;
}
看看Broker注冊的代碼:
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// 建立response對象
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
// 從RemotingCommand中解析出Broker注冊參數RegisterBrokerResponseHeader
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
try {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
} catch (Exception e) {
throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
}
} else {
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
}
// 注冊Broker
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
繼續關注this.namesrvController.getRouteInfoManager().registerBroker()方法:
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// 注冊叢集資訊
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
// 注冊Broker
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
// 檢查Version,如果version不一緻,說明有更新。第一次注冊也要更新queueData
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
// 擷取Broker目前的topic配置
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
// 在NameServer上,更新Broker的topic資訊
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 更新Broker存活資訊
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
// 更新伺服器過濾清單,其功能暫時未知,需結合broker了解其功能
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// 根據master的屬性設定haServer
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
NameServer中的線程模型:
Netty的線程模型也就是多線程Reactor模型,使用擁有少量線程的、獨立的Acceptor線程池專門處理NIO的accept事件,生成channel。使用線程組處理消息的拆包、解碼、業務處理和傳回消息等。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyROBlL0MjMwUTN0EjM1IzMwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
RocketMQ中使用Netty作為網絡通訊工具,它使用的線程模型與Netty一般使用的線程模型略有不同,模型如圖所示:
上圖畫出了用戶端發起連接配接請求,到發送Request請求,再到傳回response結果以及執行回調的過程。其步驟如下:
- 用戶端連接配接伺服器。當用戶端連接配接到伺服器時,Acceptor中select處理一個Nio的ACCEPT事件,生成一個channel。這一步的線程組專門處理網絡連接配接。
- Netty将該channel依次綁定到Selector線程組的EventLoop上(比如假設Selector線程組線程數量為8,那麼連接配接到的第8 * n + 1 ~ 8 * n + 8個channel分别綁定到線程1 ~ 線程8上)。當用戶端發送一個請求時,服務端的Selector在執行select時會接收到READ事件,Selector讀取準别好的位元組封裝到ByteBuf中,然後發送到ChannelPipeline中。這一步的線程組專門處理網絡IO。
- 在RocketMQ的NettyRemotingServer啟動伺服器過程中,添加ChannelHandler時指定了使用defaultEventExecutorGroup作為消息在ChannelPipeline中傳遞的線程組,也就是調用每個ChannelHandler的channelRead()方法都在線上程組中執行。那麼,RocketMQ的消息的拆包、解碼是在該線程組中進行處理的。
- 在第3步中,NettyDecoder将傳入的ByteBuf解碼為RemotingCommand,這是遠端請求統一使用的資料結構。然後在NettyServerHandler處理channelRead()時,将對遠端請求的處理封裝成一個RequestTask,這是一個實作Runnable接口的請求處理任務,最後投入remotingExecutor中執行。
- 在RequestTask請求任務執行完成後,會獲得一個執行相應結果,通過ctx.writeAndFlush(response)方法,将傳回結果重新整理會用戶端。4和5兩步主要在remotingExecutor中處理遠端請求。
- 假如用戶端發起的RemotingCommand請求是設定了回調的異步請求,用戶端接收到伺服器端傳回的消息後,根據請求id找到相應的回調方法,并在publicExecutor中執行回調。這一步用戶端在publicExecutor中執行消息響應的回調。
綜上,可以看出RocketMQ線程模型一個很重要的想法:針對不同的用途設定專用的線程組,這樣可以根據業務需求精确的調整用于每個部分的線程組的數量。
總結:
首先還是來看RocketMQ的架構:
1.每個Broker與NameServer都有一個連接配接,而NameServer與NameServer之間沒有連接配接,從代碼上來說也沒有資料的同步,說明NameServer和Zookeeper不一樣,不保證資料的一緻性,是AP的。
2.從Broker的注冊代碼來看,一個BrokerName可以包含一個Matser Broker和多個Slave Broker,其中Master的id為0,slave的id為1,2,3...
3.Broker心跳逾時時,會關閉Channel。從NameServer的定時任務來看,RocketMQ每隔10秒鐘掃描一次Broker檢測最後一次發消息的時間,如果超過兩分鐘,則會關閉channel。說明RocketMQ的預設失效時間為2分鐘。
4.當Broker失效後,NameServer隻能在2分鐘後将其從Broker存活清單中删除,在這段時間中,發送消息時通過失敗選擇下一個Broker重試的方法,規避NameServer保證資料一緻性的複雜設計。
5.RocketMQ針對不同的用途設定專用的線程組,可以根據業務需求精确的調整用于每個部分的線程組的數量。
疑惑點:
1.KVConfigManager是一個包含命名空間的KV配置存儲管理元件,他在RocketMQ中起到什麼樣的作用?
2.暫時不了解注冊Broker時,傳入的參數filterServerList的含義。
3.在注冊Broker的代碼的後面,有這樣一行代碼:
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
其中brokerLiveInfo是master,說明如果broker不是Master,那麼這裡傳回的result中設定的負載均衡的server位址是Master傳過來的位址,但是往brokerLiveTable中添加的又是請求傳過來的haServer,就是說兩者有可能不一樣,不知道這裡有什麼差別。另外,當同屬于brokerName為broker-a的一個slave broker先于master啟動,那麼這裡result的haServer又是空的,不知道這種情況是否有影響,或者RocketMQ是否有master必定先于slave啟動的要求?