Client端启动主流程
- SpringBoot的自动配置扫描spring-cloud-netflix-eureka-client.jar下的META-INF/spring.factories中的自动配置类org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
- EurekaClientAutoConfiguration根据条件初始化相关bean
- EurekaClientConfigBean:eureka客户端向eureka服务器注册实例所需的配置信息。大多数需要的信息由默认配置DefaultEurekaClientConfig提供。用户只需要提供eureka服务器服务url。Eureka服务器的服务url可以通过两种方式配置:1)通过在DNS中注册信息。2)在配置中指定。
- EurekaInstanceConfigBean:实例注册到Eureka服务器所需的配置信息,如id,appname,ip等。
- EurekaServiceRegistry:提供实例注册和注销等功能。
- EurekaHealthIndicatorConfiguration:Eureka健康检查配置类。
- EurekaClientConfiguration:定义的一个配置类,配置EurekaClient(最核心)、ApplicationInfoManager(这个类初始化注册Eureka Server所需的信息,这些信息将被其他组件发现。)、EurekaRegistration(服务实例的注册信息)。
- 自动配置类中,创建EurekaClient的Bean时,使用CloudEurekaClient创建实例,CloudEurekaClient中调用了Netflix原生的DiscoveryClient完成初始化流程。
- DiscoveryClient构造器中会调initScheduledTasks(),这是最核心的方法,它完成初始化调度任务(例如:集群解析器、心跳、instanceInfo复制器、fetch)。
- 使用TimedSupervisorTask创建获取服务和服务续约的定时任务,使用InstanceInfoReplicator创建服务注册的定时任务,然后调Eureka Server端的接口完成相应功能。这些核心功能会分解到后续的文章分析。
源码分析
- EurekaClientAutoConfiguration
/**
 * Auto-configuration class for the Eureka client (excerpt).
 * It is active unless the property eureka.client.enabled is explicitly set to false
 * (matchIfMissing = true), and it registers the beans shown below.
 */
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
"org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
// Creates the Eureka client configuration bean (only when no EurekaClientConfig bean exists in the current context).
// e.g. registryFetchIntervalSeconds: how often (in seconds) the registry is fetched from the Eureka server.
// e.g. instanceInfoReplicationIntervalSeconds: how often (in seconds) instance changes are replicated to the Eureka server.
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class,
search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
return new EurekaClientConfigBean();
}
// Handles service registration and deregistration.
@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
return new EurekaServiceRegistry();
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {
// Creates the Eureka client. CloudEurekaClient extends Netflix's native DiscoveryClient,
// so the DiscoveryClient constructor is invoked and performs the core initialization.
// NOTE(review): this.optionalArgs and this.context are fields that are not shown in this excerpt.
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
}
}
- CloudEurekaClient
public class CloudEurekaClient extends DiscoveryClient {
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
ApplicationEventPublisher publisher)
// 调父类构造器做核心初始化任务
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
}
- DiscoveryClient
@Singleton
public class DiscoveryClient implements EurekaClient {
/**
 * Injected constructor (excerpt; non-essential code is omitted).
 * Visible steps, in order: create the scheduler and the heartbeat / cache-refresh executors,
 * set up the Eureka transport and the AZ-to-region mapping, run the optional pre-registration
 * handler, optionally enforce registration at init, and finally call initScheduledTasks().
 */
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer ) {
// Non-essential code omitted ...
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// Initialize the client used for remote calls to the Eureka server endpoints
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
// Choose the AZ-to-region mapper: DNS-based or property-based, depending on configuration
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
// When shouldEnforceRegistrationAtInit() is true and the initial registration with the remote
// server does not succeed, the constructor throws an exception.
// Note: this setting is a no-op when shouldRegisterWithEureka() is false.
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
// Initial registration with the remote server
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// The core step of the startup flow
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
// Non-essential code omitted ...
}
/**
* Initializes all scheduled tasks.
* When registry fetching is enabled, schedules the cache-refresh task. When registration is
* enabled, schedules the heartbeat task, creates the InstanceInfoReplicator and the status
* change listener, and starts the replicator.
*/
private void initScheduledTasks() {
// Registry fetching
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// How often (in seconds) the registry is fetched from the Eureka server; 30s by default
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// Create the periodic registry-fetch task
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
// Runnable - core registry-fetch logic
new CacheRefreshThread()
);
// Schedule the first registry fetch after one fetch interval (30s by default)
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
// Interval at which the Eureka client sends renewal heartbeats (30s by default)
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
// Runnable - core heartbeat logic
new HeartbeatThread()
);
// Schedule the first heartbeat after one renewal interval (30s by default)
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
// Implements Runnable: the task that updates the local instance info and replicates it to the remote server.
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// Listener that logs local status changes and triggers an on-demand replication
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
// The listener is only registered when on-demand status updates are enabled
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// Start the service-registration task
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
}
- CacheRefreshThread 拉取服务的任务
/**
* The task that fetches the registry information at specified intervals.
* Its run() simply delegates to refreshRegistry().
*/
class CacheRefreshThread implements Runnable {
public void run() {
// All of the work happens in refreshRegistry()
refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {
// 省略非核心代码...
// 拉取注册表核心逻辑
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
allAppsHashCodes);
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
/**
 * Client-side logic for fetching the service list.
 * Decides between a full fetch and a delta fetch, then notifies listeners and updates the
 * remote status of this instance.
 *
 * @param forceFullRegistryFetch when true, a full registry fetch is performed
 * @return true if the registry was fetched successfully, false if any Throwable was caught
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// Read the applications currently held in the local cache from a previous fetch.
// They are used below to decide between a full fetch and a delta fetch
// (delta fetching is controlled by eureka.client.disableDelta, default false).
Applications applications = getApplications();
// A full fetch is triggered when ANY of the following holds:
// 1. delta updates are disabled (default is false)
// 2. the client is only interested in the registry of a single VIP address
// 3. a full fetch is forced via the forceFullRegistryFetch argument
// 4. the local cache does not yet hold a valid application list (null or empty)
// 5. the cached applications have version -1
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
// NOTE(review): this branch can be entered with applications == null, yet the next two log
// statements and setAppsHashCode below dereference it; presumably getApplications() never
// returns null in practice - confirm.
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// Full fetch: get and store the whole registry
getAndStoreFullRegistry();
} else {
// Delta fetch: get the changes and update the registry
getAndUpdateDelta(applications);
}
// Recompute and set the reconcile hash code
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
// Broadcast the cache-refreshed event to all registered listeners
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
// Determine the status of this application instance; per the original note it is set to UNKNOWN when not found
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
/**
 * Gets the full registry information from the Eureka server and stores it locally.
 * The result is only stored when the response status is OK and no other fetch has advanced
 * the generation counter in the meantime.
 *
 * @throws Throwable declared by the signature; propagated to the caller
 */
private void getAndStoreFullRegistry() throws Throwable {
// Snapshot the generation counter; the compareAndSet below acts as a version check
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// Remote call to the server: all applications, or a single VIP address when one is configured
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// Store the result locally.
// filterAndShuffle(apps): per the original note, returns the applications after keeping only
// instances with UP status and shuffling them; the filtering depends on
// EurekaClientConfig.shouldFilterOnlyUpInstances()
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
/**
 * Gets the delta registry information from the Eureka server and applies it locally.
 * Falls back to a full fetch when no delta is obtained.
 *
 * @param applications the locally cached applications, used to compute the reconcile hash code
 * @throws Throwable declared by the signature; propagated to the caller
 */
private void getAndUpdateDelta(Applications applications) throws Throwable {
// Snapshot the generation counter; the compareAndSet below acts as a version check
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
// Remote call to the server to get the delta
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// If no delta was obtained, fall back to a full fetch
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
// The update is skipped (hash stays "") when the lock cannot be acquired immediately
if (fetchRegistryUpdateLock.tryLock()) {
try {
// Apply the delta to the local cache
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
- HeartbeatThread 续约任务
/**
 * The lease-renewal (heartbeat) task: calls renew() and records the time of the last
 * successful heartbeat.
 */
private class HeartbeatThread implements Runnable {
public void run() {
// Send the heartbeat by calling the Eureka server's REST endpoint
if (renew()) {
// Record the time of the last successful heartbeat
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
- InstanceInfoReplicator 服务注册
/**
 * The service-registration task (excerpt): refreshes the local instance info, registers with
 * the Eureka server when the info is dirty, and always re-schedules itself.
 */
class InstanceInfoReplicator implements Runnable {
public void run() {
try {
// Refresh the instance info
discoveryClient.refreshInstanceInfo();
// A non-null dirty timestamp means the instance info needs to be registered
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// Register the instance by calling the Eureka server's REST endpoint
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// Schedule the next run (even after a failure) and keep the returned Future
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
总结一些设计思想
Eureka Client端的初始化流程,核心是3个定时任务,如获取服务的定时任务,代码如下:
// Excerpt of DiscoveryClient showing only how the registry-fetch task is scheduled.
@Singleton
public class DiscoveryClient implements EurekaClient {
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// Wrap the actual fetch logic (CacheRefreshThread) in a TimedSupervisorTask
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
// One-shot schedule: the task runs once after registryFetchIntervalSeconds
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
}
// ... ...
}
scheduler.schedule()只会调度一次,那为什么任务却能周期性执行呢?进入TimedSupervisorTask的run()方法可以看到,它在finally块中再次调用了scheduler.schedule(),把自己重新放入调度器,从而达到循环执行的效果。
/**
 * Supervisor around a task (excerpt): submits the task to an executor, waits for it with a
 * timeout, adjusts the next delay depending on the outcome, and re-schedules itself in the
 * finally block. The re-scheduling is what turns a one-shot schedule() into periodic execution.
 */
public class TimedSupervisorTask extends TimerTask {
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
// The task finished within the timeout: reset the delay to its base value (timeoutMillis),
// so that an occasional timeout does not keep the delay extended
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
// The task timed out: double the delay for the next run, capped at maxDelay.
// NOTE(review): the original note gave 30s as the default for maxDelay; how maxDelay is
// computed is not shown in this excerpt - confirm against the constructor.
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
// Cancel the submitted task if there is one
if (future != null) {
future.cancel(true);
}
// Re-schedule this supervisor with the current delay unless the scheduler is shut down
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
}
那为什么不直接调下面这个方法呢?
scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
其实这样设计是为了更灵活地控制定时任务的执行间隔:任务执行超时时,下一次执行的延迟会加倍(但不超过最大延迟);任务恢复正常后,延迟又会重置为默认值。这样可以应对任务持续超时或者偶尔超时的场景,使任务执行更可靠。咱们在开发中做定时任务时,可以参考这种设计方式。