天天看点

Nacos服务注册与发现中心之服务注册(服务端代码)入口URL

上文讲解了Nacos服务注册与发现中心之服务注册的客户端代码,现在继续将服务注册时,Nacos服务端的源码

Nacos服务注册与发现中心之服务注册: 服务端代码

  • 入口URL
    • Nacos服务端服务注册源码
      • InstanceController
      • ServiceManager.registerInstance()
        • Service类
        • Service.init()
        • 我们再回到ServiceManager.registerInstance()方法
          • consistencyService.put(key, instances);

入口URL

从上篇文章我们可以知道使用SpringCloud集成的Nacos,客户端是通过发送HTTP请求的方式进行服务注册的,那么服务端的代码入口就是一个HTTP的请求地址了,从客户端代码可以得出请求的URI为:

/nacos/v1/ns/instance

,方法为

PUT

Nacos服务端服务注册源码

我们研读的源码的Nacos分支为

1.0.0-RC3

,Nacos2.0开始发生了比较大的重构,但是我们目前使用较多的版本依然为Nacos1的版本及其分支。

InstanceController

我们先来看到URI:

/nacos/v1/ns/instance

的处理类

com.alibaba.nacos.naming.controllers.InstanceController

及其对应的方法

com.alibaba.nacos.naming.controllers.InstanceController.register()

@CanDistro
    @RequestMapping(value = "", method = RequestMethod.POST)
    public String register(HttpServletRequest request) throws Exception {

        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

        serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
        return "ok";
    }
           

从此处可以看到,主要的逻辑为:从请求参数中获取服务名称和服务所在的命名空间的ID的值,然后通过

parseInstance(request)

方法创建一个Instantce对象。最后调用ServiceManager.registerInstance()方法进行服务注册。

我们稍微再看一眼

parseInstance(request)

方法:

private Instance parseInstance(HttpServletRequest request) throws Exception {

        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String app = WebUtils.optional(request, "app", "DEFAULT");
        String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);

        Instance instance = getIPAddress(request);
        instance.setApp(app);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.generateInstanceId());
        instance.setLastBeat(System.currentTimeMillis());
        if (StringUtils.isNotEmpty(metadata)) {
            instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
        }

        if (!instance.validate()) {
            throw new NacosException(NacosException.INVALID_PARAM, "instance format invalid:" + instance);
        }

        return instance;
    }
           

我们主要关注这个行代码:

instance.setLastBeat(System.currentTimeMillis());

这一行代码初始化了注册该服务的实例的最后一次心跳时间,这个用于后续Nacos服务端进行心跳检查时使用。

ServiceManager.registerInstance()

接着我们来看

ServiceManager.registerInstance()

方法:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
		
		// 1
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
		
		// 。。。 省略不关注的代码
		
		// 2
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
           

为了方便,我在注释上标注了1,2;

我们先来看1处的代码的源码:

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {

            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(Constants.DEFAULT_GROUP);
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            service.validate();
            if (local) {
                putService(service);
                service.init();
                consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
                consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
            } else {
                addOrReplaceService(service);
            }
        }
    }
           

可以看到,如果该服务是第一次有实例进行注册的话,会进行第一次的初始化操作,此时传入的参数:

local

为:

instance.isEphemeral()

,如果对上篇文章还有印象的话,这表示的是该实例是否为临时实例,默认值为

true

,所以此处如果我们不做另外设置的话,local都为

true

,看源码可以知道,此时会将service放入到本地缓存并且调用service.init()方法进行初始化。

Service类

在看Servcie.init()方法前,我们先来看看Service类(主要看所拥有的字段):

public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+";

    @JSONField(serialize = false)
    private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

    private String token;
    private List<String> owners = new ArrayList<>();
    private Boolean resetWeight = false;
    private Boolean enabled = true;
    private Selector selector = new NoneSelector();
    private String namespaceId;

    /**
     * IP will be deleted if it has not send beat for some time, default timeout is 30 seconds.
     */
    private long ipDeleteTimeout = 30 * 1000;

    private volatile long lastModifiedMillis = 0L;

    private volatile String checksum;

    /**
     * TODO set customized push expire time:
     */
    private long pushCacheMillis = 0L;

    private Map<String, Cluster> clusterMap = new HashMap<String, Cluster>();
// ... 此处省略掉暂不关心的字段和成员方法
    }
           

可以看到Service类其实并不是直接保存其下多个Instance实例的,其实它们都是保存到了

clusterMap

中,我们来看

Cluster

类的代码:

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    private static final String CLUSTER_NAME_SYNTAX = "[0-9a-zA-Z-]+";
    /**
     * a addition for same site routing, can group multiple sites into a region, like Hangzhou, Shanghai, etc.
     */
    private String sitegroup = StringUtils.EMPTY;

    private int defCkport = 80;

    private int defIPPort = -1;

    @JSONField(serialize = false)
    private HealthCheckTask checkTask;

	// 持久化实例SET
    @JSONField(serialize = false)
    private Set<Instance> persistentInstances = new HashSet<>();
	
	// 临时实例SET
    @JSONField(serialize = false)
    private Set<Instance> ephemeralInstances = new HashSet<>();

    @JSONField(serialize = false)
    private Service service;

    @JSONField(serialize = false)
    private volatile boolean inited = false;

    private Map<String, String> metadata = new ConcurrentHashMap<>();
    }
           

可以看到该类保持了持久化实例和临时实例的Set。

最后再来看Service类的父类

com.alibaba.nacos.api.naming.pojo.Service

public class Service {
    /**
     * service name
     */
    private String name;

    /**
     * protect threshold
     */
    private float protectThreshold = 0.0F;

    /**
     * application name of this service
     */
    private String appName;

    /**
     * Service group to classify services into different sets.
     */
    private String groupName;

    private Map<String, String> metadata = new HashMap<String, String>();
}
           

由此我们可以引出Nacos服务注册与发现中心,服务数据结构的设计为:

Nacos服务注册与发现中心之服务注册(服务端代码)入口URL

以用户服务为例,我们可以这么分配为:一个科技公司营销业务线(NameSpace)的订单服务(Service)可以存在多个不同的地区分布:华东,华南(GROUP),每个地区有多个机房,深圳,广州(Cluster),每个机房部署多个实例:192.168.1.1:8080、192.168.1.2:8080(Instance)。

Service.init()

接下来我们来看

Service.init()

方法的源码:

public void init() {

        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }
           

从代码可以看到,主要是创建了一个定时任务,并且从命名就可以看出是做客户端心跳检测的一个定时任务;其后是进行Cluster的初始化,这个我们暂且跳过,我们来看创建定时任务的代码:

public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.putIfAbsent(task.taskKey(), EXECUTOR.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }
           

可以看到这里创建了一个每5秒钟运行一次的定时任务,我们可以得出一个结论Nacos服务注册与发现中心对每一个服务,每5秒钟会进行一次心跳超时检查。

接下来看心跳检查定时任务的代码:

@Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }

            List<Instance> instances = service.allIPs(true);
			
			// 1
            // first set health status of instances:
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > ClientBeatProcessor.CLIENT_BEAT_TIMEOUT) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                UtilsAndCommons.LOCALHOST_SITE, ClientBeatProcessor.CLIENT_BEAT_TIMEOUT, instance.getLastBeat());
                            getPushService().serviceChanged(service.getNamespaceId(), service.getName());
                        }
                    }
                }
            }
			
			// 2
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
			
			// 3
            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > service.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
                    deleteIP(instance);
                }
            }

        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }

    }
           

同样,我对代码关键点打了1,2,3的标注,我们先来看1处,首先会获取Service下所有的ip节点,即所有的实例,遍历所有实例,进行心跳超时检查,检查的逻辑为:

System.currentTimeMillis() - instance.getLastBeat() > ClientBeatProcessor.CLIENT_BEAT_TIMEOUT

, 这段代码的意思就是最后一次进行心跳的时间如果大于

ClientBeatProcessor.CLIENT_BEAT_TIMEOUT

,则视为该实例心跳超时,而

ClientBeatProcessor.CLIENT_BEAT_TIMEOUT

的值为15秒:

因为我们又可以得出一个结论:Nacos服务注册与发现中心对每一个服务每5秒钟进行一次心跳超时检查,如有实例的最后一次心跳时间超过15秒,则视心跳超时。并且此时会更改服务的健康状态,并且推送该状态变更给服务的消费者(后续讲服务消费者会补充)

我们再来看:2,此处会判断全局配置的

isExpireInstance

的值,而该值默认为

true

,表示允许对过期的实例进行删除,当然也可以进行配置,配置为false,表示不允许删除过期的实例。

@Value("${nacos.naming.expireInstance}")
private boolean expireInstance = true;
           

我们再来看:3,此处会再次遍历服务下所有的实例,判断逻辑为

System.currentTimeMillis() - instance.getLastBeat() > service.getIpDeleteTimeout()

表示:如果该实例最后一次心跳时间超过

service.getIpDeleteTimeout()

则对实例进行删除

service.getIpDeleteTimeout()

的值为30秒:

/**
     * IP will be deleted if it has not send beat for some time, default timeout is 30 seconds.
     */
    private long ipDeleteTimeout = 30 * 1000;
           

因此我们可以继续补充上诉结论:Nacos服务注册与发现中心对每一个服务每5秒钟进行一次心跳超时检查,如有该实例的最后一次心跳时间超过15秒,则视心跳超时。并且如果心跳超时时间超过30秒,则会对实例进行删除(摘除)

最后的删除实例的逻辑,我们会在后面解读,本文暂不研究。

我们再回到ServiceManager.registerInstance()方法

由于从该方法标注的:1处引申了太多,我们现在重新看回标注2:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
		
		// 1
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
		
		// 。。。 省略不关注的代码
		
		// 2
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
           

我们现在进行实例的添加:

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);
		// 1
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
		
		// 2
        consistencyService.put(key, instances);
    }
           

我们来看:1处的代码,

addIpAddresses

方法会将当前注册的实例添加到对应服务所有的实例列表中:

public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
		// 3
        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

        Map<String, Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

        for (Instance instance : currentIPs) {
            map.put(instance.toIPAddr(), instance);
        }
        if (datum != null) {
            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
        }

        // use HashMap for deep copy:
        HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);
		
		// 4
        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName());
                cluster.setService(service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                    instance.getClusterName(), instance.toJSON());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                instanceMap.put(instance.getDatumKey(), instance);
            }

        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
                + JSON.toJSONString(instanceMap.values()));
        }

        return new ArrayList<>(instanceMap.values());
    }

           

从3处,会根据注册的实例对应的服务,构建一个KEY,通过该KEY从Datum中获取到服务下的所有实例的一个Map结构数据,(Datum为Nacos进行持久化的一个键值对数据库:当然我们也可以配置成使用Mysql来进行持久化),获取到数据后,会创建一个map变量,将当前服务的所有实例数据PUT到map中,然后会调用

setValid

方法对map和

Datum中的数据进行比较和进行一些数据的同步:

private Map<String, Instance> setValid(List<Instance> oldInstances, Map<String, Instance> map) {

        Map<String, Instance> instanceMap = new HashMap<>(oldInstances.size());
        for (Instance instance : oldInstances) {
            Instance instance1 = map.get(instance.toIPAddr());
            if (instance1 != null) {
                instance.setHealthy(instance1.isHealthy());
                instance.setLastBeat(instance1.getLastBeat());
            }
            instanceMap.put(instance.getDatumKey(), instance);
        }
        return instanceMap;
    }
           

从代码可以看出,主要是进行

健康状态

最后一个心跳时间

的同步,最后同步后的结果会赋值给

oldInstanceMap

变量;然后会新建一个

instanceMap

变量,将

oldInstanceMap

的数据都赋值给

instanceMap

(进行一次深度拷贝)

最后我们再来看:4处,此处主要是遍历注册的实例(其实一般只有一个),然后通过实例的获取到它的Datum的KEY,PUT到

instanceMap

中。

consistencyService.put(key, instances);

让我们目光回到

addInstance()

方法,调用完

addIpAddresses

方法后,通过返回的最新的所注册服务的所有实例列表,最后执行了这行代码

consistencyService.put(key, instances);

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);
		// 1
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
		
		// 2
        consistencyService.put(key, instances);
    }
           

该方法有多个不同的实现类:

Nacos服务注册与发现中心之服务注册(服务端代码)入口URL

其实不管是Delegate、Distro还是Raft都是分布式一致性算法的一种,所以这里很明显就是做Nacos集群的同步的了,Nacos默认使用的是

Delegate

,至于这几种方法,会在后续补充。

END

下一篇讲Nacos服务注册与发现中心之心跳检查的服务端代码