天天看點

rocketmq學習筆記 ---- NameServer總結:

部落格從RocketMQ我們學到了什麼之NameServer以郵局的功能作為類比,通俗易懂地介紹了RocketMQ中的NameServer在整個架構中的作用。而本篇文章,是以源碼閱讀筆記的形式,記錄學習RocketMQ的過程。

啟動流程

首先,NameServer的啟動類為org.apache.rocketmq.namesrv.NamesrvStartup,方法的流程很簡單:

1. 讀取配置。從啟動指令中讀取配置檔案,如果配置檔案中有修改NamesrvConfig或NettyServerConfig設定的預設值,會将配置檔案中的值覆寫NamesrvConfig或NettyServerConfig中的選項。NamesrvConfig、NettyServerConfig的配置項如下:

public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    private boolean orderMessageEnable = false;
}
           
public class NettyServerConfig implements Cloneable {
    private int listenPort = 8888;  // 服務端監聽端口
    // 執行ChannelHandler的方法(例如channelRead()等方法)的線程組
    private int serverWorkerThreads = 8;   
    // 執行請求回調方法的線程組線程數
    private int serverCallbackExecutorThreads = 0;
    // netty worker線程的數量(另外,Boss線程組的數量為1)
    private int serverSelectorThreads = 3; 

    // 單向請求、異步請求的最大并發量,超過預設數值,會阻塞
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;

    // 監聽用戶端心跳的最大空閑時間,機關s
    private int serverChannelMaxIdleTimeSeconds = 120;  
    // netty發送/接收資料的緩沖區大小, 預設值65535,可通過設定系統屬性修改
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    // netty是否使用PooledByteBufAllocator也就是記憶體池,預設使用
    private boolean serverPooledByteBufAllocatorEnable = true;
    // netty使用EpollEventLoopGroup還是NioEventLoopGroup
    private boolean useEpollNativeSelector = false;
}
           

2. 建立NamesrvController。使用NamesrvConfig、NettyServerConfig建立執行個體。

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
           

3. 啟動NamesrvController。代碼如下:

public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }
           

首先,初始化NamesrvController的各個元件,如果失敗,直接停止;然後,添加關閉鈎子(所謂關閉鈎子,是指程序在停止時執行的一個任務),在程序關閉時停止NameServer服務。最後,真正啟動NamesrvController。

下面主要關心initialize()以及start()方法:

public boolean initialize() {
        // KVConfigManager裝載寫入檔案中的key和value
        this.kvConfigManager.load();
        // 啟動netty伺服器
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        // NameServer接收到RemotingCommand請求後,在此線程組中執行請求,并傳回結果
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        // 注冊NameServer的請求處理器DefaultRequestProcessor
        this.registerProcessor();
        // 設定定時任務,掃描不活躍的Broker,5秒後開始執行,以後每10秒掃描一次
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        // 1分鐘後每隔10分鐘,列印一次KVConfigManager的屬性
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        // 初始化TLS配置監聽線程FileWatchService,當TLS證書或密碼檔案有修改時,通過HASH碼校驗重載配置
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }
           
public void start() throws Exception {
        // 啟動Netty伺服器,具體内容參考介紹NettyRemotingServer的文章
        this.remotingServer.start();
        // 啟動監聽TLS配置檔案的線程
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }
           

到此為止,NameServer的啟動流程就結束了。相對而言,NameServer元件較少,功能比較簡單,就像官網上說的NameServer是一種無狀态的服務,它僅僅記錄所有的Broker的注冊位址,topic的分布情況等。暫時對KVConfigManager的功能未知,看代碼也是作為一種集中式的存儲key-value,像是一個簡單的配置中心。

目前為止,RouteInfoManager元件沒有介紹到,這就是NameServer用來存儲Broker的注冊位址和topic的位置的元件。

之前在NettyRemotingServer的文章中介紹到,所有的請求封裝為RemotingCommand後,交給對應的NettyRequestProcessor處理。在NameServer中,設定了一個DefaultRequestProcessor,在其processRequest()方法中有請求的處理方法,其中注冊Broker請求如下:

switch (request.getCode()) {
            // 其他請求略過
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            // 其他請求略過...
            default:
                break;
        }
           

看看Broker注冊的代碼:

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        // 建立response對象
        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        // 從RemotingCommand中解析出Broker注冊參數RegisterBrokerResponseHeader 
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
        final RegisterBrokerRequestHeader requestHeader =
            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

        if (!checksum(ctx, request, requestHeader)) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("crc32 not match");
            return response;
        }

        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

        if (request.getBody() != null) {
            try {
                registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
            } catch (Exception e) {
                throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
            }
        } else {
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
        }

        // 注冊Broker
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            registerBrokerBody.getTopicConfigSerializeWrapper(),
            registerBrokerBody.getFilterServerList(),
            ctx.channel());

        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());

        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
           

繼續關注this.namesrvController.getRouteInfoManager().registerBroker()方法:

public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                // 注冊叢集資訊
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;
                // 注冊Broker
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);

                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    // 檢查Version,如果version不一緻,說明有更新。第一次注冊也要更新queueData
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        // 擷取Broker目前的topic配置
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                // 在NameServer上,更新Broker的topic資訊
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
                // 更新Broker存活資訊
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }
                // 更新伺服器過濾清單,其功能暫時未知,需結合broker了解其功能
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
                // 根據master的屬性設定haServer
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }
           

NameServer中的線程模型:

Netty的線程模型也就是多線程Reactor模型,使用擁有少量線程的、獨立的Acceptor線程池專門處理NIO的accept事件,生成channel。使用線程組處理消息的拆包、解碼、業務處理和傳回消息等。

rocketmq學習筆記 ---- NameServer總結:

RocketMQ中使用Netty作為網絡通訊工具,它使用的線程模型與Netty一般使用的線程模型略有不同,模型如圖所示:

rocketmq學習筆記 ---- NameServer總結:

上圖畫出了用戶端發起連接配接請求,到發送Request請求,再到傳回response結果以及執行回調的過程。其步驟如下:

  1. 用戶端連接配接伺服器。當用戶端連接配接到伺服器時,Acceptor中select處理一個Nio的ACCEPT事件,生成一個channel。這一步的線程組專門處理網絡連接配接。
  2. Netty将該channel依次綁定到Selector線程組的EventLoop上(比如假設Selector線程組線程數量為8,那麼連接配接到的第8 * n + 1 ~  8 * n + 8個channel分别綁定到線程1 ~ 線程8上)。當用戶端發送一個請求時,服務端的Selector在執行select時會接收到READ事件,Selector讀取準别好的位元組封裝到ByteBuf中,然後發送到ChannelPipeline中。這一步的線程組專門處理網絡IO。
  3. 在RocketMQ的NettyRemotingServer啟動伺服器過程中,添加ChannelHandler時指定了使用defaultEventExecutorGroup作為消息在ChannelPipeline中傳遞的線程組,也就是調用每個ChannelHandler的channelRead()方法都在線上程組中執行。那麼,RocketMQ的消息的拆包、解碼是在該線程組中進行處理的。
  4. 在第3步中,NettyDecoder将傳入的ByteBuf解碼為RemotingCommand,這是遠端請求統一使用的資料結構。然後在NettyServerHandler處理channelRead()時,将對遠端請求的處理封裝成一個RequestTask,這是一個實作Runnable接口的請求處理任務,最後投入remotingExecutor中執行。
  5. 在RequestTask請求任務執行完成後,會獲得一個執行相應結果,通過ctx.writeAndFlush(response)方法,将傳回結果重新整理會用戶端。4和5兩步主要在remotingExecutor中處理遠端請求。
  6. 假如用戶端發起的RemotingCommand請求是設定了回調的異步請求,用戶端接收到伺服器端傳回的消息後,根據請求id找到相應的回調方法,并在publicExecutor中執行回調。這一步用戶端在publicExecutor中執行消息響應的回調。

綜上,可以看出RocketMQ線程模型一個很重要的想法:針對不同的用途設定專用的線程組,這樣可以根據業務需求精确的調整用于每個部分的線程組的數量。

總結:

首先還是來看RocketMQ的架構:

rocketmq學習筆記 ---- NameServer總結:

1.每個Broker與NameServer都有一個連接配接,而NameServer與NameServer之間沒有連接配接,從代碼上來說也沒有資料的同步,說明NameServer和Zookeeper不一樣,不保證資料的一緻性,是AP的。

2.從Broker的注冊代碼來看,一個BrokerName可以包含一個Matser Broker和多個Slave Broker,其中Master的id為0,slave的id為1,2,3...

3.Broker心跳逾時時,會關閉Channel。從NameServer的定時任務來看,RocketMQ每隔10秒鐘掃描一次Broker檢測最後一次發消息的時間,如果超過兩分鐘,則會關閉channel。說明RocketMQ的預設失效時間為2分鐘。

4.當Broker失效後,NameServer隻能在2分鐘後将其從Broker存活清單中删除,在這段時間中,發送消息時通過失敗選擇下一個Broker重試的方法,規避NameServer保證資料一緻性的複雜設計。

5.RocketMQ針對不同的用途設定專用的線程組,可以根據業務需求精确的調整用于每個部分的線程組的數量。

疑惑點:

1.KVConfigManager是一個包含命名空間的KV配置存儲管理元件,他在RocketMQ中起到什麼樣的作用?

2.暫時不了解注冊Broker時,傳入的參數filterServerList的含義。

3.在注冊Broker的代碼的後面,有這樣一行代碼:

result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
           

其中brokerLiveInfo是master,說明如果broker不是Master,那麼這裡傳回的result中設定的負載均衡的server位址是Master傳過來的位址,但是往brokerLiveTable中添加的又是請求傳過來的haServer,就是說兩者有可能不一樣,不知道這裡有什麼差別。另外,當同屬于brokerName為broker-a的一個slave broker先于master啟動,那麼這裡result的haServer又是空的,不知道這種情況是否有影響,或者RocketMQ是否有master必定先于slave啟動的要求?