Nacos Source Code Analysis: A Complete Walkthrough of Service Registration and Service Discovery

# Preface

Nacos is one of the key components of Spring Cloud Alibaba. How does it actually work? Let's step into its internals.

# Service registration with Nacos and Spring Cloud

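As a core component of Spring Cloud Alibaba, Nacos follows the Spring Cloud conventions. Spring Cloud defines a ServiceRegistry interface which, as the name suggests, is used for service registration. In a Spring Cloud environment Nacos therefore provides a class that implements it, NacosServiceRegistry. The interface declares the following methods: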

public interface ServiceRegistry<R extends Registration> {

   void register(R registration);

   void deregister(R registration);

   void close();

   void setStatus(R registration, String status);

   <T> T getStatus(R registration);

}

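This article discusses only the registration method, register.

Before looking at that method, let's briefly explain how NacosServiceRegistry is wired into the Spring context.

It relies on Spring Boot auto-configuration. The META-INF/spring.factories file in spring-cloud-starter-alibaba-nacos-discovery-2.2.1.RELEASE.jar lists a class named NacosServiceRegistryAutoConfiguration, which is the auto-configuration class for the Nacos and Spring Cloud integration. Here is that class: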

public class NacosServiceRegistryAutoConfiguration {

   @Bean
   public NacosServiceRegistry nacosServiceRegistry(
         NacosDiscoveryProperties nacosDiscoveryProperties) {
      return new NacosServiceRegistry(nacosDiscoveryProperties);
   }

   @Bean
   @ConditionalOnBean(AutoServiceRegistrationProperties.class)
   public NacosRegistration nacosRegistration(
         NacosDiscoveryProperties nacosDiscoveryProperties,
         ApplicationContext context) {
      return new NacosRegistration(nacosDiscoveryProperties, context);
   }

   @Bean
   @ConditionalOnBean(AutoServiceRegistrationProperties.class)
   public NacosAutoServiceRegistration nacosAutoServiceRegistration(
         NacosServiceRegistry registry,
         AutoServiceRegistrationProperties autoServiceRegistrationProperties,
         NacosRegistration registration) {
      return new NacosAutoServiceRegistration(registry,
            autoServiceRegistrationProperties, registration);
   }

}

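This is a configuration class that registers the NacosServiceRegistry, NacosRegistration and NacosAutoServiceRegistration beans, where:

  • NacosServiceRegistry: implements the ServiceRegistry interface and is the Nacos implementation of service registration.
  • NacosRegistration: implements Registration and ServiceInstance, and is used to store the service instance information such as port and IP.
  • NacosAutoServiceRegistration: extends AbstractAutoServiceRegistration and triggers automatic service registration through the event mechanism.

So we have essentially found the entry point: registration is triggered in NacosAutoServiceRegistration.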

Looking at AbstractAutoServiceRegistration, we can see that it implements ApplicationListener<WebServerInitializedEvent>, so it is notified of WebServerInitializedEvent. If the application uses Dubbo and has no web environment, the DubboServiceRegistrationNonWebApplicationAutoConfiguration auto-configuration is used instead; it listens for ApplicationStartedEvent and then calls serviceRegistry.register(). Here we follow the web entry point.

The listener method onApplicationEvent of AbstractAutoServiceRegistration calls a bind method:

public void bind(WebServerInitializedEvent event) {
   ApplicationContext context = event.getApplicationContext();
   if (context instanceof ConfigurableWebServerApplicationContext) {
      if ("management".equals(((ConfigurableWebServerApplicationContext) context)
            .getServerNamespace())) {
         return;
      }
   }
   this.port.compareAndSet(0, event.getWebServer().getPort());
   this.start();
}
           
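  • Sets port to the port the web server is listening on, but only if port is still 0. A context whose server namespace is "management" is skipped.
  • Calls start() to begin registering the service.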
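The `compareAndSet(0, ...)` call in bind means the port is recorded only once. The sketch below illustrates that behavior in isolation; `PortHolder` is a hypothetical stand-in, not a Spring Cloud class.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the port handling in bind(); not Spring Cloud code.
class PortHolder {
    // 0 means "not set yet", mirroring the expected value passed to compareAndSet.
    private final AtomicInteger port = new AtomicInteger(0);

    // Records the port only while the current value is still 0.
    // Returns true if this call set the port, false if it was already set.
    boolean bind(int webServerPort) {
        return port.compareAndSet(0, webServerPort);
    }

    int getPort() {
        return port.get();
    }
}
```

With this sketch, a first `bind(8080)` succeeds and a later `bind(9090)` leaves the port at 8080, which is why a second web server event cannot overwrite the registered port.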

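Now look at start().

It eventually calls the subclass's register method, which validates the port and then calls the parent class's register method. The parent's register method calls register on the injected serviceRegistry, passing in the injected Registration, as follows: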

protected void register() {
   this.serviceRegistry.register(getRegistration());
}
           

Next comes serviceRegistry.register. Its implementation is NacosServiceRegistry, so let's first look at the NacosServiceRegistry constructor to see what the class does when it is initialized:

public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
   this.nacosDiscoveryProperties = nacosDiscoveryProperties;
   this.namingService = nacosDiscoveryProperties.namingServiceInstance();
}
           

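When this bean is created it receives a nacosDiscoveryProperties object, and nacosDiscoveryProperties.namingServiceInstance() returns a namingService, the service registration API provided by Nacos. The concrete class created for namingService is NacosNamingService, whose initialization sets up the following: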

private void init(Properties properties) {
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    this.initServerAddr(properties);
    InitUtils.initWebRootContext();
    this.initCacheDir();
    this.initLogName(properties);
    this.eventDispatcher = new EventDispatcher();
    this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
    this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
    this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, this.cacheDir, this.isLoadCacheAtStart(properties), this.initPollingThreadCount(properties));
}
           

Now back to the register method of NacosServiceRegistry:

public void register(Registration registration) {

   if (StringUtils.isEmpty(registration.getServiceId())) {
      log.warn("No service to register for nacos client...");
      return;
   }

   String serviceId = registration.getServiceId();
   String group = nacosDiscoveryProperties.getGroup();

   Instance instance = getNacosInstanceFromRegistration(registration);

   try {
      namingService.registerInstance(serviceId, group, instance);
      log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
            instance.getIp(), instance.getPort());
   }
   catch (Exception e) {
      log.error("nacos registry, {} register failed...{},", serviceId,
            registration.toString(), e);
      // rethrow a RuntimeException if the registration is failed.
      // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
      rethrowRuntimeException(e);
   }
}
           
  • Validates the argument: if the service ID is empty, it logs a warning and returns.
  • Gets serviceId as the service name; by default this is the name configured in spring.application.name.
  • Gets the group, which is configurable and defaults to DEFAULT_GROUP.
  • Calls getNacosInstanceFromRegistration to build an Instance, which represents one concrete instance of the service. It mainly carries the port, IP, weight, metadata, and cluster name, as follows:
    Instance instance = new Instance();
    instance.setIp(registration.getHost());
    instance.setPort(registration.getPort());
    instance.setWeight(nacosDiscoveryProperties.getWeight());
    instance.setClusterName(nacosDiscoveryProperties.getClusterName());
    instance.setMetadata(registration.getMetadata());
               
  • Calls namingService.registerInstance(serviceId, group, instance); to perform the actual registration.

Now look at namingService.registerInstance(serviceId, group, instance);. The class has many overloads of this method, all of which end up building an Instance, so we go straight to this one:

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }

    this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
           

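If the instance is ephemeral, it needs heartbeat checks, so a BeatInfo object holding the heartbeat settings is built and passed to this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);. That method uses a thread pool to schedule a task that sends heartbeats; we will not analyze it in detail here.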
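Since the heartbeat path is not analyzed here, the following is only a rough sketch of the general idea: a task that sends one beat and then schedules itself again. `HeartbeatSketch` and its methods are hypothetical names; this is not the Nacos BeatReactor, and the real class may differ in important ways.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a self-rescheduling heartbeat; not the Nacos BeatReactor.
class HeartbeatSketch {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "beat-sender");
            t.setDaemon(true); // do not keep the JVM alive just for heartbeats
            return t;
        });
    private volatile boolean stopped = false;

    // Schedules the first beat; each beat schedules the next one when it finishes.
    void addBeat(Runnable sendBeat, long periodMillis) {
        executor.schedule(new Runnable() {
            @Override
            public void run() {
                if (stopped) {
                    return;
                }
                try {
                    sendBeat.run(); // stands in for the HTTP heartbeat request
                } finally {
                    if (!stopped) {
                        executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
                    }
                }
            }
        }, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    // Returns true if `beats` heartbeats were sent within the timeout.
    static boolean demo(int beats, long periodMillis, long timeoutMillis) {
        HeartbeatSketch sketch = new HeartbeatSketch();
        CountDownLatch latch = new CountDownLatch(beats);
        sketch.addBeat(latch::countDown, periodMillis);
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.stop();
        }
    }
}
```

Rescheduling from inside the task, rather than using a fixed-rate schedule, lets each run choose the delay for the next one, which matches the way the period is carried on BeatInfo in the listing above.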

Now look at this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); to see the actual registration:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
        namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}
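To make the parameter map built in registerService more concrete, here is a small sketch of how such a map could be turned into a URL-encoded form body. It is an illustration only: `RegisterParamsSketch` is a hypothetical class, the `@@` separator in the grouped service name and the literal key `serviceName` are assumptions about what `NamingUtils.getGroupedName` and `CommonParams.SERVICE_NAME` produce, and the real client may encode the request differently.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch; not the Nacos NamingProxy.
class RegisterParamsSketch {

    // Assumption: the grouped service name has the form "group@@service".
    static String groupedName(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    // Joins the parameters as key=value pairs separated by '&', URL-encoding both sides.
    static String encode(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            joiner.add(enc(entry.getKey()) + "=" + enc(entry.getValue()));
        }
        return joiner.toString();
    }

    private static String enc(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }
}
```

Passing an insertion-ordered map such as `LinkedHashMap` keeps the fields in the order they were added, which makes the encoded body easier to read in logs.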
           

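This method packs the request parameters and the registration API path, here /nacos/v1/ns/instance, because service registration in Nacos is in fact an HTTP request. The call then reaches the callServer method: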

public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
    throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    injectSecurityInfo(params);
    List<String> headers = builderHeaders();

    String url;
    if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
        url = curServer + api;
    } else {
        if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
            curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
        }
        url = HttpClient.getPrefix() + curServer + api;
    }

    HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
    end = System.currentTimeMillis();

    MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
        .observe(end - start);

    if (HttpURLConnection.HTTP_OK == result.code) {
        return result.content;
    }

    if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
        return StringUtils.EMPTY;
    }

    throw new NacosException(result.code, result.content);
}
           

It uses HttpClient to issue the HTTP call. At this point, the client side of service registration is complete.
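The URL-building branch at the top of callServer can be isolated as a small pure function. The sketch below mirrors that logic under stated assumptions: `ServerUrlSketch` is a hypothetical class, and the default port 8848 and the `http://` prefix are assumed values standing in for `serverPort` and `HttpClient.getPrefix()`.

```java
// Hypothetical sketch of the URL resolution in callServer; not Nacos code.
class ServerUrlSketch {
    static final String DEFAULT_PORT = "8848";      // assumption: default Nacos server port
    static final String DEFAULT_PREFIX = "http://"; // assumption: prefix when no scheme is given

    static String resolve(String curServer, String api) {
        // A server address that already carries a scheme is used as is.
        if (curServer.startsWith("https://") || curServer.startsWith("http://")) {
            return curServer + api;
        }
        // Otherwise append the default port if none is present, then add the prefix.
        if (!curServer.contains(":")) {
            curServer = curServer + ":" + DEFAULT_PORT;
        }
        return DEFAULT_PREFIX + curServer + api;
    }
}
```

For example, `resolve("127.0.0.1", "/nacos/v1/ns/instance")` yields `http://127.0.0.1:8848/nacos/v1/ns/instance` in this sketch, while an address that already starts with `https://` is left untouched apart from the appended path.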

# Service discovery with Nacos and Spring Cloud

Spring Cloud likewise defines a service discovery interface, DiscoveryClient, and Nacos provides a class that implements it, NacosDiscoveryClient. It is also registered through Spring Boot auto-configuration. Here is its getInstances method:

public List<ServiceInstance> getInstances(String serviceId) {
   try {
      return serviceDiscovery.getInstances(serviceId);
   }
   catch (Exception e) {
      throw new RuntimeException(
            "Can not get hosts from nacos server. serviceId: " + serviceId, e);
   }
}
           

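This method delegates to serviceDiscovery, which returns all instances of the service with the given name. The service name can come from several places, for example a RestTemplate interceptor extracting it from the request or a Feign client declaring it.

Now go straight into serviceDiscovery.getInstances(serviceId);: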

public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
   String group = discoveryProperties.getGroup();
   List<Instance> instances = discoveryProperties.namingServiceInstance()
         .selectInstances(serviceId, group, true);
   return hostToServiceInstanceList(instances, serviceId);
}
           

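It reads the group from discoveryProperties, obtains the naming service through namingServiceInstance(), and calls selectInstances on it, passing true so that only healthy instances are returned. The namingService is obtained in the same way as for service registration, so we will not repeat it here.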

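Now look at selectInstances. Depending on whether the caller subscribes, it obtains a ServiceInfo in one of two ways. ServiceInfo holds all the information about the service, such as the cluster names, the instance list, the group name, and the service name. selectInstances looks like this: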

public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    if (subscribe) {
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}
           

Without a subscription, it calls hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); to pull the service directly from the remote server. This ends in a call to serverProxy.queryList(serviceName, clusters, 0, false);. queryList assembles the parameters, calls the remote API /v1/ns/instance/list, and the result is returned as a ServiceInfo.
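Whichever path produces the ServiceInfo, the listing above finishes with selectInstances(serviceInfo, healthy), whose body is not shown in this article. The sketch below is a guess at what such a filter could look like, assuming it keeps the instances whose health flag matches the request and that are enabled with a positive weight. `InstanceFilterSketch` and `Host` are hypothetical types, not Nacos classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of filtering instances by health; not the Nacos implementation.
class InstanceFilterSketch {

    // Minimal stand-in for a service instance.
    static final class Host {
        final String ip;
        final boolean healthy;
        final boolean enabled;
        final double weight;

        Host(String ip, boolean healthy, boolean enabled, double weight) {
            this.ip = ip;
            this.healthy = healthy;
            this.enabled = enabled;
            this.weight = weight;
        }
    }

    // Assumption: keep hosts whose health flag matches, that are enabled, and whose weight is positive.
    static List<Host> select(List<Host> hosts, boolean healthy) {
        List<Host> result = new ArrayList<Host>();
        if (hosts == null) {
            return result;
        }
        for (Host host : hosts) {
            if (host.healthy == healthy && host.enabled && host.weight > 0) {
                result.add(host);
            }
        }
        return result;
    }
}
```

Building a new list instead of removing entries in place keeps the cached instance list untouched, which is a design choice of this sketch rather than something the article states about Nacos.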

Let's focus on what happens when subscribing.

Go into hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }

    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);

        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        updatingMap.put(serviceName, new Object());
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {

        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }

    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}
           
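  • First it looks up the ServiceInfo by service name and cluster name. If it is not null and an update is already in progress, it waits briefly for that update to finish; otherwise it uses the cached ServiceInfo as is.
  • If it is null, it pulls the service from the server immediately.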
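The two cases above amount to a cache-first lookup: serve from the local map when an entry exists, and pull from the server only on a miss. Here is a minimal sketch of that pattern. `ServiceCacheSketch` is hypothetical, the cached value is a plain String instead of a ServiceInfo, and the `name@@clusters` key format is an assumption about `ServiceInfo.getKey`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical cache-first lookup; not the Nacos HostReactor.
class ServiceCacheSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<String, String>();
    private final Function<String, String> remote; // stands in for the server query; must not return null
    final AtomicInteger pulls = new AtomicInteger();

    ServiceCacheSketch(Function<String, String> remote) {
        this.remote = remote;
    }

    // Assumption: the cache key is "name@@clusters", or just the name when no cluster is given.
    static String key(String serviceName, String clusters) {
        return (clusters == null || clusters.isEmpty()) ? serviceName : serviceName + "@@" + clusters;
    }

    String get(String serviceName, String clusters) {
        String k = key(serviceName, clusters);
        String cached = cache.get(k);
        if (cached == null) {
            // Cache miss: pull from the server right away and remember the result.
            pulls.incrementAndGet();
            cached = remote.apply(k);
            cache.put(k, cached);
        }
        return cached;
    }
}
```

Calling `get` twice for the same service triggers a single pull in this sketch; the second call is answered from the map.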

Go into the updateServiceNow method:

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {

        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);

        if (StringUtils.isNotEmpty(result)) {
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}
           

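This method immediately calls serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false); to query the latest service data, and if a result comes back it is handed to processServiceJSON(result); for processing. Note that a UDP port is passed in here. When hostReactor is initialized it creates a PushReceiver, which opens a DatagramSocket and uses a thread pool to handle incoming packets. When the providers of a service change, the registry actively calls back on this port and sends the changed provider instances, which the client processes on receipt.

processServiceJSON is fairly long. Its main job is to run some checks, assemble the newly fetched ServiceInfo, and cache it in the serviceInfoMap collection.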
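To illustrate the push channel described above, the following sketch opens a UDP socket on an ephemeral loopback port, sends itself one datagram, and reads it back. It only demonstrates the `DatagramSocket` mechanics; `PushReceiverSketch` is a hypothetical class, and the payload format, acknowledgements, and threading of the real PushReceiver are not modeled.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of receiving a pushed datagram; not the Nacos PushReceiver.
class PushReceiverSketch {

    // Binds a receiver to an OS-chosen port, sends one datagram to it, and returns what arrived.
    static String roundTrip(String payload) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(2000); // fail instead of blocking forever

            // The receiver's port is what a client would report to the server for pushes.
            int udpPort = receiver.getLocalPort();

            byte[] out = payload.getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(out, out.length, loopback, udpPort));

            byte[] buffer = new byte[64 * 1024];
            DatagramPacket in = new DatagramPacket(buffer, buffer.length);
            receiver.receive(in);
            return new String(in.getData(), in.getOffset(), in.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real client the receive loop would run on its own thread and hand each packet to the same processing step used for pulled data.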

Back in getServiceInfo, serviceInfoMap now contains the ServiceInfo object, and we move on to the scheduleUpdateIfAbsent method:

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }

    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}
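The check before and inside the synchronized block above is a double-checked pattern: the first check avoids locking on the common path, and the second guarantees that only one task is scheduled per key. A minimal sketch of the same idea follows; `ScheduleOnceSketch` is hypothetical, and a counter stands in for `addTask(new UpdateTask(...))`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "schedule once per key"; not the Nacos HostReactor.
class ScheduleOnceSketch {
    private final Map<String, Object> futureMap = new ConcurrentHashMap<String, Object>();
    final AtomicInteger scheduled = new AtomicInteger();

    void scheduleIfAbsent(String key) {
        if (futureMap.get(key) != null) {
            return; // fast path: already scheduled, no lock needed
        }
        synchronized (futureMap) {
            if (futureMap.get(key) != null) {
                return; // another thread scheduled it while we waited for the lock
            }
            scheduled.incrementAndGet();      // stands in for addTask(new UpdateTask(...))
            futureMap.put(key, new Object()); // stands in for the ScheduledFuture
        }
    }

    // Calls scheduleIfAbsent from several threads and returns how many tasks were scheduled.
    static int demo(int threads) {
        ScheduleOnceSketch sketch = new ScheduleOnceSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> sketch.scheduleIfAbsent("DEFAULT_GROUP@@order-service"));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sketch.scheduled.get();
    }
}
```

However many threads race on the same key, the counter ends at 1, which is the property the original method relies on to avoid duplicate update tasks.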
           

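This method adds a scheduled task that periodically refreshes the service from the remote server. Since what it adds is a task, we go straight to the run method of UpdateTask: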

public void run() {
    try {
        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

        if (serviceObj == null) {
            updateServiceNow(serviceName, clusters);
            executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
            return;
        }

        if (serviceObj.getLastRefTime() <= lastRefTime) {
            updateServiceNow(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            refreshOnly(serviceName, clusters);
        }

        lastRefTime = serviceObj.getLastRefTime();

        if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
            !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
            // abort the update task:
            NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
            return;
        }

        executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);


    } catch (Throwable e) {
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    }

}
           

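The task pulls the service from the remote server on a schedule: it reschedules itself with a delay of serviceObj.getCacheMillis() (every 10 s here). On each run it compares the cached entry's lastRefTime with the value it saw on the previous run. If the entry has not been refreshed since, it pulls from the server and updates the local cache; if a push has already refreshed it, the pushed data is left in place.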
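The lastRefTime comparison in run can be traced with a small model. In the sketch below, `UpdateTaskSketch` is hypothetical, time is passed in explicitly, and the initial value `Long.MAX_VALUE` for lastRefTime is an assumption chosen so that the first run always pulls.

```java
// Hypothetical model of the pull-or-skip decision in UpdateTask.run(); not Nacos code.
class UpdateTaskSketch {
    long cachedRefTime = 0;                    // stands in for serviceObj.getLastRefTime()
    private long lastRefTime = Long.MAX_VALUE; // assumption: forces a pull on the first run
    int pulls = 0;

    // A server push refreshes the cached entry between two runs of the task.
    void onPush(long now) {
        cachedRefTime = now;
    }

    // One run of the task at time `now`; returns which branch was taken.
    String runOnce(long now) {
        String action;
        if (cachedRefTime <= lastRefTime) {
            // Nothing refreshed the cache since the last run: pull from the server.
            cachedRefTime = now;
            pulls++;
            action = "pull";
        } else {
            // A push already refreshed the cache: do not override it.
            action = "refresh-only";
        }
        lastRefTime = cachedRefTime;
        return action;
    }
}
```

Running the task, then a push, then the task twice more gives pull, refresh-only, pull in this model: the run right after a push skips the pull, and the following run pulls again because nothing has changed in between.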

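That concludes service discovery, and it also shows both ways the client gets service updates from the registry: scheduled pulls and server pushes over UDP.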