Eureka Client 源码解析 获取注册表、客户端注册
1. EurekaClient 构造器中的流程:
接下来我们准备开始真正分析Eureka Client 的源码,上一章我们分析了Eureka Client的自动配置类都加载了哪些东西,其中最为核心的就是EurekaClient:
先简单看一下CloudEurekaClient构造器中的流程,大概看一下都做了哪些事,接下来我们会一一分析:
2. EurekaClient 构造器跟踪
入口:
跟CloudEurekaClient的构造:
//CloudEurekaClient.java
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
ApplicationEventPublisher publisher) {
//继续跟super方法
//CloudEurekaClient 继承自 DiscoveryClient
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
//DiscoveryClient.java
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
//继续跟this方法
this(applicationInfoManager, config, args, ResolverUtils::randomize);
}
//DiscoveryClient.java
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
//此时一共5个参数
//第四个参数是个匿名内部类实例,主要是提供一个备用注册表的功能(当远程注册表获取失败的时候)
this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
private volatile BackupRegistry backupRegistryInstance;
@Override
//返回BackupRegistry,备份注册表
//Eureka是AP的,高可用,当Eureka Server全部挂掉了,这个备份注册表实例就起作用了
//当然这个备份注册表需要自己实现并配置才能用
public synchronized BackupRegistry get() {
if (backupRegistryInstance == null) {
String backupRegistryClassName = config.getBackupRegistryImpl();
if (null != backupRegistryClassName) {
try {
backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
} catch (InstantiationException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (IllegalAccessException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (ClassNotFoundException e) {
logger.error("Error instantiating BackupRegistry.", e);
}
}
if (backupRegistryInstance == null) {
logger.warn("Using default backup registry implementation which does not do anything.");
backupRegistryInstance = new NotImplementedRegistryImpl();
}
}
return backupRegistryInstance;
}
}, randomizer);
}
继续跟this构造,这个方法很长,我们只关注最核心的:
//DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
...//省略了很多代码
//这里fetchRegistry方法做的事情就是我们第一个需要关注的任务
//获取注册表
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
//第二个任务,register方法,进行注册
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
//第三个任务,初始化定时任务
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
3. 获取客户端注册表
3.1 获取备用注册表:
现在看获取注册表的方法fetchRegistry,该方法返回值true/false代表是否获取成功
先看获取失败的情况,如果需要获取,但获取失败,就会获取备用的注册表:
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
//如果应该获取,但获取失败,就会获取备用的注册表
fetchRegistryFromBackup();
}
//DiscoveryClient.java
private void fetchRegistryFromBackup() {
try {
//先直接获取备用注册表实例,默认返回null,除非子类重写该方法
@SuppressWarnings("deprecation")
BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
if (null == backupRegistryInstance) { // backward compatibility with the old protected method, in case it is being used.
//这个backupRegistryProvider就是之前构造中传入的匿名内部类
//备用注册表的提供者,通过提供者获取备用注册表实例
//没有专门实现,这里返回的就是NotImplementedRegistryImpl(之前构造中可以看到)
backupRegistryInstance = backupRegistryProvider.get();
}
if (null != backupRegistryInstance) {
//如果这个备用注册表实例不为空
Applications apps = null;
if (isFetchingRemoteRegionRegistries()) {//判断是否可以从远程region获取
//获取配置的远程region列表
String remoteRegionsStr = remoteRegionsToFetch.get();
if (null != remoteRegionsStr) {
//从远程region获取注册表,apps就是Applications
//Applications就是注册表,map结构,key是微服务名称,value是Application
//Application也是map结构,key是InstanceInfo的Id,value是InstanceInfo
//一个InstanceInfo对应的就是一个主机信息
apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
}
} else {
//不可以从远程region获取,则获取本地备用注册表
apps = backupRegistryInstance.fetchRegistry();
}
if (apps != null) {
//如果apps不空,则将apps打散
final Applications applications = this.filterAndShuffle(apps);
applications.setAppsHashCode(applications.getReconcileHashCode());
//将shuffle过的applications放到本地缓存
localRegionApps.set(applications);
logTotalInstances();
logger.info("Fetched registry successfully from the backup");
}
} else {
logger.warn("No backup registry instance defined & unable to find any discovery servers.");
}
} catch (Throwable e) {
logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
}
}
判断是否可以从远程region获取: 该值我们可以通过配置文件进行配置,指定获取eureka注册表信息的远程region列表: 备用注册表Provider是之前构造中传入的一个匿名内部类:
3.2 从注册中心获取注册表
现在看从注册中心获取注册表的具体实现:
//DiscoveryClient.java
/**
* Fetches the registry information.
*
* <p>
* This method tries to get only deltas after the first fetch unless there
* is an issue in reconciling eureka server and client registry information.
* 除非在协调eureka服务器和客户端注册表信息时出现问题,否则此方法尝试在第一次获取后只获取delta。
*
* 翻译一下:第一次是全量下载,后面就是增量下载
* </p>
*
* @param forceFullRegistryFetch Forces a full registry fetch. 是否强制全量下载
*
* @return true if the registry was fetched 获取成功返回true,否则false
*/
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
//这里入参forceFullRegistryFetch,代表含义是:true则代表强制全量下载
//false就是有可能全量,也有可能是增量获取,视情况而定。
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
// 先获取当前本地缓存的注册表信息,刚启动这里肯定获取为空
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
//只要满足以上各种情况的任何一个条件,就会进行全量下载:
// 配置文件中关闭了增量下载
// 配置了VIP的注册表地址
// 强制进行全量下载
// 本地缓存的注册表信息为空
...//省略了各种info日志打印
// 全量获取,获取存储全部注册表信息
getAndStoreFullRegistry();
} else {
// 增量获取,获取更新增量数据
//(所谓增量下载其实是获取服务端的最近更新队列的数据,以后会看到)
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
3.2.1 全量下载
看getAndStoreFullRegistry方法:
//DiscoveryClient.java
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
//先尝试获取registry-refresh-single-vip-address配置
//如果不为null走的是vip地址获取
//为null,则走默认的
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
//获取响应体,就是Applications
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
//将注册表信息打乱放入本地缓存
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
registry-refresh-single-vip-address配置:可以专门指定地址进行注册表下载更新
这里我们没有指定vip地址,所以走eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()):
可以看到实现中,通过Jersey 框架提交了get请求:
//AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
return getApplicationsInternal("apps/", regions);
}
//AbstractJerseyEurekaHttpClient.java
//可以看到实现中,通过Jersey 框架提交了get请求
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
//SpringMVC的处理器是Controller,Jersey框架的处理器是Resource
//这里WebResource就是代表一个web资源
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
//从这个web资源创建一个新的WebResource,并将一个附加的查询参数添加到这个web资源的URI中。
//构建查询参数到URI中
webResource = webResource.queryParam("regions", regionsParamValue);
}
//获取请求构建者
Builder requestBuilder = webResource.getRequestBuilder();
//添加请求头,客户端来说,没有特殊配置,这里不会添加任何请求头
addExtraHeaders(requestBuilder);
//指定接受json数据格式,并发起get请求
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
//获取响应,将响应体转成 注册表的数据结构
applications = response.getEntity(Applications.class);
}
// 将结果封装成需要的数据结构返回
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue == null ? "" : "regions=" + regionsParamValue,
response == null ? "N/A" : response.getStatus()
);
}
if (response != null) {
response.close();
}
}
}
看requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class)的get方法:
//WebResource.java
@Override
public <T> T get(Class<T> c) throws UniformInterfaceException, ClientHandlerException {
//看到这里提交的就是"GET"请求,下面就是Jersey 框架的东西了
//当然还涉及到一些url的处理,就不跟了
return handle(c, build("GET"));
}
请求发了,至于Eureka Server端怎么处理并响应,等将Eureka Server时在讲,看到全量下载还是比较简单的。
3.2.2 增量下载
下一章讲Eureka Client端的一些定时任务,其中定时更新注册表的任务也会用到增量下载,下一章说。
4. 客户端注册
现在看客户端注册:
-
shouldRegisterWithEureka
指定是否需要向Eureka进行注册,该配置默认为true,一般Server端会设置为false
-
shouldEnforceRegistrationAtInit
在初始化时候是否注册,默认false
可以看到默认情况下这里的注册方法不会执行,即初始化的时候不会进行注册,那么这里不注册,在哪里注册呢?其实是在定时任务里注册的(心跳续约),但是注册逻辑是一样的,所以这里我们先分析这个方法:
PS:从代码中可以看到,如果设置为初始化时注册,会一个问题:如果注册失败了会抛异常,导致整个客户端启动就失败了,也不会启动后面的定时任务,如果不强制初始化时进行注册,会通过心跳续约的定时任务去注册,即使注册失败了也不影响客户端启动,并会定时多次尝试进行注册。
看register方法:
//DiscoveryClient.java
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
//将当前instanceInfo(主机信息)作为参数进行注册
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
//204(No Content), 表示执行成功, 但是没有数据,
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
继续看eurekaTransport.registrationClient.register方法,可以看到通过Jersey 框架提交post注册请求:
//AbstractJerseyEurekaHttpClient.java
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
//通过Jersey 框架提交post注册请求
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
//携带的参数就是InstanceInfo
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
//WebResource.java
@Override
public <T> T post(Class<T> c, Object requestEntity) throws UniformInterfaceException, ClientHandlerException {
return handle(c, build("POST", requestEntity));
}
可以看到注册也是比较简单的。