天天看點

Nacos服務發現原理分析

作者:碼上code
微服務将自己的執行個體注冊到nacos注冊中心,nacos服務端存儲了注冊清單,然後通過ribbon調用服務,具體是如何調用?如果nacos服務挂了,還能正常調用服務嗎?調用的服務清單發生變化,調用方是如何感覺變化的?帶着這些問題,來探索一下服務發現的原理。

版本 2.1.1

  • Nacos Server:2.1.1
  • spring-cloud-starter-alibaba:2.1.1.RELEASE
  • spring-boot:2.1.1.RELEASE
  • spring-cloud-starter-netflix-ribbon:2.1.1.RELEASE
用戶端和服務端版本号都為2.1.1。

從 Ribbon 講起

使用ribbon來調用服務,就添加ribbon依賴:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
           

ribbon依賴包含spring-cloud-commons依賴,而在spring-cloud-commons包中spring.factories自動配置LoadBalancerAutoConfiguration類:

@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();

@Bean
public LoadBalancerInterceptor ribbonInterceptor(
    LoadBalancerClient loadBalancerClient,
    LoadBalancerRequestFactory requestFactory) {
  return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
           
隻要标注了@LoadBalanced注解的restTemplates都會添加負載均衡攔截器LoadBalancerInterceptor。

使用Ribbon元件調用服務:

restTemplate.getForObject("http://service-name",String.class);
           

restTemplate的http請求方法,最終會調用到doExecute方法。doExecute在發起http請求之前,會先執行LoadBalancerInterceptor負載均衡攔截器的intercept方法。 該方法調用execute方法。

而在execute方法中,主要有兩個方法:

ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
           

execute先通過getLoadBalancer擷取ILoadBalancer執行個體,然後再通過getServer擷取Server執行個體。

getLoadBalancer最終會調用Ribbon的ServerList接口,具體調用流程:

getLoadBalancer() ->
ZoneAwareLoadBalancer -> 
DynamicServerListLoadBalancer -> 
restOfInit()->
updateListOfServers()->
ServerList.getUpdatedListOfServers()->
           

Nacos實作類NacosServerList實作了ServerList接口。

總之我們在進行微服務調用的時候,Ribbon最終會調用NacosServerList類中的getUpdatedListOfServers方法。

Nacos 擷取服務

NacosServerList類的getUpdatedListOfServers方法調用了該類的getServers方法:

private List<NacosServer> getServers() {
  try {
    // 擷取分組 
    String group = discoveryProperties.getGroup();
    // 重點,查詢執行個體清單
    List<Instance> instances = discoveryProperties.namingServiceInstance()
        .selectInstances(serviceId, group, true);
    return instancesToServerList(instances);
  }
  catch (Exception e) {
    throw new IllegalStateException(
        "Can not get service instances from nacos, serviceId=" + serviceId,
        e);
  }
}
           

重點看NacosNamingService類的selectInstances方法,會調用以下selectInstances三個重載方法:

@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
    return selectInstances(serviceName, groupName, healthy, true);
}
    
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
    return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
}
    
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    // 預設訂閱
    if (subscribe) {
        // 擷取服務,這是重點
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}
           

最後一個selectInstances方法裡面的hostReactor.getServiceInfo方法是擷取服務的核心方法:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 先在本地緩存查詢
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    // 查詢不到 
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);

        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        // 請求Nacos Server執行個體,并更新服務執行個體
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {

        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // 定時更新本地緩存
    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}
           

getServiceInfo是服務發現的核心方法,先查詢serviceInfoMap集合中查詢本地緩存,本地緩存查詢不到就請求Nacos Server執行個體,并更新本地緩存。

Nacos服務發現原理分析

請求Nacos Server執行個體,實際就是發送http請求Nacos Server:

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // 調用 Nacos Server 查詢服務
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        // 結果不為空,更新緩存  
        if (StringUtils.isNotEmpty(result)) {
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

//向 Nacos Server發起 HTTP 清單查詢
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {

    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));

    return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
           

queryList方法主要封裝号請求參數,然後向Nacos Server服務端發送http請求。

當服務端執行個體發生改變時,Nacos Server會推送最新的執行個體給服務端。

Nacos服務發現原理分析

服務發現是先擷取本地緩存,如果沒有本地緩存,就請求Nacos Server服務端擷取資料,如果Nacos Server挂了,也不會影響服務的調用。

總結

  • Ribbon 項目啟動時,會建立一個負載均衡攔截器。從Ribbon發起服務請求開始,最終會調用到攔截器的攔截方法。攔截方法又調用ServerList擷取執行個體接口,而NacosServerList實作擷取執行個體清單。
  • Nacos調用服務 NacosServerList實作了擷取服務執行個體清單。NacosServerList類selectInstances方法最終調用了hostReactor.getServiceInfo方法getServiceInfo方法先從serviceInfoMap集合中擷取本地緩存,如果本地緩存找不到,就請求Nacos Server擷取服務執行個體,并更新本地緩存。擷取服務之後,定時更新本地緩存。

參考

  • Spring Cloud nacos Ribbon整合源碼分析
  • 服務發現:服務之間調用請求鍊路分析

繼續閱讀