Eureka Client 源碼解析 擷取系統資料庫、用戶端注冊
1. EurekaClient 構造器中的流程:
接下來我們準備開始真正分析Eureka Client 的源碼,上一章我們分析了Eureka Client的自動配置類都加載了哪些東西,其中最為核心的就是EurekaClient:
先簡單看一下CloudEurekaClient構造器中的流程,大概看一下都做了哪些事,接下來我們會一一分析:
2. EurekaClient 構造器跟蹤
入口:
跟CloudEurekaClient的構造:
//CloudEurekaClient.java
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
ApplicationEventPublisher publisher) {
//繼續跟super方法
//CloudEurekaClient 繼承自 DiscoveryClient
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
//DiscoveryClient.java
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
//繼續跟this方法
this(applicationInfoManager, config, args, ResolverUtils::randomize);
}
//DiscoveryClient.java
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
//此時一共5個參數
//第四個參數是個匿名内部類執行個體,主要是提供一個備用系統資料庫的功能(當遠端系統資料庫擷取失敗的時候)
this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
private volatile BackupRegistry backupRegistryInstance;
@Override
//傳回BackupRegistry,備份系統資料庫
//Eureka是AP的,高可用,當Eureka Server全部挂掉了,這個備份系統資料庫執行個體就起作用了
//當然這個備份系統資料庫需要自己實作并配置才能用
public synchronized BackupRegistry get() {
if (backupRegistryInstance == null) {
String backupRegistryClassName = config.getBackupRegistryImpl();
if (null != backupRegistryClassName) {
try {
backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
} catch (InstantiationException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (IllegalAccessException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (ClassNotFoundException e) {
logger.error("Error instantiating BackupRegistry.", e);
}
}
if (backupRegistryInstance == null) {
logger.warn("Using default backup registry implementation which does not do anything.");
backupRegistryInstance = new NotImplementedRegistryImpl();
}
}
return backupRegistryInstance;
}
}, randomizer);
}
繼續跟this構造,這個方法很長,我們隻關注最核心的:
//DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
...//省略了很多代碼
//這裡fetchRegistry方法做的事情就是我們第一個需要關注的任務
//擷取系統資料庫
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
//第二個任務,register方法,進行注冊
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
//第三個任務,初始化定時任務
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
3. 擷取用戶端系統資料庫
3.1 擷取備用系統資料庫:
現在看擷取系統資料庫的方法fetchRegistry,該方法傳回值true/false代表是否擷取成功
先看擷取失敗的情況,如果需要擷取,但擷取失敗,就會擷取備用的系統資料庫:
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
//如果應該擷取,但擷取失敗,就會擷取備用的系統資料庫
fetchRegistryFromBackup();
}
//DiscoveryClient.java
private void fetchRegistryFromBackup() {
try {
//先直接擷取備用系統資料庫執行個體,預設傳回null,除非子類重寫該方法
@SuppressWarnings("deprecation")
BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
if (null == backupRegistryInstance) { // backward compatibility with the old protected method, in case it is being used.
//這個backupRegistryProvider就是之前構造中傳入的匿名内部類
//備用系統資料庫的提供者,通過提供者擷取備用系統資料庫執行個體
//沒有專門實作,這裡傳回的就是NotImplementedRegistryImpl(之前構造中可以看到)
backupRegistryInstance = backupRegistryProvider.get();
}
if (null != backupRegistryInstance) {
//如果這個備用系統資料庫執行個體不為空
Applications apps = null;
if (isFetchingRemoteRegionRegistries()) {//判斷是否可以從遠端region擷取
//擷取配置的遠端region清單
String remoteRegionsStr = remoteRegionsToFetch.get();
if (null != remoteRegionsStr) {
//從遠端region擷取系統資料庫,apps就是Applications
//Applications就是系統資料庫,map結構,key是微服務名稱,value是Application
//Application也是map結構,key是InstanceInfo的Id,value是InstanceInfo
//一個InstanceInfo對應的就是一個主機資訊
apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
}
} else {
//不可以從遠端region擷取,則擷取本地備用系統資料庫
apps = backupRegistryInstance.fetchRegistry();
}
if (apps != null) {
//如果apps不空,則将apps打散
final Applications applications = this.filterAndShuffle(apps);
applications.setAppsHashCode(applications.getReconcileHashCode());
//将shuffle過的applications放到本地緩存
localRegionApps.set(applications);
logTotalInstances();
logger.info("Fetched registry successfully from the backup");
}
} else {
logger.warn("No backup registry instance defined & unable to find any discovery servers.");
}
} catch (Throwable e) {
logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
}
}
判斷是否可以從遠端region擷取: 該值我們可以通過配置檔案進行配置,指定擷取eureka系統資料庫資訊的遠端region清單: 備用系統資料庫Provider是之前構造中傳入的一個匿名内部類:
3.2 從注冊中心擷取系統資料庫
現在看從注冊中心擷取系統資料庫的具體實作:
//DiscoveryClient.java
/**
* Fetches the registry information.
*
* <p>
* This method tries to get only deltas after the first fetch unless there
* is an issue in reconciling eureka server and client registry information.
* 除非在協調eureka伺服器和用戶端系統資料庫資訊時出現問題,否則此方法嘗試在第一次擷取後隻擷取delta。
*
* 翻譯一下:第一次是全量下載下傳,後面就是增量下載下傳
* </p>
*
* @param forceFullRegistryFetch Forces a full registry fetch. 是否強制全量下載下傳
*
* @return true if the registry was fetched 擷取成功傳回true,否則false
*/
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
//這裡入參forceFullRegistryFetch,代表含義是:true則代表強制全量下載下傳
//false就是有可能全量,也有可能是增量擷取,視情況而定。
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
// 先擷取目前本地緩存的系統資料庫資訊,剛啟動這裡肯定擷取為空
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
//隻要滿足以上各種情況的任何一個條件,就會進行全量下載下傳:
// 配置檔案中關閉了增量下載下傳
// 配置了VIP的系統資料庫位址
// 強制進行全量下載下傳
// 本地緩存的系統資料庫資訊為空
...//省略了各種info日志列印
// 全量擷取,擷取存儲全部系統資料庫資訊
getAndStoreFullRegistry();
} else {
// 增量擷取,擷取更新增量資料
//(所謂增量下載下傳其實是擷取服務端的最近更新隊列的資料,以後會看到)
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
3.2.1 全量下載下傳
看getAndStoreFullRegistry方法:
//DiscoveryClient.java
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
//先嘗試擷取registry-refresh-single-vip-address配置
//如果不為null走的是vip位址擷取
//為null,則走預設的
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
//擷取響應體,就是Applications
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
//将系統資料庫資訊打亂放入本地緩存
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
registry-refresh-single-vip-address配置:可以專門指定位址進行系統資料庫下載下傳更新
這裡我們沒有指定vip位址,是以走eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()):
可以看到實作中,通過Jersey 架構送出了get請求:
//AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
return getApplicationsInternal("apps/", regions);
}
//AbstractJerseyEurekaHttpClient.java
//可以看到實作中,通過Jersey 架構送出了get請求
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
//SpringMVC的處理器是Controller,Jersey架構的處理器是Resource
//這裡WebResource就是代表一個web資源
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
//從這個web資源建立一個新的WebResource,并将一個附加的查詢參數添加到這個web資源的URI中。
//建構查詢參數到URI中
webResource = webResource.queryParam("regions", regionsParamValue);
}
//擷取請求建構者
Builder requestBuilder = webResource.getRequestBuilder();
//添加請求頭,用戶端來說,沒有特殊配置,這裡不會添加任何請求頭
addExtraHeaders(requestBuilder);
//指定接受json資料格式,并發起get請求
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
//擷取響應,将響應體轉成 系統資料庫的資料結構
applications = response.getEntity(Applications.class);
}
// 将結果封裝成需要的資料結構傳回
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue == null ? "" : "regions=" + regionsParamValue,
response == null ? "N/A" : response.getStatus()
);
}
if (response != null) {
response.close();
}
}
}
看requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class)的get方法:
//WebResource.java
@Override
public <T> T get(Class<T> c) throws UniformInterfaceException, ClientHandlerException {
//看到這裡送出的就是"GET"請求,下面就是Jersey 架構的東西了
//當然還涉及到一些url的處理,就不跟了
return handle(c, build("GET"));
}
請求發了,至于Eureka Server端怎麼處理并響應,等将Eureka Server時在講,看到全量下載下傳還是比較簡單的。
3.2.2 增量下載下傳
下一章講Eureka Client端的一些定時任務,其中定時更新系統資料庫的任務也會用到增量下載下傳,下一章說。
4. 用戶端注冊
現在看用戶端注冊:
-
shouldRegisterWithEureka
指定是否需要向Eureka進行注冊,該配置預設為true,一般Server端會設定為false
-
shouldEnforceRegistrationAtInit
在初始化時候是否注冊,預設false
可以看到預設情況下這裡的注冊方法不會執行,即初始化的時候不會進行注冊,那麼這裡不注冊,在哪裡注冊呢?其實是在定時任務裡注冊的(心跳續約),但是注冊邏輯是一樣的,是以這裡我們先分析這個方法:
PS:從代碼中可以看到,如果設定為初始化時注冊,會一個問題:如果注冊失敗了會抛異常,導緻整個用戶端啟動就失敗了,也不會啟動後面的定時任務,如果不強制初始化時進行注冊,會通過心跳續約的定時任務去注冊,即使注冊失敗了也不影響用戶端啟動,并會定時多次嘗試進行注冊。
看register方法:
//DiscoveryClient.java
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
//将目前instanceInfo(主機資訊)作為參數進行注冊
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
//204(No Content), 表示執行成功, 但是沒有資料,
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
繼續看eurekaTransport.registrationClient.register方法,可以看到通過Jersey 架構送出post注冊請求:
//AbstractJerseyEurekaHttpClient.java
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
//通過Jersey 架構送出post注冊請求
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
//攜帶的參數就是InstanceInfo
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
//WebResource.java
@Override
public <T> T post(Class<T> c, Object requestEntity) throws UniformInterfaceException, ClientHandlerException {
return handle(c, build("POST", requestEntity));
}
可以看到注冊也是比較簡單的。