天天看点

Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册

Eureka Client 源码解析 获取注册表、客户端注册

1. EurekaClient 构造器中的流程:

接下来我们准备开始真正分析Eureka Client 的源码,上一章我们分析了Eureka Client的自动配置类都加载了哪些东西,其中最为核心的就是EurekaClient:

Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册

先简单看一下CloudEurekaClient构造器中的流程,大概看一下都做了哪些事,接下来我们会一一分析:

Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册

2. EurekaClient 构造器跟踪

入口:

Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册

跟CloudEurekaClient的构造:

//CloudEurekaClient.java
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
		EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
		ApplicationEventPublisher publisher) {
	//继续跟super方法 
	//CloudEurekaClient 继承自 DiscoveryClient
	super(applicationInfoManager, config, args);
	this.applicationInfoManager = applicationInfoManager;
	this.publisher = publisher;
	this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
			"eurekaTransport");
	ReflectionUtils.makeAccessible(this.eurekaTransportField);
}

//DiscoveryClient.java
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
	//继续跟this方法
    this(applicationInfoManager, config, args, ResolverUtils::randomize);
}

//DiscoveryClient.java
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
	//此时一共5个参数
	//第四个参数是个匿名内部类实例,主要是提供一个备用注册表的功能(当远程注册表获取失败的时候)
    this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
        private volatile BackupRegistry backupRegistryInstance;

        @Override
        //返回BackupRegistry,备份注册表
        //Eureka是AP的,高可用,当Eureka Server全部挂掉了,这个备份注册表实例就起作用了
        //当然这个备份注册表需要自己实现并配置才能用
        public synchronized BackupRegistry get() {
            if (backupRegistryInstance == null) {
                String backupRegistryClassName = config.getBackupRegistryImpl();
                if (null != backupRegistryClassName) {
                    try {
                        backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                        logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                    } catch (InstantiationException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    } catch (IllegalAccessException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    } catch (ClassNotFoundException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    }
                }

                if (backupRegistryInstance == null) {
                    logger.warn("Using default backup registry implementation which does not do anything.");
                    backupRegistryInstance = new NotImplementedRegistryImpl();
                }
            }

            return backupRegistryInstance;
        }
    }, randomizer);
}
           

继续跟this构造,这个方法很长,我们只关注最核心的:

//DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
	
	...//省略了很多代码

	//这里fetchRegistry方法做的事情就是我们第一个需要关注的任务
	//获取注册表
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
	        //第二个任务,register方法,进行注册
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    //第三个任务,初始化定时任务
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}
           
Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册

3. 获取客户端注册表

Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册
Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册
Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册

3.1 获取备用注册表:

现在看获取注册表的方法fetchRegistry,该方法返回值true/false代表是否获取成功

先看获取失败的情况,如果需要获取,但获取失败,就会获取备用的注册表:

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
	//如果应该获取,但获取失败,就会获取备用的注册表
    fetchRegistryFromBackup();
}

//DiscoveryClient.java
private void fetchRegistryFromBackup() {
    try {
    	//先直接获取备用注册表实例,默认返回null,除非子类重写该方法
        @SuppressWarnings("deprecation")
        BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
        if (null == backupRegistryInstance) { // backward compatibility with the old protected method, in case it is being used.
        
	        //这个backupRegistryProvider就是之前构造中传入的匿名内部类
	        //备用注册表的提供者,通过提供者获取备用注册表实例
	        //没有专门实现,这里返回的就是NotImplementedRegistryImpl(之前构造中可以看到)
            backupRegistryInstance = backupRegistryProvider.get();
        }

        if (null != backupRegistryInstance) {
	        //如果这个备用注册表实例不为空
            Applications apps = null;
            if (isFetchingRemoteRegionRegistries()) {//判断是否可以从远程region获取
	            //获取配置的远程region列表
                String remoteRegionsStr = remoteRegionsToFetch.get();
                if (null != remoteRegionsStr) {
	                //从远程region获取注册表,apps就是Applications
	                //Applications就是注册表,map结构,key是微服务名称,value是Application
	                //Application也是map结构,key是InstanceInfo的Id,value是InstanceInfo
	                //一个InstanceInfo对应的就是一个主机信息
                    apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
                }
            } else {
	            //不可以从远程region获取,则获取本地备用注册表
                apps = backupRegistryInstance.fetchRegistry();
            }
            if (apps != null) {
	            //如果apps不空,则将apps打散
                final Applications applications = this.filterAndShuffle(apps);
                applications.setAppsHashCode(applications.getReconcileHashCode());
                //将shuffle过的applications放到本地缓存
                localRegionApps.set(applications);
                logTotalInstances();
                logger.info("Fetched registry successfully from the backup");
            }
        } else {
            logger.warn("No backup registry instance defined & unable to find any discovery servers.");
        }
    } catch (Throwable e) {
        logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
    }
}
           
判断是否可以从远程region获取:
Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册
该值我们可以通过配置文件进行配置,指定获取eureka注册表信息的远程region列表:
Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册
备用注册表Provider是之前构造中传入的一个匿名内部类:
Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册

3.2 从注册中心获取注册表

现在看从注册中心获取注册表的具体实现:

Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册
//DiscoveryClient.java
/**
 * Fetches the registry information.
 *
 * <p>
 * This method tries to get only deltas after the first fetch unless there
 * is an issue in reconciling eureka server and client registry information.
 * 除非在协调eureka服务器和客户端注册表信息时出现问题,否则此方法尝试在第一次获取后只获取delta。
 * 
 * 翻译一下:第一次是全量下载,后面就是增量下载
 * </p>
 *
 * @param forceFullRegistryFetch Forces a full registry fetch. 是否强制全量下载
 *
 * @return true if the registry was fetched 获取成功返回true,否则false
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
	//这里入参forceFullRegistryFetch,代表含义是:true则代表强制全量下载
	//false就是有可能全量,也有可能是增量获取,视情况而定。
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        // 先获取当前本地缓存的注册表信息,刚启动这里肯定获取为空
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
        	//只要满足以上各种情况的任何一个条件,就会进行全量下载:
        	//	配置文件中关闭了增量下载
        	//	配置了VIP的注册表地址
        	//	强制进行全量下载
        	//	本地缓存的注册表信息为空
        	
			...//省略了各种info日志打印
			
            // 全量获取,获取存储全部注册表信息
            getAndStoreFullRegistry();
        } else {
	        // 增量获取,获取更新增量数据
	        //(所谓增量下载其实是获取服务端的最近更新队列的数据,以后会看到)
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
           

3.2.1 全量下载

看getAndStoreFullRegistry方法:

//DiscoveryClient.java
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    //先尝试获取registry-refresh-single-vip-address配置
    //如果不为null走的是vip地址获取
    //为null,则走默认的
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
	    //获取响应体,就是Applications
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
	    //将注册表信息打乱放入本地缓存
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}
           
registry-refresh-single-vip-address配置:可以专门指定地址进行注册表下载更新
Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册

这里我们没有指定vip地址,所以走eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()):

Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册

可以看到实现中,通过Jersey 框架提交了get请求:

//AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
    return getApplicationsInternal("apps/", regions);
}

//AbstractJerseyEurekaHttpClient.java
//可以看到实现中,通过Jersey 框架提交了get请求
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
	    //SpringMVC的处理器是Controller,Jersey框架的处理器是Resource
	    //这里WebResource就是代表一个web资源
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            //从这个web资源创建一个新的WebResource,并将一个附加的查询参数添加到这个web资源的URI中。
	        //构建查询参数到URI中
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        //获取请求构建者
        Builder requestBuilder = webResource.getRequestBuilder();
        //添加请求头,客户端来说,没有特殊配置,这里不会添加任何请求头
        addExtraHeaders(requestBuilder);
        //指定接受json数据格式,并发起get请求
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
	        //获取响应,将响应体转成 注册表的数据结构
            applications = response.getEntity(Applications.class);
        }
        // 将结果封装成需要的数据结构返回
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                    serviceUrl, urlPath,
                    regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                    response == null ? "N/A" : response.getStatus()
            );
        }
        if (response != null) {
            response.close();
        }
    }
}
           

看requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class)的get方法:

//WebResource.java
@Override
public <T> T get(Class<T> c) throws UniformInterfaceException, ClientHandlerException {
	//看到这里提交的就是"GET"请求,下面就是Jersey 框架的东西了
	//当然还涉及到一些url的处理,就不跟了
    return handle(c, build("GET"));
}
           

请求发了,至于Eureka Server端怎么处理并响应,等将Eureka Server时在讲,看到全量下载还是比较简单的。

3.2.2 增量下载

下一章讲Eureka Client端的一些定时任务,其中定时更新注册表的任务也会用到增量下载,下一章说。

4. 客户端注册

现在看客户端注册:

Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册
  • shouldRegisterWithEureka

    指定是否需要向Eureka进行注册,该配置默认为true,一般Server端会设置为false

    Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册
  • shouldEnforceRegistrationAtInit

    在初始化时候是否注册,默认false

    Spring Cloud Eureka Client 源码解析(二)获取注册表、客户端注册1. EurekaClient 构造器中的流程:2. EurekaClient 构造器跟踪3. 获取客户端注册表4. 客户端注册

可以看到默认情况下这里的注册方法不会执行,即初始化的时候不会进行注册,那么这里不注册,在哪里注册呢?其实是在定时任务里注册的(心跳续约),但是注册逻辑是一样的,所以这里我们先分析这个方法:

PS:从代码中可以看到,如果设置为初始化时注册,会一个问题:如果注册失败了会抛异常,导致整个客户端启动就失败了,也不会启动后面的定时任务,如果不强制初始化时进行注册,会通过心跳续约的定时任务去注册,即使注册失败了也不影响客户端启动,并会定时多次尝试进行注册。

看register方法:

//DiscoveryClient.java
/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
	    //将当前instanceInfo(主机信息)作为参数进行注册
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    //204(No Content), 表示执行成功, 但是没有数据,
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
           

继续看eurekaTransport.registrationClient.register方法,可以看到通过Jersey 框架提交post注册请求:

//AbstractJerseyEurekaHttpClient.java
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
	    //通过Jersey 框架提交post注册请求
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                //携带的参数就是InstanceInfo
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

//WebResource.java
@Override
public <T> T post(Class<T> c, Object requestEntity) throws UniformInterfaceException, ClientHandlerException {
    return handle(c, build("POST", requestEntity));
}
           

可以看到注册也是比较简单的。

继续阅读