天天看點

Nacos源碼分析十八、服務端的心跳處理

上篇在服務注冊時我們注意到,每個Service執行個體化時會建立一個心跳檢測任務ClientBeatCheckTask:

/**
 * Sweeps the instances of {@code service} once and reacts to stale heartbeats.
 *
 * <p>Pass one marks an unmarked, healthy instance unhealthy once the time since its last beat
 * exceeds {@code getInstanceHeartBeatTimeOut()}, then pushes a service change and publishes an
 * {@code InstanceHeartbeatTimeoutEvent}. Pass two, executed only when
 * {@code getGlobalConfig().isExpireInstance()} is true, deletes unmarked instances whose last
 * beat is older than {@code getIpDeleteTimeout()}.
 *
 * <p>Any exception is caught and logged as a warning; nothing propagates to the caller.
 */
@Override
public void run() {
    try {
        // Nothing to do when this node is not responsible for the service or health checks are off.
        if (!getDistroMapper().responsible(service.getName())
                || !getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        // Instances returned by allIPs(true) — presumably the ephemeral ones; confirm against Service.
        List<Instance> instances = service.allIPs(true);

        // Pass 1: flip timed-out instances to unhealthy.
        for (Instance candidate : instances) {
            long silentFor = System.currentTimeMillis() - candidate.getLastBeat();
            if (silentFor <= candidate.getInstanceHeartBeatTimeOut()) {
                // Heartbeat is still within the health-check timeout.
                continue;
            }
            if (candidate.isMarked() || !candidate.isHealthy()) {
                // Marked instances are left alone; already-unhealthy ones need no second notification.
                continue;
            }
            candidate.setHealthy(false);
            Loggers.EVT_LOG
                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                            candidate.getIp(), candidate.getPort(), candidate.getClusterName(),
                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                            candidate.getInstanceHeartBeatTimeOut(), candidate.getLastBeat());
            getPushService().serviceChanged(service);
            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, candidate));
        }

        // Pass 2 is optional: only run it when expiring instances is enabled.
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // Pass 2: remove instances whose last beat is older than the delete timeout.
        for (Instance candidate : instances) {
            boolean expired = !candidate.isMarked()
                    && System.currentTimeMillis() - candidate.getLastBeat() > candidate.getIpDeleteTimeout();
            if (expired) {
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(candidate));
                deleteIp(candidate);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
           

當發現距離lastBeat的時間超過心跳逾時時間(getInstanceHeartBeatTimeOut)時,執行個體會被設定為不健康并通知訂閱者;超過删除逾時時間(getIpDeleteTimeout)時,執行個體會被直接删除。

那麼當用戶端發送來心跳消息時,我們隻要能看到正常更新lastBeat和狀态即可。

首先是心跳接收的入口方法,InstanceController的beat方法:

/**
 * Entry point for client heartbeats (HTTP PUT on {@code /beat}).
 *
 * <p>Reads the optional {@code beat} JSON payload plus the cluster/ip/port/service/namespace
 * request parameters, looks up the matching instance, re-registers it from the payload when it
 * is missing, and hands the beat to {@code Service.processClientBeat}.
 *
 * @param request the incoming HTTP request carrying the heartbeat parameters
 * @return a JSON node holding the beat interval, the response code and the light-beat flag;
 *         the code is {@code RESOURCE_NOT_FOUND} when the instance is unknown and no payload was sent
 * @throws Exception a {@code NacosException} when the service cannot be found; other failures
 *         from the called helpers propagate unchanged
 */
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

    ObjectNode response = JacksonUtils.createEmptyJsonNode();
    // Start with the globally configured beat interval; it may be overridden per instance below.
    response.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    // A non-blank "beat" parameter carries the full payload; a blank one means a light beat.
    String beatJson = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = StringUtils.isNotBlank(beatJson) ? JacksonUtils.toObj(beatJson, RsInfo.class) : null;

    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            // The payload's cluster name wins over the request parameter.
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533: back-fill the payload with the cluster name taken from the request.
            clientBeat.setCluster(clusterName);
        }
        // Address information from the payload also overrides the request parameters.
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);

    Instance target = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

    if (target == null) {
        // Unknown instance and a light beat: there is nothing to rebuild it from.
        if (clientBeat == null) {
            response.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return response;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);

        // Rebuild the instance from the heartbeat payload and register it again.
        target = new Instance();
        target.setPort(clientBeat.getPort());
        target.setIp(clientBeat.getIp());
        target.setWeight(clientBeat.getWeight());
        target.setMetadata(clientBeat.getMetadata());
        target.setClusterName(clusterName);
        target.setServiceName(serviceName);
        // NOTE(review): assigns the instance's own id back to itself; looks like a no-op unless
        // the getter derives a value — confirm against Instance.
        target.setInstanceId(target.getInstanceId());
        target.setEphemeral(clientBeat.isEphemeral());

        serviceManager.registerInstance(namespaceId, serviceName, target);
    }

    Service owner = serviceManager.getService(namespaceId, serviceName);
    if (owner == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }

    // Light beat: synthesize a minimal RsInfo from the request parameters.
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }

    // Hand the beat to the service, which schedules a one-off processing task.
    owner.processClientBeat(clientBeat);

    response.put(CommonParams.CODE, NamingResponseCode.OK);
    if (target.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        // The instance declares its own heartbeat interval in metadata; report that one instead.
        response.put(SwitchEntry.CLIENT_BEAT_INTERVAL, target.getInstanceHeartBeatInterval());
    }
    // Tell the client whether subsequent beats may omit the payload (light beat).
    response.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return response;
}
           

首先如果有beat資訊的話,說明不是輕量級心跳(例如第一次發送),請求會帶上服務執行個體資訊;若服務端找不到對應的執行個體,就會根據這些資訊重新注冊執行個體。

如果沒有beat資訊(輕量級心跳),則根據請求參數中的ip、port和叢集名建立一個RsInfo。兩種情況最後都會建立一個用戶端心跳處理器,進行心跳資料處理,即 service.processClientBeat(clientBeat):

/**
 * Wraps the given heartbeat in a one-off {@code ClientBeatProcessor} bound to this service
 * and submits it through {@code HealthCheckReactor.scheduleNow}.
 *
 * @param rsInfo heartbeat data (ip, port, cluster) to be processed
 */
public void processClientBeat(final RsInfo rsInfo) {
    // A fresh processor is created for every beat and handed to the reactor.
    final ClientBeatProcessor beatTask = new ClientBeatProcessor();
    beatTask.setService(this);
    beatTask.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(beatTask);
}
           

我們看一下這個任務的run方法:

/**
 * Applies a single client heartbeat to the matching instance(s) of the service.
 *
 * <p>Looks up the cluster named in {@code rsInfo}, then for every instance returned by
 * {@code cluster.allIPs(true)} whose ip and port equal those of the beat, refreshes
 * {@code lastBeat}. An unmarked instance that is currently unhealthy is switched back to
 * healthy and a service change is pushed.
 *
 * <p>If the cluster is not present in the service's cluster map the beat is logged and ignored
 * rather than failing with a {@code NullPointerException}.
 */
@Override
public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }

    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();

    Cluster cluster = service.getClusterMap().get(clusterName);
    if (cluster == null) {
        // The lookup yields null for an unknown cluster name; skip this beat instead of
        // dereferencing null below.
        Loggers.EVT_LOG.warn("[CLIENT-BEAT] cluster not found, beat ignored: {}", rsInfo.toString());
        return;
    }

    // Instances returned by allIPs(true) — the ephemeral ones, per the original comment.
    List<Instance> instances = cluster.allIPs(true);

    for (Instance instance : instances) {
        // Only instances with the same ip and port as the beat are refreshed.
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // Record the time of this heartbeat.
            instance.setLastBeat(System.currentTimeMillis());
            // Marked instances keep their current health state.
            if (!instance.isMarked()) {
                // An unhealthy instance that beats again becomes healthy.
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                            .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                    cluster.getService().getName(), ip, port, cluster.getName(),
                                    UtilsAndCommons.LOCALHOST_SITE);
                    // Announce the change; per the original comment, PushService listens for
                    // ServiceChangeEvent and notifies clients over UDP.
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
           

周遊叢集中所有的臨時執行個體,找到IP和端口與心跳一緻的執行個體,重新整理其lastBeat的時間。如果該執行個體沒有被标記并且目前處于不健康狀态,則設定健康狀态為true,同時發送ServiceChangeEvent事件。

關于PushService實作這個事件的監聽,之前已經分析過了,就是UDP消息通知用戶端。這裡就不再分析了。