天天看點

Nacos源碼分析專題(四)-服務發現

1.用戶端

1.1.定時更新服務清單

1.1.1.NacosNamingService

在前面我們講到一個類​

​NacosNamingService​

​,這個類不僅僅提供了服務注冊功能,同樣提供了服務發現的功能。

Nacos源碼分析專題(四)-服務發現

多個重載的方法最終都會進入一個方法:

@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
                                      boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    // 1.判斷是否需要訂閱服務資訊(預設為 true)
    if (subscribe) {
        // 1.1.訂閱服務資訊
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                                                 StringUtils.join(clusters, ","));
    } else {
        // 1.2.直接去nacos拉取服務資訊
        serviceInfo = hostReactor
            .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                                              StringUtils.join(clusters, ","));
    }
    // 2.從服務資訊中擷取執行個體清單并傳回
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}      

1.1.2.HostReactor

進入訂閱服務消息,這裡是由​

​HostReactor​

​​類的​

​getServiceInfo()​

​方法來實作的:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    // 由 服務名@@叢集名拼接 key
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 讀取本地服務清單的緩存,緩存是一個Map,格式:Map<String, ServiceInfo>
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    // 判斷緩存是否存在
    if (null == serviceObj) {
        // 不存在,建立空ServiceInfo
        serviceObj = new ServiceInfo(serviceName, clusters);
        // 放入緩存
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        // 放入待更新的服務清單(updatingMap)中
        updatingMap.put(serviceName, new Object());
        // 立即更新服務清單
        updateServiceNow(serviceName, clusters);
        // 從待更新清單中移除
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {
        // 緩存中有,但是需要更新
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish 等待5秒中,待更新完成
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                        .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // 開啟定時更新服務清單的功能
    scheduleUpdateIfAbsent(serviceName, clusters);
    // 傳回緩存中的服務資訊
    return serviceInfoMap.get(serviceObj.getKey());
}      

基本邏輯就是先從本地緩存讀,根據結果來選擇:

  • 如果本地緩存沒有,立即去​

    ​nacos​

    ​​讀取,​

    ​updateServiceNow(serviceName, clusters)​

  • Nacos源碼分析專題(四)-服務發現
  • 如果本地緩存有,則開啟定時更新功能,并傳回緩存結果:
  • ​scheduleUpdateIfAbsent(serviceName, clusters)​

  • Nacos源碼分析專題(四)-服務發現
  • 在​

    ​UpdateTask​

    ​​中,最終還是調用​

    ​updateService​

    ​方法:
  • Nacos源碼分析專題(四)-服務發現
  • 不管是立即更新服務清單,還是定時更新服務清單,最終都會執行​

    ​HostReactor​

    ​​中的​

    ​updateService()​

    ​方法:
public void updateService(String serviceName, String clusters) throws NacosException {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // 基于ServerProxy發起遠端調用,查詢服務清單
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

        if (StringUtils.isNotEmpty(result)) {
            // 處理查詢結果
            processServiceJson(result);
        }
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}      

1.1.3.ServerProxy

而​

​ServerProxy​

​​的​

​queryList​

​方法如下:

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
    throws NacosException {
    // 準備請求參數
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
    // 發起請求,位址與API接口一緻
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}      

1.2.處理服務變更通知

除了定時更新服務清單的功能外,Nacos還支援服務清單變更時的主動推送功能。

在​

​HostReactor​

​類的構造函數中,有非常重要的幾個步驟:

Nacos源碼分析專題(四)-服務發現

基本思路是:

  • 通過​

    ​PushReceiver​

    ​監聽服務端推送的變更資料
  • 解析資料後,通過​

    ​NotifyCenter​

    ​釋出服務變更的事件
  • ​InstanceChangeNotifier​

    ​監聽變更事件,完成對服務清單的更新

1.2.1.PushReceiver

我們先看​

​PushReceiver​

​​,這個類會以​

​UDP​

​​方式接收​

​Nacos​

​服務端推送的服務變更資料。

先看構造函數:

public PushReceiver(HostReactor hostReactor) {
    try {
        this.hostReactor = hostReactor;
        // 建立 UDP用戶端
        String udpPort = getPushReceiverUdpPort();
        if (StringUtils.isEmpty(udpPort)) {
            this.udpSocket = new DatagramSocket();
        } else {
            this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
        }
        // 準備線程池
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            }
        });
        // 開啟線程任務,準備接收變更資料
        this.executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}      

​PushReceiver​

​​構造函數中基于線程池來運作任務。這是因為​

​PushReceiver​

​​本身也是一個​

​Runnable​

​,其中的run方法業務邏輯如下:

@Override
public void run() {
    while (!closed) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // 接收推送資料
            udpSocket.receive(packet);
            // 解析為json字元串
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            // 反序列化為對象
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // 交給 HostReactor去處理
                hostReactor.processServiceJson(pushPacket.data);

                // send ack to server 發送ACK回執,略。。
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}      

1.2.2.HostReactor

通知資料的處理由交給了​

​HostReactor​

​​的​

​processServiceJson​

​方法:

public ServiceInfo processServiceJson(String json) {
    // 解析出ServiceInfo資訊
    ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    // 查詢緩存中的 ServiceInfo
    ServiceInfo oldService = serviceInfoMap.get(serviceKey);

    // 如果緩存存在,則需要校驗哪些資料要更新
    boolean changed = false;
    if (oldService != null) {
        // 拉取的資料是否已經過期
        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                               + serviceInfo.getLastRefTime());
        }
        // 放入緩存
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        
        // 中間是緩存與新資料的對比,得到newHosts:新增的執行個體;remvHosts:待移除的執行個體;
        // modHosts:需要修改的執行個體
        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            // 釋出執行個體變更的事件
            NotifyCenter.publishEvent(new InstancesChangeEvent(
                serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
            DiskCache.write(serviceInfo, cacheDir);
        }

    } else {
        // 本地緩存不存在
        changed = true;
        // 放入緩存
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        // 直接釋出執行個體變更的事件
        NotifyCenter.publishEvent(new InstancesChangeEvent(
            serviceInfo.getName(), serviceInfo.getGroupName(),
            serviceInfo.getClusters(), serviceInfo.getHosts()));
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }
    // 。。。
    return serviceInfo;
}      

2.服務端

2.1.拉取服務清單接口

在介紹的​

​InstanceController​

​中,提供了拉取服務清單的接口:

/**
     * Get all instance of input service.
     *
     * @param request http request
     * @return list of instance
     * @throws Exception any error during list
     */
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    // 從request中擷取namespaceId和serviceName
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    // 擷取用戶端的 UDP端口
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

    // 擷取服務清單
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                     healthyOnly);
}      

進入​

​doSrvIpxt()​

​方法來擷取服務清單:

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent,
                            String clusters, String clientIP,
                            int udpPort, String env, boolean isCheck,
                            String app, String tid, boolean healthyOnly) throws Exception {
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // 擷取服務清單資訊
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();

    // now try to enable the push
    try {
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            // 添加目前用戶端 IP、UDP端口到 PushService 中
            pushService
                .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                           pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG
            .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }

    if (service == null) {
        // 如果沒找到,傳回空
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.put("cacheMillis", cacheMillis);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }
    // 結果的檢測,異常執行個體的剔除等邏輯省略
    // 最終封裝結果并傳回 。。。

    result.replace("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA
        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}      

2.2.釋出服務變更的UDP通知

在上一節中,​

​InstanceController​

​​中的​

​doSrvIpxt()​

​方法中,有這樣一行代碼:

pushService.addClient(namespaceId, serviceName, clusters, agent,
                      new InetSocketAddress(clientIP, udpPort),
                           pushDataSource, tid, app);      

其實是把消費者的​

​UDP​

​​端口、IP等資訊封裝為一個​

​PushClient​

​​對象,存儲​

​PushService​

​中。友善以後服務變更後推送消息。

​PushService​

​​類本身實作了​

​ApplicationListener​

​接口:

Nacos源碼分析專題(四)-服務發現

這個是事件監聽器接口,監聽的是​

​ServiceChangeEvent​

​(服務變更事件)。當服務清單變化時,就會通知我們:

Nacos源碼分析專題(四)-服務發現

3.總結

Nacos的服務發現分為兩種模式:

模式一:主動拉取模式,消費者定期主動從​

​Nacos​

​拉取服務清單并緩存起來,再服務調用時優先讀取本地緩存中的服務清單。