
Nacos Source Code Analysis 17: Server-Side Instance Registration

This article walks through how the Nacos server handles instance registration.

First, recall how the client issues the registration request. The code is in the registerInstance method of NacosNamingService:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());

        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }

    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
           

For an ephemeral instance, a heartbeat task is added first to keep it alive; then serverProxy.registerService is called to register the service:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
        namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}
           

We won't go any deeper on the client side; the URL is /nacos/v1/ns/instance. Now let's turn to the server. The receiving endpoint lives in the InstanceController class, in this method:

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    
    final Instance instance = parseInstance(request);
    
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
           

It first builds an Instance object from the request, then calls serviceManager.registerInstance to register the instance:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // create the service if it does not exist yet
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    // fetch the service
    Service service = getService(namespaceId, serviceName);

    // fetch again: if it is null, it may have been removed for missing heartbeats
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    // add the instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
           

One service may have multiple instances, so this method first makes sure the service exists, creating it if necessary, and finally adds the instance to it.

createEmptyService: creating the service

Follow the code down to the createServiceIfAbsent method:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // look up the service first
    Service service = getService(namespaceId, serviceName);
    // create one if it does not exist yet
    if (service == null) {
        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        // construct a new Service
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        // recompute the checksum
        service.recalculateChecksum();
        // attach the cluster if one was supplied
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        // validate the service and cluster names
        service.validate();

        // register and initialize the service
        putServiceAndInit(service);
        if (!local) {
            // persistent services are additionally put into the consistency service
            addOrReplaceService(service);
        }
    }
}
           

getService

It first checks whether the service exists and creates a new one if it does not. Here is getService:

public Service getService(String namespaceId, String serviceName) {
    if (serviceMap.get(namespaceId) == null) {
        return null;
    }
    // look up the service name under the given namespace
    return chooseServiceMap(namespaceId).get(serviceName);
}
public Map<String, Service> chooseServiceMap(String namespaceId) {
	return serviceMap.get(namespaceId);
}
           

The structure of serviceMap is Map<namespaceId, Map<serviceName, Service>>.
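As an illustrative sketch (the class and method names below are hypothetical, not Nacos code), the two-level registry can be modeled like this:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the two-level registry described above:
// namespaceId -> (serviceName -> Service). Names are hypothetical.
public class ServiceRegistrySketch {

    public static class Service {
        public final String name;
        public Service(String name) { this.name = name; }
    }

    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // register a service under its namespace, creating the inner map on demand
    public void putService(String namespaceId, Service service) {
        serviceMap.computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>())
                .put(service.name, service);
    }

    // returns null when either the namespace or the service is unknown
    public Service getService(String namespaceId, String serviceName) {
        Map<String, Service> byName = serviceMap.get(namespaceId);
        return byName == null ? null : byName.get(serviceName);
    }
}
```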

Next, the putServiceAndInit method that initializes the service:

private void putServiceAndInit(Service service) throws NacosException {
    // put the service into serviceMap under its namespace
    putService(service);
    // start heartbeat checking
    service.init();
    // build keys and listen on the consistency service, both ephemeral and persistent
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
           

putService simply puts the new Service object into serviceMap.

The init method:

public void init() {
    // schedule the heartbeat check: 5s initial delay, 5s period
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
           

This starts a clientBeatCheckTask. We will skip the Cluster part for now; clusterMap is still empty at this point.

Let's look at the task's run logic:

@Override
public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        // all instances of the current service
        List<Instance> instances = service.allIPs(true);
        
        // first set health status of instances:
        for (Instance instance : instances) {
            // last beat is older than the heartbeat timeout
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        
        // then remove obsolete instances:
        for (Instance instance : instances) {
            
            if (instance.isMarked()) {
                continue;
            }

            // last beat is older than the deletion timeout
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }
        
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
    
}
           

It checks every instance of the service. If the last reported heartbeat is older than InstanceHeartBeatTimeOut, the instance is marked unhealthy and two events are published: ServiceChangeEvent (via pushService.serviceChanged) and InstanceHeartbeatTimeoutEvent.

It then checks whether the instance has also exceeded the deletion timeout; if so, deleteIp removes it:

private void deleteIp(Instance instance) {
    
    try {
        NamingProxy.Request request = NamingProxy.Request.newRequest();
        request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
        
        String url = "http://127.0.0.1:" + ApplicationUtils.getPort() + ApplicationUtils.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
        
        // delete instance asynchronously:
        HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
            @Override
            public Object onCompleted(Response response) throws Exception {
                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                    Loggers.SRV_LOG
                            .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                                    instance.toJson(), response.getResponseBody(), response.getStatusCode());
                }
                return null;
            }
        });
        
    } catch (Exception e) {
        Loggers.SRV_LOG
                .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
    }
}
           

It calls the server's own delete-instance endpoint on localhost.
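The two thresholds used by the beat-check task can be condensed into a small decision function. A minimal sketch, assuming the Nacos defaults of 15s (heartbeat timeout) and 30s (delete timeout); both can be overridden per instance via metadata:

```java
// Sketch of the two-step decision made by the client-beat-check task.
// The 15s/30s values are assumed Nacos defaults, overridable via metadata.
public class BeatCheckSketch {

    public static final long HEART_BEAT_TIMEOUT_MS = 15_000L;
    public static final long IP_DELETE_TIMEOUT_MS = 30_000L;

    public enum Decision { HEALTHY, MARK_UNHEALTHY, DELETE }

    public static Decision check(long nowMs, long lastBeatMs) {
        long silentFor = nowMs - lastBeatMs;
        if (silentFor > IP_DELETE_TIMEOUT_MS) {
            return Decision.DELETE;          // past the delete timeout: remove the instance
        }
        if (silentFor > HEART_BEAT_TIMEOUT_MS) {
            return Decision.MARK_UNHEALTHY;  // past the heartbeat timeout: healthy=false, push change
        }
        return Decision.HEALTHY;
    }
}
```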

ServiceChangeEvent

Back to the two heartbeat-timeout events. The first is ServiceChangeEvent, which is handled by the PushService listener. Let's look at its implementation:

@Override
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            // fetch all PushClients subscribed to this service
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                // zombie (expired) clients are removed and skipped
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }
                
                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                // reuse cached compressed data when available
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }

                // prepare the UDP payload
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));

                // send
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
            
        } finally {
            // remove the future once the push finishes
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
        
    }, 1000, TimeUnit.MILLISECONDS);
    
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
    
}
           

As you can see, it notifies every subscribed client with a UDP packet. Let's look at udpPush:

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }

    // retries exhausted: log it, bump failedPush, and drop the pending entry from ackMap and udpSendTimeMap
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        
        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        // send the UDP packet
        udpSocket.send(ackEntry.origin);
        
        ackEntry.increaseRetryTime();

        // if no ACK arrives within 10 seconds, try again:
        // Retransmitter calls udpPush again
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        
        return null;
    }
}
           

Because UDP is unreliable, the push is retried. Let's look at Retransmitter:

public static class Retransmitter implements Runnable {
    
    Receiver.AckEntry ackEntry;
    
    public Retransmitter(Receiver.AckEntry ackEntry) {
        this.ackEntry = ackEntry;
    }
    
    @Override
    public void run() {
        // no response yet; the entry is removed from ackMap once an ACK arrives
        if (ackMap.containsKey(ackEntry.key)) {
            Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
            // send again
            udpPush(ackEntry);
        }
    }
}
           

It simply calls udpPush again.
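The ack-or-retry bookkeeping can be boiled down to a simulation (this is a sketch, not the actual PushService code; the "pending" flag stands in for the key still being present in ackMap):

```java
// Simulation of the push/ack loop: a packet is re-sent while its key is still
// pending (no ACK received) and the retry budget is not yet exhausted.
public class UdpRetrySketch {

    /**
     * @param maxRetryTimes retry budget, standing in for MAX_RETRY_TIMES
     * @param ackOnAttempt  1-based attempt on which the ACK arrives; 0 = never
     * @return how many UDP sends happen in total
     */
    public static int sendWithRetry(int maxRetryTimes, int ackOnAttempt) {
        int attempts = 0;
        int retryTimes = 0;
        boolean pending = true;      // stands in for the key still being in ackMap
        while (pending && retryTimes <= maxRetryTimes) {
            attempts++;              // udpPush sends the packet ...
            retryTimes++;            // ... and increases the retry counter
            if (attempts == ackOnAttempt) {
                pending = false;     // the Receiver got the ACK and removed the key
            }
        }
        return attempts;
    }
}
```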

InstanceHeartbeatTimeoutEvent

I could not find where this event is handled; interested readers can dig for it themselves.

consistencyService.listen

consistencyService
        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
           

The consistency service listens on two keys, one ephemeral and one persistent. The key prefix is com.alibaba.nacos.naming.iplist.; for the ephemeral key, an extra ephemeral. segment follows the prefix.
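A sketch of how these keys are assembled. The prefixes are the ones quoted above; the "##" separator between namespace and service name is an assumption based on the Nacos 1.x KeyBuilder:

```java
// Sketch of the instance-list key layout used by the consistency service.
// Prefixes match the ones quoted in the text; the "##" separator is assumed.
public class KeyBuilderSketch {

    public static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";
    public static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";

    public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return INSTANCE_LIST_KEY_PREFIX
                + (ephemeral ? EPHEMERAL_KEY_PREFIX : "")
                + namespaceId + "##" + serviceName;
    }

    // mapConsistencyService routes on exactly this kind of prefix test
    public static boolean matchEphemeralKey(String key) {
        return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
    }
}
```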

Next is DelegateConsistencyServiceImpl, the delegating consistency service:

@Override
public void listen(String key, RecordListener listener) throws NacosException {
    
    // this special key (prefix com.alibaba.nacos.naming.domains.meta.) is listened by both services:
    if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
        persistentConsistencyService.listen(key, listener);
        ephemeralConsistencyService.listen(key, listener);
        return;
    }

    // route to the matching consistency service based on the key
    mapConsistencyService(key).listen(key, listener);
}
 private ConsistencyService mapConsistencyService(String key) {
     // com.alibaba.nacos.naming.iplist.ephemeral.
     return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
 }
           

Let's look at the ephemeral one, DistroConsistencyServiceImpl:

@Override
public void listen(String key, RecordListener listener) throws NacosException {
    // one ConcurrentLinkedQueue of listeners per key
    if (!listeners.containsKey(key)) {
        listeners.put(key, new ConcurrentLinkedQueue<>());
    }

    // double-check to avoid adding a duplicate listener
    if (listeners.get(key).contains(listener)) {
        return;
    }
    // add the listener to the queue
    listeners.get(key).add(listener);
}
           

So it just appends the listener to the queue. What triggers these listeners? First, DistroConsistencyServiceImpl's initialization:

@PostConstruct
public void init() {
    GlobalExecutor.submitLoadDataTask(loadDataTask);
    GlobalExecutor.submitDistroNotifyTask(notifier);
}
           

Two tasks are submitted. loadDataTask loads data at startup, which we will cover another time; for now let's look at notifier:

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");

    // loop forever
    for (; ; ) {
        try {
            // consume the task queue
            Pair<String, ApplyAction> pair = tasks.take();
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}
           

When a task is enqueued, it is taken off the queue here and handled:

private void handle(Pair<String, ApplyAction> pair) {
    try {
        String datumKey = pair.getValue0();
        ApplyAction action = pair.getValue1();

        // remove the key once its task is consumed
        services.remove(datumKey);
        
        int count = 0;

        // no listeners for this key, nothing to do
        if (!listeners.containsKey(datumKey)) {
            return;
        }

        // iterate over the listeners and invoke the matching callback
        for (RecordListener listener : listeners.get(datumKey)) {
            
            count++;
            
            try {
                if (action == ApplyAction.CHANGE) {
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                
                if (action == ApplyAction.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
        
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}
           

So the listeners receive onChange and onDelete: every Service, once created, is registered as a listener, and data changes or deletions are dispatched to it.
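The dispatch at the end of handle can be reduced to this sketch (the listener interface is simplified and the names are hypothetical):

```java
import java.util.List;

// Minimal sketch of the action -> callback dispatch done in handle().
// The RecordListener interface is simplified; names are hypothetical.
public class NotifyDispatchSketch {

    public enum ApplyAction { CHANGE, DELETE }

    public interface RecordListener {
        void onChange(String key);
        void onDelete(String key);
    }

    public static void dispatch(String key, ApplyAction action, List<RecordListener> listeners) {
        for (RecordListener listener : listeners) {
            if (action == ApplyAction.CHANGE) {
                listener.onChange(key);   // the data behind the key changed
            } else if (action == ApplyAction.DELETE) {
                listener.onDelete(key);   // the data behind the key was removed
            }
        }
    }
}
```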

addOrReplaceService

When the service being added is persistent rather than ephemeral, this method is called:

public void addOrReplaceService(Service service) throws NacosException {
    consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);
}
           

Then RaftConsistencyServiceImpl's put method:

@Override
public void put(String key, Record value) throws NacosException {
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}
           

And then the signalPublish method:

public void signalPublish(String key, Record value) throws Exception {

    // not the leader: forward to the leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        
        final RaftPeer leader = getLeader();

        // hand it to the leader via /v1/ns/raft/datum
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    
    try {
        // we are the leader
        // take the lock
        OPERATE_LOCK.lock();
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));

        // publish the change notification locally; peers is the set of all nodes, peers.local() is this node
        onPublish(datum, peers.local());
        
        final String content = json.toString();

        // the call only succeeds once a majority of nodes have acknowledged:
        // a put requires majority replication, i.e. strong consistency (CP)
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        // iterate over all nodes
        for (final String server : peers.allServersIncludeMyself()) {
            // the leader counts itself
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            // /v1/ns/raft/datum/commit
            final String url = buildUrl(server, API_ON_PUB);
            // replicate to the peer asynchronously
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
                    new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT
                                        .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                                datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            // count down on success
                            latch.countDown();
                            return 0;
                        }
                        
                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
            
        }

        // wait for the majority, with a 5-second timeout
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        // unlock
        OPERATE_LOCK.unlock();
    }
}
           

This is where Raft comes into play. If the current node is not the leader, the request is forwarded to the leader.

If it is the leader, onPublish persists the data locally and publishes a change task. It then notifies all followers and counts acknowledgements with a CountDownLatch: once more than half of the nodes have responded, the commit succeeds. Let's see where the latch count, peers.majorityCount(), comes from:

public int majorityCount() {
    return peers.size() / 2 + 1;
}
           

That is, cluster size / 2 + 1: a simple majority.
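So for a cluster of n nodes the latch waits for ⌊n/2⌋ + 1 acknowledgements, the leader's own counting as one of them. A few concrete values:

```java
// Quorum size as computed by majorityCount(): floor(n / 2) + 1.
public class QuorumSketch {
    public static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }
}
```

Note that for even cluster sizes this is strictly more than half (e.g. 3 of 4), which is what makes the quorum safe.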

Back to the onPublish method:

public void onPublish(Datum datum, RaftPeer source) throws Exception {
    RaftPeer local = peers.local();
    if (datum.value == null) {
        Loggers.RAFT.warn("received empty datum");
        throw new IllegalStateException("received empty datum");
    }

    // only the leader may publish
    if (!peers.isLeader(source.ip)) {
        Loggers.RAFT
                .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(getLeader()));
        throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
    }

    // stale term: reject
    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(local));
        throw new IllegalStateException(
                "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
    }

    // reset the leader-expiration timer
    local.resetLeaderDue();
    
    // if data should be persisted, usually this is true:
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }

    // store the datum in memory
    datums.put(datum.key, datum);
    
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());

    // enqueue a CHANGE notification task
    notifier.addTask(datum.key, ApplyAction.CHANGE);
    
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
           

raftStore persists the datum to a local file, and addTask enqueues a change task.

That completes the service-creation part; next comes adding the instance to the service.

addInstance

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {

    // build the instance-list key for the service
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    // fetch the service again
    Service service = getService(namespaceId, serviceName);

    // lock on the service
    synchronized (service) {
        // merge the new instances in and get the full instance list of the service
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        
        Instances instances = new Instances();
        // wrap the list
        instances.setInstanceList(instanceList);

        // put it into the consistency service; the key decides ephemeral vs persistent
        consistencyService.put(key, instances);
    }
}
           

First, the new instances are merged in.

addIpAddresses

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}
           

The updateIpAddresses method:

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {

    // fetch the previously stored instance list
    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

    // all current instances of this service, ephemeral or persistent
    List<Instance> currentIPs = service.allIPs(ephemeral);
    // map from ip:port to instance
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    // set of instance IDs
    Set<String> currentInstanceIds = Sets.newHashSet();

    // populate both collections
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }

    // the stored instance set after applying the update
    Map<String, Instance> instanceMap;
    if (datum != null) {
        // refresh the stored list with the current instances' health flags and heartbeat times
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        // nothing stored yet, start fresh
        instanceMap = new HashMap<>(ips.length);
    }

    // iterate over the incoming instances
    for (Instance instance : ips) {
        // create the cluster if it does not exist yet
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            // initialize it, which starts the cluster health check
            cluster.init();
            // register the cluster on the service
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }

        // for a remove action, delete the entry from the stored set
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
        // otherwise add it
            instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            instanceMap.put(instance.getDatumKey(), instance);
        }
        
    }
    
    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(instanceMap.values()));
    }

    // return the merged instance list
    return new ArrayList<>(instanceMap.values());
}
           

This is a single, unified update routine: it diffs the old and new instance sets and returns the merged result.
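The merge can be reduced to a sketch on plain string keys. This is a hypothetical simplification: the real code keys entries by Instance.getDatumKey() and also refreshes health flags from the current state:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of updateIpAddresses: start from the stored instance list,
// then apply the incoming instances as adds or removes.
public class InstanceMergeSketch {

    public static List<String> update(List<String> stored, List<String> incoming, boolean remove) {
        Map<String, Boolean> merged = new LinkedHashMap<>();
        for (String key : stored) {
            merged.put(key, Boolean.TRUE);
        }
        for (String key : incoming) {
            if (remove) {
                merged.remove(key);            // UPDATE_INSTANCE_ACTION_REMOVE
            } else {
                merged.put(key, Boolean.TRUE); // add, or overwrite an existing entry
            }
        }
        return new ArrayList<>(merged.keySet());
    }
}
```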

The result is then handed to the consistency service.

consistencyService.put

We analyzed RaftConsistencyServiceImpl's put above; now let's look at the ephemeral DistroConsistencyServiceImpl's put:

@Override
public void put(String key, Record value) throws NacosException {
    onPut(key, value);
    // Distro replicates the data asynchronously: the client's put returns before the sync completes, i.e. eventual consistency (AP)
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), ApplyAction.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}
           

First, the onPut method:

public void onPut(String key, Record value) {
    
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        // build a datum for the ephemeral instance list
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        // store it in the in-memory dataStore
        dataStore.put(key, datum);
    }

    // no listeners: nothing more to do
    if (!listeners.containsKey(key)) {
        return;
    }

    // otherwise notify the listeners immediately that the service changed
    notifier.addTask(key, ApplyAction.CHANGE);
}
           

As you can see, it enqueues a CHANGE task: the change event for the newly added instance.

Next is distroProtocol.sync:

public void sync(DistroKey distroKey, ApplyAction action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        // the holder owns the delayed-task execute engine; for the ephemeral (Distro) case the registered NacosTaskProcessor is DistroDelayTaskProcessor, which eventually processes the task
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}
           

This enqueues a DistroDelayTask, which is handled by DistroDelayTaskProcessor:

@Override
public boolean process(AbstractDelayTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    // kick off the ephemeral-consistency sync task
    if (ApplyAction.CHANGE.equals(distroDelayTask.getAction())) {
        DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
        distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask);
        return true;
    }
    return false;
}
           

DistroSyncChangeTask runs asynchronously; here is its run method:

@Override
public void run() {
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    try {
        String type = getDistroKey().getResourceType();
        DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
        distroData.setType(ApplyAction.CHANGE);
        // syncData replicates the data via NamingProxy.syncData to /nacos/v1/ns/distro/datum
        boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
        // on failure, retry by re-submitting the task
        if (!result) {
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        handleFailedTask();
    }
}
           

It sends a REST request and retries on failure.

Summary

That wraps up the Nacos server's handling of service registration. A quick summary:

  1. First, decide whether a new Service needs to be created.
    1. On initialization, a startup data-loading task and a heartbeat-check task are created.
    2. After the Service is created, listeners are registered on it to watch for data changes and deletions.
  2. Once the new Service exists, the new instance is added to it.
    1. The old and new instance sets are merged into an updated list.
    2. The consistency service's put is triggered: local notification first, then cluster replication.
  3. Along the way we saw two ways of replicating data across the cluster:
    1. AP, eventual consistency: a DistroSyncChangeTask is created for notification, and the main thread returns to the caller without waiting for the cluster.
    2. CP, strong consistency: RaftCore's signalPublish replicates the data and only succeeds once more than half of the nodes respond.