天天看點

微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析

目錄

  • 前言
  • 1. 用戶端注冊進 Nacos 注冊中心(用戶端視角)
    • 1.1 Spring Cloud 提供的規範标準
    • 1.2 Nacos 的自動配置類
    • 1.3 監聽服務初始化事件 AbstractAutoServiceRegistration.bind()
    • 1.4 注冊服務執行個體的邏輯 NacosServiceRegistry.register()
      • 1.4.1 心跳機制 BeatReactor.addBeatInfo()
      • 1.4.2 注冊服務 NamingProxy.registerService()
    • 1.5 以 Open API 方式發送注冊請求
    • 1.6 小結
  • 2. Nacos 伺服器注冊服務(伺服器視角)
    • 2.1 伺服器接收請求 InstanceController.register()
    • 2.2 在伺服器控制台注冊服務執行個體 ServiceManager.registerInstance()
    • 2.3 建立空服務 ServiceManager.createEmptyService()
      • 2.3.1 将服務添加到緩存 Service.putService()
      • 2.3.2 建立心跳機制 Service.init()
      • 2.3.3 實作資料一緻性的監聽 DelegateConsistencyServiceImpl.listen()
    • 2.4 小結
  • 3. 用戶端查詢所有服務執行個體
    • 3.1 消費者用戶端向 Nacos 送出請求
    • 3.2 Nacos 伺服器處理請求 InstanceController.list()
    • 3.2 擷取所有服務的所有資訊 InstanceController.doSrvIpxt()
  • 4. 用戶端監聽 Nacos 伺服器以動态擷取服務執行個體
    • 4.1 用戶端發送請求
    • 4.2 服務動态感覺的原理
    • 4.3 Nacos 伺服器處理請求
  • 5. 補充内容
    • 5.1 Dubbo 的自動裝配
  • 6. 源碼結構圖總結
    • 6.1 用戶端視角下的服務注冊結構圖
    • 6.2 伺服器視角下的服務注冊結構圖
    • 6.3 用戶端查詢所有服務執行個體結構圖
  • 最後

參考資料:

《Spring Microservices in Action》

《Spring Cloud Alibaba 微服務原理與實戰》

《B站 尚矽谷 SpringCloud 架構開發教程 周陽》

為友善了解與表達,這裡把 Nacos 控制台和 Nacos 注冊中心稱為 Nacos 伺服器(就是 web 界面那個),我們編寫的業務服務稱為 Nacso 用戶端;

Nacos 用戶端将自己注冊進 Nacos 伺服器。《1. 服務如何注冊進 Nacos 注冊中心》主要從 Nacos 用戶端角度解釋如何發送資訊給 Nacos 伺服器;《2. Nacos 伺服器注冊服務》主要從 Nacos 伺服器角度解釋注冊原理;

《3. 用戶端查詢所有服務執行個體》将從服務消費者和提供者的角度,解釋服務消費者如何擷取提供者的所有執行個體。服務消費者和提供者都是 Nacos 的用戶端;

《4. 用戶端監聽 Nacos 伺服器以動态擷取服務執行個體》從消費者用戶端角度出發監聽 Nacos 伺服器,以動态獲知提供者的變化;

  • 服務要注冊進 Spring Cloud 內建的 Nacos,首先要滿足 Spring Cloud 提供的規範标準;
  • 在 spring-cloud-common 包中有一個類 org.springframework.cloud.client.serviceregistry.ServiceRegistry,它是Spring Cloud 提供的服務注冊的标準。內建到 Spring Cloud 中實作服務注冊的元件,都會實作該接口;
public interface ServiceRegistry<R extends Registration>{
    void register(R registration); 
    void deregister(R registration); 
    void close(); 
    void setStatus(R registration,String status);
    <T> T getstatus(R registration);
}
           

  • 在 spring-cloud-commons 包的

    META-INF/spring.factories

    中包含自動裝配的配置資訊。即約定 Spring Cloud 啟動時,會将那些類自動注入到容器中:
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析
  • 其中 AutoServiceRegistrationAutoConfiguration(服務注冊自動配置類) 是服務注冊相關配置類,源碼如下:
@Configuration
@Import({AutoServiceRegistrationConfiguration.class})
@ConditionalOnProperty(
    value = {"spring.cloud.service-registry.auto-registration.enabled"},
    matchIfMissing = true
)
public class AutoServiceRegistrationAutoConfiguration {
    //自動注冊類
    @Autowired(required = false)
    private AutoServiceRegistration autoServiceRegistration;
    //自動注冊類的配置檔案
    @Autowired
    private AutoServiceRegistrationProperties properties;

    public AutoServiceRegistrationAutoConfiguration() {
    }

    //初始化函數
    @PostConstruct
    protected void init() {
        if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
            throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
        }
    }
}
           
  • 其中關鍵之處在于注入了一個 AutoServiceRegistration(服務注冊器) 自動注冊接口,該接口在 Spring Cloud 中有一個抽象實作類 AbstractAutoServiceRegistration(服務注冊器抽象類);
  • 我們要用什麼注冊中心,該注冊中心就要繼承 AbstractAutoServiceRegistration(服務注冊器抽象類) 抽象類;
  • 對于 Nacos 來說,使用 NacosAutoServiceRegistration(Nacos 服務注冊器) 類繼承 AbstractAutoServiceRegistration(服務注冊器抽象類) 抽象類;
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析

  • 在上述中需要關注 ApplicationListener(事件監聽器) 接口,它是一種事件監聽機制,接口聲明如下:
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    void onApplicationEvent(E var1);
}
           
  • AbstractAutoServiceRegistration(服務注冊器抽象類) 使用

    AbstractAutoServiceRegistration.bind()

    方法實作了該接口,用來監聽 WebServerInitializedEvent(服務初始化事件);
@Override
@SuppressWarnings("deprecation")
//【斷點步入】
public void onApplicationEvent(WebServerInitializedEvent event) {
	bind(event);
}
           
  • 我們給

    AbstractAutoServiceRegistration.bind()

    方法打上斷點,啟動服務提供者,可以發現:
    • Nacos 伺服器上沒有服務注冊執行個體;
    • 服務初始化已經完成;
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析
  • 【結論】說明服務注冊到 Nacos 的流程是先進行服務初始化,然後通過事件監聽機制監聽初始化事件。當初始化完成時,調用

    AbstractAutoServiceRegistration.bind()

    方法将服務注冊進 Nacos 注冊中心;

這裡能說明什麼時候服務會将自己的資訊發給 Nacos 伺服器;
  • 我們追進

    AbstractAutoServiceRegistration.bind()

    方法,發現在 AbstractAutoServiceRegistration(服務注冊器抽象類) 裡調用

    NacosServiceRegistry.register()

    方法後, Nacos 伺服器上出現服務執行個體;
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析
  • NacosServiceRegistry.register()

    方法,方法的邏輯如下:
@Override
public void register(Registration registration) {
    //判斷是否有服務 ID
	if (StringUtils.isEmpty(registration.getServiceId())) {
		log.warn("No service to register for nacos client...");
		return;
	}
	String serviceId = registration.getServiceId();  //服務 ID(service-provider)
	Instance instance = getNacosInstanceFromRegistration(registration);  //服務執行個體(裡面有 ip、port 等資訊)
	try {
	    //【斷點步入】注冊的方法
		namingService.registerInstance(serviceId, instance);
		log.info("nacos registry, {} {}:{} register finished", serviceId,
				instance.getIp(), instance.getPort());
	}
	catch (Exception e) {
		log.error("nacos registry, {} register failed...{},", serviceId,
				registration.toString(), e);
	}
}
           
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析
  • 我們點進去

    namingService.registerInstance()

    方法(實作邏輯在 NacosNamingService.registerInstance())得到注冊的具體邏輯,如下:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        //用心跳 BeatInfo 封裝服務執行個體資訊
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
        //【斷點步入 1.4.1】将 beatInfo 心跳資訊放進 beatReactor 心跳發送器(發送心跳後,Nacos 伺服器仍然沒有服務執行個體)
        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    //【斷點步入 1.4.2】使用 namingProxy 命名代理方式将服務執行個體資訊以 POST 請求方式發送服務執行個體
    this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
           
  • 即注冊服務執行個體的邏輯分兩步:
    • 先通過

      beatReactor.addBeatInfo

      建立心跳資訊實作健康檢測;
    • 再以 POST 請求方式發送服務執行個體資訊;

  • 我們進入

    BeatReactor.addBeatInfo()

    方法一探心跳機制,源碼如下:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    this.dom2Beat.put(this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
    //【核心】定時向服務端發送一個心跳包 beatInfo
    this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);
    //【核心】
    MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
           
  • BeatReactor.addBeatInfo()

    方法主要做了兩件事:
    • 用戶端調用

      ScheduledExecutorService.schedule()

      接口方法(靠 ScheduledThreadPoolExecutor(計劃線程執行器) 實作)執行定時任務,在每個任務周期内定時向服務端發送一個心跳包 beatInfo;
    • 然後通過

      MetricsMonitor.getDom2BeatSizeMonitor()

      方法擷取一個 心跳測量螢幕(實際為 Gauge),不斷檢測服務端的回應,如果在設定時間内沒有收到服務端的回應,則認為伺服器出現了故障;
  • Nacos 服務端會根據用戶端的心跳包不斷更新服務的狀态;

  • 接着進入

    NamingProxy.registerService()

    方法,源碼如下:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
    Map<String, String> params = new HashMap(9);
    params.put("namespaceId", this.namespaceId);
    params.put("serviceName", serviceName);
    params.put("groupName", groupName);
    params.put("clusterName", instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    //【斷點步入】這步執行完後,Nacos 伺服器才出現服務執行個體資訊
    this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");
}
           
  • 該方法先對服務執行個體做了封裝,然後通過

    NamingProxy.reqAPI()

    方法拼湊注冊服務的 API。

    NamingProxy.reqAPI()

    方法源碼如下:
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
        params.put("namespaceId", this.getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        } else {
            Exception exception = new Exception();
            if (servers != null && !servers.isEmpty()) {
                Random random = new Random(System.currentTimeMillis());
                int index = random.nextInt(servers.size());

                for(int i = 0; i < servers.size(); ++i) {
                    String server = (String)servers.get(index);

                    try {
                        return this.callServer(api, params, server, method);
                    } catch (NacosException var11) {
                        exception = var11;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
                    } catch (Exception var12) {
                        exception = var12;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
                    }
                    index = (index + 1) % servers.size();
                }

                throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
            } else {
                int i = 0;
                while(i < 3) {
                    try {
                        return this.callServer(api, params, this.nacosDomain);
                    } catch (Exception var13) {
                        exception = var13;
                        LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
                        ++i;
                    }
                }
                throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
            }
        }
    }
           

  • 最終将以 Open API 方式發送如下請求給 Nacos 伺服器:
  • POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?servicelame=nacos.naming.serviceName&ip=10.200.9.143&port=18082'

微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析

  • Nacos 在先加載配置檔案初始化服務,使用 ApplicationListener(事件監聽器) 監聽機制監聽該服務初始化事件,當服務初始化完成後,進入

    NacosServiceRegistry.register()

    注冊邏輯;
  • 注冊原理分兩步:
    • 先使用 NacosNamingService(Nacos 命名服務) 發送心跳;
    • 再使用 NamingProxy(命名代理),以 POST 請求方式發送服務執行個體給 Nacos 伺服器;
  • 即 Nacos Service 必須要確定注冊的服務執行個體是健康的(心跳檢測),才能進行服務注冊;

  • Nacos 伺服器的源碼下載下傳、啟動詳情請見《微服務架構 | 3.2 Alibaba Nacos 注冊中心》;
  • 上述提到用戶端注冊伺服器的最後一步是向伺服器發送如下 Open API 請求:
  • POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?servicelame=nacos.naming.serviceName&ip=10.200.9.143&port=18082'

  • 那麼伺服器的源碼分析将從這個請求開始;

  • 伺服器在 nacos-naming 子產品下 InstanceController(服務執行個體控制器) 裡定義了一個

    register

    方法用來處理用戶端的注冊,源碼如下:
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    //省略其他代碼

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        //從請求體裡解析出 namespaceId 命名空間(本例中是 public)
        final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        //解析 serviceName 服務名(本例中是 DEFAULT_GROUP@@service-provider,實際就是service-provider)
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = parseInstance(request);
        //【斷點步入】在伺服器控制台注冊服務執行個體的方法
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

        //省略其他代碼
}
           
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析

  • ServiceManager.registerInstance()

    方法的源碼如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    //【斷點步入 2.3】建立空服務
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    
    Service service = getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    //添加服務執行個體
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
           
  • 這段代碼主要做了三個邏輯的内容:
    • createEmptyService()

      :建立空服務(用于在 Nacos 伺服器控制台的“服務清單”中展示服務資訊),實際上是初始化一個 serviceMap,它是一個 ConcurrentHashMap 集合;
    • getService()

      :從 serviceMap 中根據 namespaceld 和 serviceName 得到一個服務對象;
    • addInstance()

      :把目前注冊的服務執行個體儲存到 Service 中;

  • 我們一直追進去,發現最後調用的是

    ServiceManager.createServiceIfAbsent()

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
    //通過命名空間 ID 和服務名從緩存中擷取 service 服務,第一次是沒有的,進入 if 語句建立 service
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        //判斷服務是否有效,不生效将抛出異常
        service.validate();
        //【斷點步入】添加與初始化服務
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
           
  • 第一次注冊服務時緩存中肯定是沒有的,我們進入到 if 語句裡的

    ServiceManager.putServiceAndInit()

    方法;
private void putServiceAndInit(Service service) throws NacosException {
    //【斷點步入 2.3.1 】将服務添加到緩存
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    //【斷點步入 2.3.2 】建立心跳機制
    service.init();
    //【斷點步入 2.3.3 】實作資料一緻性的監聽,将服務資料進行同步
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
           

public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
            }
        }
    }
    //将 Service 儲存到 serviceMap 中
    serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}
           

public void init() {
    //【斷點步入】定時檢查心跳
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
           
  • 進入

    HealthCheckReactor.scheduleCheck()

public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
           
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析
  • 其主要是通過建立定時任務不斷檢測目前服務下所有執行個體最後發送心跳包的時間。如果逾時,則設定healthy為false表示服務不健康,并且發送服務變更事件;
  • 心跳檢測機制可以用下圖概述:
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析

@Override
public void listen(String key, RecordListener listener) throws NacosException {
    
    // 這個鍵 key 會被兩個服務監聽
    if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
        //持久一緻性服務
        persistentConsistencyService.listen(key, listener);
        //短暫一緻性服務
        ephemeralConsistencyService.listen(key, listener);
        return;
    }
    
    mapConsistencyService(key).listen(key, listener);
}
           

  • Nacos 用戶端通過 Open API 的形式發送服務注冊請求,服務端收到請求後,做以下三件事:
    • 建構一個 Service 對象儲存到 ConcurrentHashMap 集合中;
    • 使用定時任務對目前服務下的所有執行個體建立心跳檢測機制;
    • 基于資料一緻性協定将服務資料進行同步;
  • 至此,基于 Nacos 用戶端和服務端的服務注冊原了解析可以告一段落了,下面是一些補充内容;

  • 在進行 RPC 調用時,根據對服務的需求不同可以将 Nacos 用戶端分為兩種角色,分别是服務消費者和服務提供者;
  • 服務消費者需要使用提供者的服務,就先要向 Nacos 伺服器擷取所有已注冊的服務提供者執行個體,第 3 點将讨論用戶端(消費者)如何擷取這些執行個體(提供者);

  • 與用戶端向 Nacos 伺服器注冊服務一樣,消費者用戶端想要查詢提供者的所有執行個體,也要向 Nacos 伺服器發送請求;
  • 向 Nacos 伺服器發送請求也是由兩種形式,Open API 和 SDK。其中 SDK 最終也是以 Open API 方式發送的:
    • Open API:

      GET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider

    • SDK:

      List<Instance> selectInstances(String serviceName, boolean healthy)throws NacosException;

  • 我們使用 postman 發送

    GET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider

    請求,來模拟消費者用戶端擷取提供者執行個體;
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析
  • 這裡主要多發送幾次請求,因為 Nacos 伺服器可能緩存了所有提供者執行個體資訊,可能直接從緩存中拿而不是處理請求;

  • Nacos 伺服器處理擷取所有服務執行個體的控制器還是 nacos-naming 子產品下的 InstanceController(服務執行個體控制器);
  • InstanceController(服務執行個體控制器) 提供了一個

    InstanceController.list()

    接口處理該請求:
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析
  • 可以看到我們的斷點也被捕獲;
  • 源碼如下:
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    
    //解析命名空間 ID
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    //解析服務名
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    //解析擷取一堆資訊
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
    //【斷點步入 3.2】擷取所有服務的所有資訊
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}
           

  • InstanceController.doSrvIpxt()

    方法很長,筆者删去一些非重點源碼友善了解:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

    //省略一些非核心代碼
    
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    //根據 namespaceld、serviceName 獲得 Service 執行個體
    Service service = serviceManager.getService(namespaceId, serviceName);
    List<Instance> srvedIPs;
    
    //擷取指定服務下的所有執行個體,從Service執行個體中基于srvIPs得到所有服務提供者的執行個體資訊
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    
    //使用選擇器過濾一些 ip
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    
    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    
    //過濾不健康執行個體
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    
    //周遊完成 JSON 字元串的封裝
    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();
        if (healthyOnly && !entry.getKey()) {
            continue;
        }
        for (Instance instance : ips) {
            //删除不可用的執行個體
            if (!instance.isEnabled()) {
                continue;
            }
            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }
            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);
        }
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}
           
  • 該方法的主要邏輯主要有三步:
    • 首先擷取所有服務執行個體;
    • 過濾一些服務執行個體;
    • 周遊完成 JSON 字元串的封裝并傳回;

  • 消費者用戶端角度出發監聽 Nacos 伺服器,以動态獲知提供者的變化;

  • 用戶端服務有兩種調用方式:
    • void subscribe(String serviceName, EventListener listener) throws NacosException

    • public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe)

      ,其中 subscribe 置為 true;

  • Nacos 用戶端有一個 HostReactor 類,它的功能是實作服務的動态更新;
  • 用戶端發起事件訂閱後,在 HostReactor 中有一個 UpdateTask 線程,每 10s 發送一次 Pull 請求,獲得服務端最新的位址清單;
  • 對于服務端,它和服務提供者的執行個體之間維持了心跳檢測,一旦服務提供者出現異常,則會發送一個 Push 消息給 Nacos 用戶端,也就是服務消費者;
  • 服務消費者收到請求之後,使用 HostReactor 中提供的 processServiceJSON 解析消息,并更新本地服務位址清單;
  • 原理可以用下圖總結:
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析

  • 伺服器與本篇第 3 點相同,其中伺服器的邏輯在《3.2 Nacos 伺服器處理請求》;

  • Spring Cloud Alibaba Dubbo 內建 Nacos 時,服務的注冊是依托 Dubbo 中的自動裝配機制完成的;
  • spring-cloud-alibaba-dubbo 下的

    META-INF/spring.factories

    檔案中自動裝配了一個和服務注冊相關的配置類 DubboServiceRegistrationNonWebApplicationAutoConfiguration(Dubbo 注冊自動配置類);
微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析
@Configuration
@ConditionalOnNotWebApplication
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter(DubboServiceRegistrationAutoConfiguration.class)
@Aspect
public class DubboServiceRegistrationNonWebApplicationAutoConfiguration {

	@Autowired
	private ServiceRegistry serviceRegistry; //實作類為 NacosServiceRegistry
    ...

	//監聽 ApplicationStartedEvent 事件,該事件在重新整理上下文之後,調用 application 指令前觸發;
	@EventListener(ApplicationStartedEvent.class)
	public void onApplicationStarted() {
		setServerPort();
		//最終調用 NacosServiceRegistry.registry 方法實作服務注冊
		register();
	}
	private void register() {
		if (registered) {
			return;
		}
		//這裡即 NacosServiceRegistry.registry()
		serviceRegistry.register(registration);
		registered = true;
	}
	...
}
           
  • NacosServiceRegistry.registry()

    的原理詳情請見本篇《1.4 注冊服務執行個體的邏輯 NacosServiceRegistry.register()》;
  • 由此可以得出結論:
    • Dubbo 內建 Nacos 時,服務的注冊是依賴 Dubbo 的自動裝配機制;
    • 而 Dubbo 的自動裝配機制是依靠

      NacosServiceRegistry.register()

      實作的;

源碼結構圖大緻與本篇目錄一緻;

  • AbstractAutoServiceRegistration.bind():監聽服務初始化事件;
    • NacosServiceRegistry.register():注冊服務執行個體;
      • NacosNamingService.registerInstance():通過Nacos 命名服務注冊服務執行個體;
        • BeatReactor.addBeatInfo():心跳機制;
          • ScheduledThreadPoolExecutor.schedule():執行定時任務,發送心跳包 beatInfo;
          • MetricsMonitor.getDom2BeatSizeMonitor():啟動心跳測量螢幕;
        • NamingProxy.registerService():以 Open API 方式發送注冊請求;
          • NamingProxy.reqAPI():拼湊注冊服務的 API;
            • NamingProxy.callServer():呼叫服務,發送請求;

  • InstanceController.register():伺服器接收請求;
    • ServiceManager.registerInstance():在伺服器控制台注冊服務執行個體;
      • ServiceManager.createEmptyService():建立空服務;
        • ServiceManager.createServiceIfAbsent():如果空缺就建立服務;
          • ServiceManager.putServiceAndInit():初始化服務;
            • Service.putService():将服務添加到緩存;
            • Service.init():建立心跳機制;
            • DelegateConsistencyServiceImpl.listen():實作資料一緻性的監聽;
      • ServiceManager.addInstance():添加服務執行個體;
        • KeyBuilder.buildInstanceListKey():建立服務是一緻性鍵;
        • DelegateConsistencyServiceImpl.put():将鍵和服務執行個體放 ConsistencyService 進行一緻性維護;

  • InstanceController.list():伺服器接收請求;
    • InstanceController.doSrvIpxt():擷取所有服務的所有資訊;
      • ServiceManager.getService():根據 namespaceld、serviceName 獲得 service 執行個體;
      • Service.srvIPs():擷取指定 service 服務下的所有執行個體;
      • JacksonUtils.createEmptyArrayNode():周遊完成 JSON 字元串的封裝;

新人制作,如有錯誤,歡迎指出,感激不盡!

歡迎關注公衆号,會分享一些更日常的東西!

如需轉載,請标注出處!

微服務架構 | *3.5 Nacos 服務注冊與發現的源碼分析