Client端啟動主流程
- SpringBoot的自動配置掃描spring-cloud-netflix-eureka-client.jar下的META-INF/spring.factories中的自動配置類org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
- EurekaClientAutoConfiguration根據條件初始化相關bean
- EurekaClientConfigBean:eureka用戶端向eureka伺服器注冊執行個體所需的配置資訊。大多數需要的資訊由預設配置DefaultEurekaClientConfig提供。使用者只需要提供eureka伺服器服務url。Eureka伺服器的服務url可以通過兩種方式配置:1)通過在DNS中注冊資訊。2)在配置中指定。
- EurekaInstanceConfigBean:執行個體注冊到Eureka伺服器所需的配置資訊,如id,appname,ip等。
- EurekaServiceRegistry:提供執行個體注冊和登出等功能。
- EurekaHealthIndicatorConfiguration:Eureka健康檢查配置類。
- EurekaClientConfiguration:定義的一個配置類,配置EurekaClient(最核心)、ApplicationInfoManager(這個類初始化注冊Eureka Server所需的資訊,這些資訊將被其他元件發現。)、EurekaRegistration(服務執行個體的注冊資訊)。
- 自動配置類中,建立EurekaClient的Bean時,使用CloudEurekaClient建立執行個體,CloudEurekaClient中調用了Netflix原生的DiscoveryClient完成初始化流程。
- DiscoveryClient構造器中會調initScheduledTasks(),這是最核心的方法,它完成初始化排程任務(例如:叢集解析器、心跳、instanceInfo複制器、fetch)。
- 使用TimedSupervisorTask建立擷取服務和服務續約的定時任務,使用InstanceInfoReplicator建立服務注冊的定時任務,然後調Eureka Server端的接口完成相應功能。這些核心功能會分解到後續的文章分析。
源碼分析
- EurekaClientAutoConfiguration
/**
 * Auto-configuration class for the Eureka client (excerpt).
 * The beans visible here are the client configuration, the service registry
 * and, inside the nested configuration class, the EurekaClient itself.
 */
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
"org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
// Creates the client-side configuration used by the EurekaClient, e.g.
// - registryFetchIntervalSeconds: how often (in seconds) the registry is fetched
//   from the Eureka server (this is NOT the heartbeat interval);
// - instanceInfoReplicationIntervalSeconds: how often (in seconds) instance
//   changes are replicated to the Eureka server.
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class,
search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
return new EurekaClientConfigBean();
}
// Handles service registration and deregistration.
@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
return new EurekaServiceRegistry();
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {
// Creates the Eureka client. CloudEurekaClient extends Netflix's DiscoveryClient,
// so constructing it runs the DiscoveryClient constructor, which performs the
// core initialization.
// NOTE: the optionalArgs and context fields used below are not shown in this excerpt.
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
}
}
- CloudEurekaClient
public class CloudEurekaClient extends DiscoveryClient {
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
ApplicationEventPublisher publisher)
// 調父類構造器做核心初始化任務
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
}
- DiscoveryClient
@Singleton
public class DiscoveryClient implements EurekaClient {
// Constructor (excerpt): creates the scheduler and the two worker executors, sets up
// the transport and region mapping, optionally registers eagerly, and finally starts
// the scheduled tasks via initScheduledTasks().
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer ) {
// Non-essential code omitted ...
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// Initialize the client used for remote calls to the Eureka server endpoints.
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
// Choose the AZ-to-region mapper: DNS-based or property-based, depending on configuration.
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
// remoteRegionsToFetch is a comma-separated list of regions.
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
// When shouldEnforceRegistrationAtInit() is true, a failed initial registration with the
// remote server makes this constructor throw.
// Note: when shouldRegisterWithEureka() is false, this setting has no effect.
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
// Initial registration with the remote server.
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// The key step of the whole start-up flow:
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch)
initScheduledTasks();
// Non-essential code omitted ...
}
/**
 * Initializes all scheduled tasks.
 * Depending on configuration this sets up the registry cache refresh task, the
 * heartbeat task and the InstanceInfoReplicator (registration) task.
 */
private void initScheduledTasks() {
// Registry fetch
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// How often (in seconds) the registry is fetched from the Eureka server
// (the article states the default is 30s; not verifiable from this excerpt).
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// Create the periodic registry-fetch task.
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
// Runnable - core registry-fetch logic
new CacheRefreshThread()
);
// The first run is delayed by registryFetchIntervalSeconds.
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
// Interval (in seconds) at which the client sends heartbeats to renew its lease
// (the article states 30s; not verifiable from this excerpt).
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
// Runnable - core heartbeat logic
new HeartbeatThread()
);
// The first heartbeat is delayed by renewalIntervalInSecs.
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
// A Runnable that refreshes the local instance info and replicates it to the remote server.
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// Listener that logs local status changes and triggers an on-demand replication.
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
// The listener is only registered when on-demand status updates are enabled.
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// Start the service registration task.
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
}
- CacheRefreshThread 拉取服務的任務
/**
 * The task that fetches the registry information at specified intervals.
 * Each run simply delegates to refreshRegistry().
 */
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {
// 省略非核心代碼...
// 拉取系統資料庫核心邏輯
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
allAppsHashCodes);
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
// Client-side logic for fetching the service registry.
// Returns true when the registry was fetched successfully; returns false when any
// Throwable was raised during the fetch (it is logged, not rethrown).
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// Read the applications currently held in the local cache.
// If delta fetching is disabled (eureka.client.disableDelta, false by default per the
// article) or this is the first fetch, all applications are fetched; otherwise only the delta.
Applications applications = getApplications();
// A full fetch is triggered when ANY of the following holds:
// 1. delta updates are disabled;
// 2. the client only cares about the registry of a single VIP address;
// 3. a full fetch is forced via the forceFullRegistryFetch argument;
// 4. the local cache is null;
// 5. the local cache contains no registered applications;
// 6. the cached applications report version -1.
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
// NOTE(review): the next two log statements dereference `applications`, although
// `applications == null` is one of the conditions that enters this branch.
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// Full fetch: get the whole registry and store it locally.
getAndStoreFullRegistry();
} else {
// Delta fetch: get the changes and apply them to the local registry.
getAndUpdateDelta(applications);
}
// Recompute and store the reconcile hash code.
// NOTE(review): `applications` is dereferenced here as well, without a null check.
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
// (broadcasts the cache-refreshed event to all registered listeners).
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
// (determines this instance's status; per the article it is set to UNKNOWN when not found).
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
// Fetches the full registry from the Eureka server and stores it locally.
// The result is only stored if no other update bumped fetchRegistryGeneration in the meantime.
private void getAndStoreFullRegistry() throws Throwable {
// Snapshot the generation counter; it is checked with compareAndSet before storing.
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// Remote call to the server: all applications, or a single VIP if one is configured.
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// Store locally.
// filterAndShuffle(apps): per the article, keeps only instances with UP status and
// shuffles them; the filtering depends on
// EurekaClientConfig.shouldFilterOnlyUpInstances().
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
// The generation changed since the snapshot, so this result is discarded.
logger.warn("Not updating applications as another thread is updating it already");
}
}
// Fetches the delta registry from the Eureka server and applies it to the local cache.
// Falls back to a full fetch when no delta is returned.
private void getAndUpdateDelta(Applications applications) throws Throwable {
// Snapshot the generation counter; it is checked with compareAndSet before applying the delta.
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
// Remote call to the server for the delta.
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// No delta available: fall back to a full fetch.
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
// The delta is only applied if the update lock can be acquired without waiting.
if (fetchRegistryUpdateLock.tryLock()) {
try {
// Apply the delta to the local cache.
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
// The generation changed since the snapshot, so this delta is ignored.
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
- HeartbeatThread 續約任務
// Heartbeat (lease renewal) task: each run calls renew() and, on success,
// records the time of the heartbeat.
private class HeartbeatThread implements Runnable {
public void run() {
// Send a heartbeat by calling the Eureka Server REST endpoint.
if (renew()) {
// Record the time of the last successful heartbeat.
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
- InstanceInfoReplicator 服務注冊
// Service registration task (excerpt): refreshes the local instance info, registers it
// with the server when it is dirty, and always reschedules itself afterwards.
class InstanceInfoReplicator implements Runnable {
public void run() {
try {
// Refresh the local instance info.
discoveryClient.refreshInstanceInfo();
// A non-null timestamp means the instance info is dirty and must be re-registered.
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// Register the instance by calling the Eureka Server REST endpoint.
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// Schedule the next run (also after a failure) and keep its Future.
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
總結一些設計思想
Eureka Client端的初始化流程,核心是3個定時任務,如擷取服務的定時任務,代碼如下:
// Excerpt of DiscoveryClient showing only the registry-fetch part of initScheduledTasks().
@Singleton
public class DiscoveryClient implements EurekaClient {
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// Wrap the fetch logic (CacheRefreshThread) in a TimedSupervisorTask.
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
// One-shot schedule: the task runs once, after registryFetchIntervalSeconds.
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
}
// ... ...
}
執行scheduler.schedule(),這裡明明是一次性的,但實際為啥能定時執行呢?點進TimedSupervisorTask看它的run(),會發現finally中又調用了一次scheduler.schedule(),進而達到循環執行的效果。
// Supervisor task (excerpt): runs the wrapped task on the executor with a timeout,
// adjusts the delay depending on the outcome, and reschedules itself in finally.
public class TimedSupervisorTask extends TimerTask {
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
// The run finished in time: reset the delay to timeoutMillis, so that an
// occasional earlier timeout does not keep the delay increased.
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
// On timeout, double the delay, but never beyond maxDelay.
// NOTE(review): the article states maxDelay defaults to 30s; this excerpt does not show its value.
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
// Cancel the submitted task if there is one (interrupting it if still running).
if (future != null) {
future.cancel(true);
}
// Reschedule this supervisor with the current delay unless the scheduler is shut down;
// this is what turns the one-shot schedule() call into a repeating task.
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
}
那為什麼不直接調下面這個方法呢?
scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
其實這樣設計是為了更靈活地控制定時任務的執行間隔:任務執行逾時時,自動把下一次執行的延遲加倍(上限為maxDelay);任務執行成功後,再把延遲重置為預設值,使任務執行更可靠。咱在開發中,做定時任務可參考這種設計方式。