天天看點

RocketMQ4.3.X筆記(6):分布式消息隊列的協調者 NameServer

文章目錄

    • NameServer 的功能
      • 叢集狀态的存儲結構
      • 狀态維護邏輯
    • 各個角色間的互動流程(Topic 的建立為例)
      • 互動流程源碼分析
      • 為何不用ZooKeeper
    • 底層通信機制
      • Remoting 子產品
      • 協定設計和編解碼
      • Netty 庫
    • 參考

NameServer 的功能

  1. NameServer 是整個消息隊列中的狀态伺服器,叢集的各個元件通過它來了解全局的資訊。各個角色的機器都要定期向NameServer 上報自己的狀态,逾時不上報的話, NameServer 會認為某個機器出故障不可用了,其他的元件會把這個機器從可用清單裡移除
  2. NamServer 可以部署多個,互相之間獨立,其他角色同時向多個NameServer機器上報狀态資訊,進而達到熱備份的目的。
  3. NameServer 本身是無狀态的, NameServer 中的Broker 、Topic 等狀态資訊不會持久存儲,都是由各個角色定時上報并存儲到記憶體中的( NameServer 支援配置參數的持久化, 一般用不到)。

叢集狀态的存儲結構

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager

類負責儲存叢集的狀态資訊

屬性 描述
HashMap<String, List> topicQueueTable topicQueueTable 這個結構的Key 是Topic 的名稱,它存儲了所有Topic的屬性資訊。Value 是個QueueData 隊列, 隊裡的長度等于這個Topic資料存儲的Master Broker 的個數, QueueData 裡存儲着Broker 的名稱、讀寫queue 的數量、同步辨別等
HashMap<String, BrokerData> brokerAddrTable; BrokerAddrTable以BrokerName 為索引,相同名稱的Broker 可能存在多台機器, 一個Master 和多個Slave 。這個結構存儲着一個BrokerName 對應的屬性資訊,包括所屬的Cluster 名稱, 一個Master Broker 和多個Slave Broker的位址資訊
HashMap<String>> clusterAddrTable; ClusterAddrTable存儲的是叢集中C luster 的資訊,結果很簡單,就是一個Cluster 名稱對應一個由BrokerName 組成的集合
HashMap<String, BrokerLiveInfo> brokerLiveTable; 這個結構和BrokerAddrTable 有關系,但是内容完全不同,這個結構的Key 是BrokerAddr ,也就是對應着一台機器, BrokerAddrTable 中的Key是BrokerName , 多個機器的BrokerName 可以相同。BrokerLiveTable存儲的内容是這台Broker 機器的實時狀态,包括上次更新狀态的時間戳, NameServer 會定期檢查這個時間戳,逾時沒有更新就認為這個Broker 無效了,将其從Broker 清單裡清除
HashMap<String> filterServerTable; Filter Server 是過濾伺服器,是RocketMQ 的一種服務端過濾方式,一個Broker 可以有一個或多個F ilter Server 。這個結構的Key 是Broker的位址, Value 是和這個Broker 關聯的多個Filter Server 的位址

狀态維護邏輯

  1. 其他角色會主動向NameServer上報狀态,是以NameServer的主要邏輯在

    org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor

    類中,根據上報消息裡的請求碼做相應的處理,更新存儲的對應資訊。
  2. 連接配接斷開的事件也會觸發狀态更新,具體邏輯在

    org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService

    類中

各個角色間的互動流程(Topic 的建立為例)

互動流程源碼分析

  1. 建立Topic 的代碼在

    org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand

    類中,建立Topic 的指令是

    updateTopic

    ,而在

    UpdateTopicSubCommand

    類中的工作主要是

    TopicConfig

    的元件工作。
  2. updateTopic

    的指令中,其中b 和c 參數比較重要, 而且他們倆隻有一個會起作用( -b 優先), b 參

    數指定在哪個Broker 上建立本Topic 的Message Queue , c 參數表示在這個Cluster 下面所有的Master Broker 上建立這個Topic 的Message Queue , 進而達.到高可用性的目的。具體的建立動作是通過發送指令觸發的

  3. 真正的指令在

    org.apache.rocketmq.client.impl.MQClientAPIImpl

    類裡面發送的,

    TopicConfig

    會作為參數傳入相應的方法中:
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        # 根據 TopicConfig 組裝指令
        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
        requestHeader.setTopic(topicConfig.getTopicName());
        requestHeader.setDefaultTopic(defaultTopic);
        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
        requestHeader.setPerm(topicConfig.getPerm());
        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
        requestHeader.setOrder(topicConfig.isOrder());

        # 建立指令
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
        # 發送指令
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }
           
  1. 建立Topic 的指令被發往對應的Broker , Broker 接到建立Topic 的請求後,執行具體的建立邏輯在

    org.apache.rocketmq.broker.processor.AdminBrokerProcessor

    類中。如

    updateAndCreateTopic

    方法
  • 接收傳輸的指令,更新本地的 topicConfig
  • 方法最後一步是 向NameServer 發送注冊資訊,NameServer 完成建立Topic 的邏輯後,其他用戶端才能發現新增的Topic
private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final CreateTopicRequestHeader requestHeader =
            (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
        log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
            String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
            log.warn(errorMsg);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(errorMsg);
            return response;
        }

        try {
            response.setCode(ResponseCode.SUCCESS);
            response.setOpaque(request.getOpaque());
            response.markResponseType();
            response.setRemark(null);
            ctx.writeAndFlush(response);
        } catch (Exception e) {
            log.error("Failed to produce a proper response", e);
        }

        TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
        topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
        topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
        topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
        topicConfig.setPerm(requestHeader.getPerm());
        topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());

        # 更新本地的 topicConfig
        this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
        
        # 向NameServer 發送注冊資訊
        this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());

        return null;
    }
           
  1. NameServer 建立的操作在

    org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager

    registerBroker()

    方法中:
  • 更新 Broker 資訊
  • 對每個Master 角色的Broker,建立一個QueueData 對象
  • 如果是建立Topic,就是添加QueueData 對象;如果是修改Topic,就是把舊的QueueData 删除,加入新的QueueData 。
  • 注意:寫鎖同步的實作,避免在代碼中頻繁的topic的建立銷毀操作
public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();

                # 更新 Broker 資訊,配置的 brokerClusterName,擷取 brokerName
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;

                #  BrokerData 内部維護 cluster、brokerName、 brokerAddrs
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);

                # 對每個Master 角色的Broker,建立一個QueueData 對象
                if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
                
                # 更新資訊并且傳回,主要是維護 brokerLiveTable 和 filterServerTable ,用于檢查Broker的狀态與過濾器
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }
           

為何不用ZooKeeper

RocketMQ 的架構設計決定了它不需要進行Master選舉,用不到這些複雜的功能,隻需要一個輕量級的中繼資料伺服器就足夠了

底層通信機制

Remoting 子產品

  1. 類圖如下:
RocketMQ4.3.X筆記(6):分布式消息隊列的協調者 NameServer
  1. 最上層接

    RemotingService

  • void start();
  • void shutdown();
  • void registerRPCHook(RPCHook rpcHook);
  1. RemotingClient

    RemotingServer

    接口繼承

    RemotingService

    接口,并增加了自己特有的方法。

    NettyRemotingClient

    NettyRemotingServer

    分别實作了

    RemotingCIient

    RemotingServer

    ,而且都繼承了

    NettyRemotingAbstract

  2. 統一格式的自定義消息類:

    RemotingCommand

    ,并且處理編解碼的
  3. 比如NameServer 子產品中

    NamesrvController

    remotingServer

    變量以及注冊處理器處理器

    DefaultRequestProcessor

    NamesrvController

    DefaultRequestProcessor

    部分代碼如下:
# NamesrvController 部分代碼
public class NamesrvController {
    private RemotingServer remotingServer;
    
    # 初始化 remotingServer
    public boolean initialize() {
        ... ...
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        
        this.registerProcessor();
        ... ...
    }
    
    # 注冊處理器
    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor);
        } else {
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }

}    

           
# DefaultRequestProcessor 類部分代碼,核心在于 processRequest()方法的邏輯處理
public class DefaultRequestProcessor implements NettyRequestProcessor {
    # 拿到請求的 RemotingCommand ,然後處理
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        # 業務處理
        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            省略 ... ...
            default:
                break;
        }
        return null;
    }
    
}
           

協定設計和編解碼

  1. 通信協定設計
    RocketMQ4.3.X筆記(6):分布式消息隊列的協調者 NameServer
  • 第一部分是大端4 個位元組整數,值等于第二、三、四部分長度的總和;
  • 第二部分是大端4 個位元組整數,值等于第三部分的長度;
  • 第三部分是通過 Json 序列化的資料;
  • 第四部分是通過應用自定義二進制序列化的資料。
  1. 資訊的編碼過程在

    RemotingCommand

    encode()

    方法中完成
public ByteBuffer encode() {
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData = this.headerEncode();
        length += headerData.length;

        // 3> body data length
        if (this.body != null) {
            length += body.length;
        }

        ByteBuffer result = ByteBuffer.allocate(4 + length);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        // body data;
        if (this.body != null) {
            result.put(this.body);
        }

        result.flip();

        return result;
    }
           
  1. 消息的編解碼過程在

    RemotingCommand

    decode()

    方法中完成
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }
           

Netty 庫

RocketMQ 是基于Netty 庫來完成

RemotingServer

RemotingClient

具體的通信實作的。主要在于

NettyRemotingServer

NettyRemotingClient

兩個類

參考

  1. Apache RocketMQ 官網
  2. Best Practice For NameServer

繼續閱讀