文章目錄
-
- NameServer 的功能
-
- 叢集狀态的存儲結構
- 狀态維護邏輯
- 各個角色間的互動流程(Topic 的建立為例)
-
- 互動流程源碼分析
- 為何不用ZooKeeper
- 底層通信機制
-
- Remoting 子產品
- 協定設計和編解碼
- Netty 庫
- 參考
NameServer 的功能
- NameServer 是整個消息隊列中的狀态伺服器,叢集的各個元件通過它來了解全局的資訊。各個角色的機器都要定期向NameServer 上報自己的狀态,逾時不上報的話, NameServer 會認為某個機器出故障不可用了,其他的元件會把這個機器從可用清單裡移除
- NamServer 可以部署多個,互相之間獨立,其他角色同時向多個NameServer機器上報狀态資訊,進而達到熱備份的目的。
- NameServer 本身是無狀态的, NameServer 中的Broker 、Topic 等狀态資訊不會持久存儲,都是由各個角色定時上報并存儲到記憶體中的( NameServer 支援配置參數的持久化, 一般用不到)。
叢集狀态的存儲結構
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
類負責儲存叢集的狀态資訊
屬性 | 描述 |
---|---|
HashMap<String, List> topicQueueTable | topicQueueTable 這個結構的Key 是Topic 的名稱,它存儲了所有Topic的屬性資訊。Value 是個QueueData 隊列, 隊裡的長度等于這個Topic資料存儲的Master Broker 的個數, QueueData 裡存儲着Broker 的名稱、讀寫queue 的數量、同步辨別等 |
HashMap<String, BrokerData> brokerAddrTable; | BrokerAddrTable以BrokerName 為索引,相同名稱的Broker 可能存在多台機器, 一個Master 和多個Slave 。這個結構存儲着一個BrokerName 對應的屬性資訊,包括所屬的Cluster 名稱, 一個Master Broker 和多個Slave Broker的位址資訊 |
HashMap<String>> clusterAddrTable; | ClusterAddrTable存儲的是叢集中C luster 的資訊,結果很簡單,就是一個Cluster 名稱對應一個由BrokerName 組成的集合 |
HashMap<String, BrokerLiveInfo> brokerLiveTable; | 這個結構和BrokerAddrTable 有關系,但是内容完全不同,這個結構的Key 是BrokerAddr ,也就是對應着一台機器, BrokerAddrTable 中的Key是BrokerName , 多個機器的BrokerName 可以相同。BrokerLiveTable存儲的内容是這台Broker 機器的實時狀态,包括上次更新狀态的時間戳, NameServer 會定期檢查這個時間戳,逾時沒有更新就認為這個Broker 無效了,将其從Broker 清單裡清除 |
HashMap<String> filterServerTable; | Filter Server 是過濾伺服器,是RocketMQ 的一種服務端過濾方式,一個Broker 可以有一個或多個F ilter Server 。這個結構的Key 是Broker的位址, Value 是和這個Broker 關聯的多個Filter Server 的位址 |
狀态維護邏輯
- 其他角色會主動向NameServer上報狀态,是以NameServer的主要邏輯在
類中,根據上報消息裡的請求碼做相應的處理,更新存儲的對應資訊。org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor
- 連接配接斷開的事件也會觸發狀态更新,具體邏輯在
類中org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService
各個角色間的互動流程(Topic 的建立為例)
互動流程源碼分析
- 建立Topic 的代碼在
類中,建立Topic 的指令是org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand
,而在updateTopic
類中的工作主要是UpdateTopicSubCommand
的元件工作。TopicConfig
- 在
updateTopic
的指令中,其中b 和c 參數比較重要, 而且他們倆隻有一個會起作用( -b 優先), b 參
數指定在哪個Broker 上建立本Topic 的Message Queue , c 參數表示在這個Cluster 下面所有的Master Broker 上建立這個Topic 的Message Queue , 進而達.到高可用性的目的。具體的建立動作是通過發送指令觸發的
- 真正的指令在
類裡面發送的,org.apache.rocketmq.client.impl.MQClientAPIImpl
會作為參數傳入相應的方法中:TopicConfig
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
# 根據 TopicConfig 組裝指令
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
requestHeader.setPerm(topicConfig.getPerm());
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
# 建立指令
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
# 發送指令
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
- 建立Topic 的指令被發往對應的Broker , Broker 接到建立Topic 的請求後,執行具體的建立邏輯在
類中。如org.apache.rocketmq.broker.processor.AdminBrokerProcessor
方法updateAndCreateTopic
- 接收傳輸的指令,更新本地的 topicConfig
- 方法最後一步是 向NameServer 發送注冊資訊,NameServer 完成建立Topic 的邏輯後,其他用戶端才能發現新增的Topic
private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
try {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
ctx.writeAndFlush(response);
} catch (Exception e) {
log.error("Failed to produce a proper response", e);
}
TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
# 更新本地的 topicConfig
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
# 向NameServer 發送注冊資訊
this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
return null;
}
- NameServer 建立的操作在
的org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
方法中:registerBroker()
- 更新 Broker 資訊
- 對每個Master 角色的Broker,建立一個QueueData 對象
- 如果是建立Topic,就是添加QueueData 對象;如果是修改Topic,就是把舊的QueueData 删除,加入新的QueueData 。
- 注意:寫鎖同步的實作,避免在代碼中頻繁的topic的建立銷毀操作
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
# 更新 Broker 資訊,配置的 brokerClusterName,擷取 brokerName
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
# BrokerData 内部維護 cluster、brokerName、 brokerAddrs
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
# 對每個Master 角色的Broker,建立一個QueueData 對象
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
# 更新資訊并且傳回,主要是維護 brokerLiveTable 和 filterServerTable ,用于檢查Broker的狀态與過濾器
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
為何不用ZooKeeper
RocketMQ 的架構設計決定了它不需要進行Master選舉,用不到這些複雜的功能,隻需要一個輕量級的中繼資料伺服器就足夠了
底層通信機制
Remoting 子產品
- 類圖如下:
- 最上層接
RemotingService
- void start();
- void shutdown();
- void registerRPCHook(RPCHook rpcHook);
-
和RemotingClient
接口繼承RemotingServer
接口,并增加了自己特有的方法。RemotingService
和NettyRemotingClient
分别實作了NettyRemotingServer
和RemotingCIient
,而且都繼承了RemotingServer
NettyRemotingAbstract
- 統一格式的自定義消息類:
,并且處理編解碼的RemotingCommand
- 比如NameServer 子產品中
的NamesrvController
變量以及注冊處理器處理器remotingServer
。DefaultRequestProcessor
和NamesrvController
部分代碼如下:DefaultRequestProcessor
# NamesrvController 部分代碼
public class NamesrvController {
private RemotingServer remotingServer;
# 初始化 remotingServer
public boolean initialize() {
... ...
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.registerProcessor();
... ...
}
# 注冊處理器
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
}
# DefaultRequestProcessor 類部分代碼,核心在于 processRequest()方法的邏輯處理
public class DefaultRequestProcessor implements NettyRequestProcessor {
# 拿到請求的 RemotingCommand ,然後處理
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
# 業務處理
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
省略 ... ...
default:
break;
}
return null;
}
}
協定設計和編解碼
- 通信協定設計
- 第一部分是大端4 個位元組整數,值等于第二、三、四部分長度的總和;
- 第二部分是大端4 個位元組整數,值等于第三部分的長度;
- 第三部分是通過 Json 序列化的資料;
- 第四部分是通過應用自定義二進制序列化的資料。
- 資訊的編碼過程在
的RemotingCommand
方法中完成encode()
public ByteBuffer encode() {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}
- 消息的編解碼過程在
的RemotingCommand
方法中完成decode()
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
Netty 庫
RocketMQ 是基于Netty 庫來完成
RemotingServer
和
RemotingClient
具體的通信實作的。主要在于
NettyRemotingServer
和
NettyRemotingClient
兩個類
參考
- Apache RocketMQ 官網
- Best Practice For NameServer