
Eureka Client Startup Process: Source Code Analysis

Author: 广东高脚七

Main Startup Flow of the Client

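  1. Spring Boot auto-configuration picks up the auto-configuration class org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration, which is declared in META-INF/spring.factories inside spring-cloud-netflix-eureka-client.jar.
  2. EurekaClientAutoConfiguration initializes the related beans, subject to its conditions.
  3. EurekaClientConfigBean: the configuration the Eureka client needs to register an instance with the Eureka server. Most settings have defaults (in Netflix's native client they come from DefaultEurekaClientConfig); the user only has to provide the Eureka server service URLs, which can be configured in two ways: 1) by registering them in DNS; 2) by specifying them in the configuration.
  4. EurekaInstanceConfigBean: the configuration an instance needs in order to register with the Eureka server, such as its id, appname, and IP address.
  5. EurekaServiceRegistry: provides instance registration and deregistration.
  6. EurekaHealthIndicatorConfiguration: the configuration class for Eureka health checks.
  7. EurekaClientConfiguration: a nested configuration class that defines EurekaClient (the most important bean), ApplicationInfoManager (initializes the information that is registered with the Eureka Server so that other components can discover it), and EurekaRegistration (the registration information of the service instance).
  8. When the auto-configuration creates the EurekaClient bean, it instantiates CloudEurekaClient, which calls the constructor of Netflix's native DiscoveryClient to perform the initialization.
  9. The DiscoveryClient constructor calls initScheduledTasks(), the most important method: it initializes the scheduled tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch).
  10. TimedSupervisorTask is used to create the scheduled tasks that fetch the registry and renew the lease (heartbeat), and InstanceInfoReplicator creates the scheduled task for service registration; these tasks then call the Eureka Server endpoints to do their work. These core features are analyzed in follow-up articles.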

Source Code Analysis

  • EurekaClientAutoConfiguration
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
    CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
    "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
    "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
    "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
    "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
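public class EurekaClientAutoConfiguration {

    // Client-side configuration used to create the EurekaClient, for example:
    // registryFetchIntervalSeconds: how often (in seconds) to fetch registry information from the Eureka server
    // instanceInfoReplicationIntervalSeconds: how often (in seconds) to replicate instance changes to the Eureka server
    // (The heartbeat interval is an instance setting, leaseRenewalIntervalInSeconds, not a client setting.)
    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class,
        search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        return new EurekaClientConfigBean();
    }

    // Handles service registration and deregistration
    @Bean
    public EurekaServiceRegistry eurekaServiceRegistry() {
        return new EurekaServiceRegistry();
    }

    // other beans omitted ...

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingRefreshScope
    protected static class EurekaClientConfiguration {

        // injected fields (ApplicationContext context, AbstractDiscoveryClientOptionalArgs<?> optionalArgs)
        // and the other beans of this class are omitted ...

        // Creates the Eureka client. CloudEurekaClient extends Netflix's native DiscoveryClient,
        // whose constructor performs the core initialization.
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class,
            search = SearchStrategy.CURRENT)
        public EurekaClient eurekaClient(ApplicationInfoManager manager,
            EurekaClientConfig config) {
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                this.context);
        }
    }
}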
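Both the EurekaClientConfigBean and EurekaClient beans above carry @ConditionalOnMissingBean: Spring Boot processes auto-configuration after the application's own bean definitions, so a bean you declare yourself wins and the auto-configured one is skipped. The plain-Java sketch below only models that "default unless already defined" rule with a map; BeanRegistry and its methods are hypothetical and are not how Spring actually evaluates conditions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, heavily simplified model of @ConditionalOnMissingBean:
// user beans are registered first, auto-configured defaults only fill the gaps.
class BeanRegistry {
    private final Map<Class<?>, Object> beans = new LinkedHashMap<>();

    // Corresponds to a bean declared in the application's own @Configuration.
    <T> void registerUserBean(Class<T> type, T bean) {
        beans.put(type, bean);
    }

    // Corresponds to an auto-configured @Bean guarded by @ConditionalOnMissingBean:
    // the factory is only invoked when no bean of that type exists yet.
    <T> void registerDefaultIfMissing(Class<T> type, Supplier<? extends T> factory) {
        beans.computeIfAbsent(type, t -> factory.get());
    }

    <T> T get(Class<T> type) {
        return type.cast(beans.get(type));
    }
}
```

In a real application the equivalent is simply declaring your own @Bean of type EurekaClientConfig (or EurekaClient) in the current application context; the auto-configured default then backs off.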
  • CloudEurekaClient
public class CloudEurekaClient extends DiscoveryClient {

    // fields (applicationInfoManager, publisher, eurekaTransportField, ...) omitted ...

    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
            EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
            ApplicationEventPublisher publisher) {
        // Calls the parent constructor, which does the core initialization
        super(applicationInfoManager, config, args);
        this.applicationInfoManager = applicationInfoManager;
        this.publisher = publisher;
        // DiscoveryClient.eurekaTransport is private, so it is reached via reflection
        this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
                "eurekaTransport");
        ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }
}
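CloudEurekaClient needs DiscoveryClient's private eurekaTransport field, which has no accessor, so it looks the field up by name on DiscoveryClient.class and makes it accessible. Spring's ReflectionUtils.findField and makeAccessible are conveniences over the standard reflection API; the sketch below uses java.lang.reflect directly, with the hypothetical classes ParentClient and ChildClient standing in for DiscoveryClient and CloudEurekaClient.

```java
import java.lang.reflect.Field;

// Stand-in for DiscoveryClient: the field is private and has no getter.
class ParentClient {
    private String eurekaTransport = "transport-v1";
}

// Stand-in for CloudEurekaClient: reads the parent's private field via reflection.
class ChildClient extends ParentClient {
    private final Field transportField;

    ChildClient() {
        try {
            // getDeclaredField must be called on the class that declares the field
            transportField = ParentClient.class.getDeclaredField("eurekaTransport");
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
        transportField.setAccessible(true); // lift the private access check
    }

    Object readTransport() {
        try {
            return transportField.get(this);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Caching the Field once in the constructor, as CloudEurekaClient does, avoids repeating the lookup on every access.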
  • DiscoveryClient
@Singleton
public class DiscoveryClient implements EurekaClient { 
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        // non-core code omitted ...
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());


            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff


            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            
            // Initialize the client used for remote calls to the Eureka Server endpoints
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);


            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
        
        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }
        
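        // If true, EurekaClient initialization throws an exception from the constructor
        // when the initial registration with the remote server fails.
        // Note: this setting is a no-op if shouldRegisterWithEureka() is false.
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                // Initial registration with the remote server
                if (!register()) {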
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }
        
        // The most important call:
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch)
        initScheduledTasks();

        // non-core code omitted ...
    }
    
    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
    
        // Fetch the registry
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            // How often (in seconds) to fetch registry information from the Eureka server; 30s by default
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // Create the scheduled task that fetches the registry periodically
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    // Runnable: the core registry-fetch logic
                    new CacheRefreshThread()
            );
            // First run after registryFetchIntervalSeconds (30s by default)
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }


        if (clientConfig.shouldRegisterWithEureka()) {
            // Interval at which the client sends lease-renewal heartbeats (30s by default)
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);


            // Heartbeat timer
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    // Runnable: the core heartbeat logic
                    new HeartbeatThread()
            );
            // First heartbeat after renewalIntervalInSecs (30s by default)
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);


            // InstanceInfo replicator
            // A Runnable that refreshes the local instance information and replicates it to the remote server
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize


            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }


                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };


            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
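            // Start the registration task: start() marks the instance dirty so that the first run
            // registers it, after initialInstanceInfoReplicationIntervalSeconds (40s by default)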
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}             
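The heartbeatExecutor and cacheRefreshExecutor built in the constructor use a SynchronousQueue, which the source comments call "direct handoff": a task is handed straight to a thread, the pool grows from its core size of 1 up to the configured maximum, and once every thread is busy further submissions are rejected rather than queued. That is why TimedSupervisorTask, shown later, has a catch block for RejectedExecutionException. The sketch below demonstrates the rejection behavior with plain ThreadPoolExecutor; DirectHandoffDemo and the pool sizes are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of a direct-handoff executor shaped like the ones in the DiscoveryClient
// constructor: core size 1, a configurable maximum, and a queue that stores no tasks.
class DirectHandoffDemo {

    static ThreadPoolExecutor newHandoffExecutor(int maxThreads) {
        return new ThreadPoolExecutor(1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // Submits `tasks` tasks that block until released; returns how many were rejected.
    static int countRejected(int maxThreads, int tasks) {
        ThreadPoolExecutor executor = newHandoffExecutor(maxThreads);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                executor.execute(() -> {
                    try {
                        release.await(); // keep this worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // every thread busy and no queue slot to wait in
            }
        }
        release.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }
}
```

With a maximum of 2 threads and 5 long-running submissions, the first two get threads and the remaining three are rejected immediately.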
  • CacheRefreshThread: the registry-fetch task
/**
   * The task that fetches the registry information at specified intervals.
   */
  class CacheRefreshThread implements Runnable {
      public void run() {
          refreshRegistry();
      }
  }
    
    @VisibleForTesting
    void refreshRegistry() {
        try {
            // non-core code omitted ...

            // Core registry-fetch logic
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }

            if (logger.isDebugEnabled()) {
                // building allAppsHashCodes omitted ...
                logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                        allAppsHashCodes);
            }
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }
    
    // Client-side logic for fetching the service registry
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();


        try {
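            // Read the registry cached locally by earlier fetches.
            // If delta fetching is disabled (eureka.client.disableDelta, false by default) or this is
            // the first fetch, all applications are fetched; otherwise only the delta is fetched.
            Applications applications = getApplications();

            // A full fetch is triggered if any of the following holds:
            // 1. delta fetching is disabled (false by default)
            // 2. the client only cares about the registry of a single VIP address
            // 3. a full fetch is forced via the forceFullRegistryFetch argument
            // 4. the local cache does not hold a usable registry yet (null, no registered
            //    applications, or version -1)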
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                // Full fetch: fetch and store the whole registry
                getAndStoreFullRegistry();
            } else {
                // Delta fetch: fetch the changes and update the registry
                getAndUpdateDelta(applications);
            }
            // Recompute and set the reconcile hash code
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }


        // Notify about cache refresh before updating the instance remote status
        // Broadcast the cache-refresh event to all registered listeners
        onCacheRefreshed();


        // Update remote status based on refreshed data held in the cache
        // Determine this instance's status as seen by the server; UNKNOWN if it is not found
        updateInstanceRemoteStatus();


        // registry was fetched successfully, so return true
        return true;
    }
    
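    // Fetch the full registry from the Eureka server and store it locally.
    private void getAndStoreFullRegistry() throws Throwable {
        // Generation counter, advanced via CAS, used to detect concurrent updates
        long currentUpdateGeneration = fetchRegistryGeneration.get();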


        logger.info("Getting all instance registry info from the eureka server");


        Applications apps = null;
        // Remote call to the server
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());


        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            // Store locally.
            // filterAndShuffle(apps) shuffles the instances and, when
            // EurekaClientConfig.shouldFilterOnlyUpInstances() is true,
            // keeps only the instances in UP status
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
    
    // Fetch the registry delta from the Eureka server and apply it locally.
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();


        Applications delta = null;
        // Remote call to the server for the delta
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
       
        // No delta returned: fall back to a full fetch
        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    // Apply the delta to the local cache
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }           
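getAndStoreFullRegistry() and getAndUpdateDelta() share two ideas. First, a generation counter is read before the remote call and advanced with compareAndSet afterwards, so a result that lost a race with another thread is dropped instead of overwriting newer data. Second, after applying a delta the client recomputes a reconcile hash code and compares it with the server's to detect drift. The sketch below models both in plain Java; GenerationGuardedCache and ReconcileCheck are hypothetical names, and the "STATUS_count_" string format is an assumption modeled on Eureka's apps hash code rather than something this article shows.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// 1) Generation guard: remember the generation before the slow remote call and
//    publish the result only if no other thread has published in the meantime.
class GenerationGuardedCache<T> {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<T> value = new AtomicReference<>();

    /** Returns true if the fetched value was stored, false if it was dropped. */
    boolean refresh(Supplier<T> remoteFetch) {
        long seen = generation.get();
        T fetched = remoteFetch.get();  // slow remote call, no lock held
        if (fetched == null) {
            return false;               // nothing usable came back
        }
        if (generation.compareAndSet(seen, seen + 1)) {
            value.set(fetched);
            return true;
        }
        return false;                   // another thread updated first: drop our result
    }

    T get() { return value.get(); }

    long generation() { return generation.get(); }
}

// 2) Reconcile check: summarize local instance statuses as sorted "STATUS_count_"
//    pairs (format assumed) and compare with the hash code sent by the server.
class ReconcileCheck {
    static String reconcileHashCode(Iterable<String> instanceStatuses) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted, so both sides agree on order
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }

    /** True when the local copy disagrees with the server and must be reconciled. */
    static boolean needsReconcile(Iterable<String> localStatuses, String serverHashCode) {
        return !reconcileHashCode(localStatuses).equals(serverHashCode);
    }
}
```

In the excerpt above, a mismatch leads to reconcileAndLogDifference(), which, as its comment notes, makes another remote call.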
  • HeartbeatThread: the lease-renewal task
private class HeartbeatThread implements Runnable {
    public void run() {
        // Send a heartbeat to Eureka Server via REST
        if (renew()) {
            // Record the time of the last successful heartbeat
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
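The heartbeat task records a timestamp only when renew() succeeds, so the gap between "now" and lastSuccessfulHeartbeatTimestamp keeps growing while the server is unreachable. The sketch below makes that observable by injecting the renew call and the clock; HeartbeatSketch, its constructor parameters, and the -1 "never succeeded" sentinel are assumptions for illustration, not Eureka's code.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Simplified stand-in for HeartbeatThread with the renew call and clock injected.
class HeartbeatSketch implements Runnable {
    private final BooleanSupplier renew;  // hypothetical: returns true if the server accepted the renewal
    private final LongSupplier clock;     // hypothetical: current time in milliseconds
    private volatile long lastSuccessfulHeartbeatTimestamp = -1; // -1: no success yet

    HeartbeatSketch(BooleanSupplier renew, LongSupplier clock) {
        this.renew = renew;
        this.clock = clock;
    }

    @Override
    public void run() {
        // Only a successful renewal moves the timestamp; failures leave it untouched.
        if (renew.getAsBoolean()) {
            lastSuccessfulHeartbeatTimestamp = clock.getAsLong();
        }
    }

    long lastSuccess() {
        return lastSuccessfulHeartbeatTimestamp;
    }
}
```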
  • InstanceInfoReplicator: service registration
class InstanceInfoReplicator implements Runnable {

    // fields and other methods omitted ...

    public void run() {
        try {
            // Refresh the local instance information (marks it dirty if it changed)
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                // Register the instance with Eureka Server via REST
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            // Schedule the next run and keep its Future, so that onDemandUpdate()
            // can cancel it before running an immediate update
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}
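InstanceInfoReplicator registers only when the instance is dirty, and it passes the dirty timestamp it read to unsetIsDirty() rather than clearing the flag unconditionally. A change that happens while register() is in flight therefore keeps the instance dirty, and the next run registers again. The sketch below models this handshake; DirtyFlag and markDirty(long) are hypothetical, modeled on InstanceInfo's isDirtyWithTime() and unsetIsDirty(long), with the clock passed in explicitly so the behavior is testable.

```java
// Hypothetical model of the dirty-flag handshake between InstanceInfo and the replicator.
class DirtyFlag {
    private boolean dirty;
    private long lastDirtyTimestamp;

    synchronized void markDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    /** Timestamp of the latest change if dirty, otherwise null. */
    synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Clears the flag only if nothing changed after the snapshot that was registered. */
    synchronized void unsetIsDirty(long registeredTimestamp) {
        if (lastDirtyTimestamp <= registeredTimestamp) {
            dirty = false;
        }
    }
}
```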

Some Design Ideas

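At its core, Eureka Client initialization comes down to three scheduled tasks: fetching the registry, renewing the lease (heartbeat), and replicating instance information (registration). Take the registry-fetch task as an example: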

@Singleton
public class DiscoveryClient implements EurekaClient {
  private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
  }
  // ... ...
}             

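scheduler.schedule() is clearly a one-shot call here, so how does the task end up running periodically? Look inside TimedSupervisorTask's run(): its finally block calls scheduler.schedule() again, rescheduling the task itself, which produces the periodic loop.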

public class TimedSupervisorTask extends TimerTask {
    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            // Finished in time: reset the delay to the normal interval, undoing any earlier back-off
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();
      
            long currentDelay = delay.get();
            // Timed out: double the delay, capped at maxDelay (interval * expBackOffBound, i.e. 30s * 10 = 300s by default)
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);


        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }


            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }


            throwableCounter.increment();
        } finally {
            if (future != null) {
                future.cancel(true);
            }


            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
 }           
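The supervisor pattern above can be reproduced with nothing but java.util.concurrent. The sketch below is a simplified re-implementation, not Eureka's class: BackoffSupervisor is a hypothetical name, the metrics counters and gauges are left out, and maxDelay is computed as the interval multiplied by expBackOffBound.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of the TimedSupervisorTask idea.
class BackoffSupervisor implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final ExecutorService executor;
    private final Runnable task;
    private final long timeoutMillis;   // also the normal delay between runs
    private final long maxDelayMillis;  // timeoutMillis * expBackOffBound
    private final AtomicLong delay;

    BackoffSupervisor(ScheduledExecutorService scheduler, ExecutorService executor,
                      Runnable task, long intervalMillis, int expBackOffBound) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.task = task;
        this.timeoutMillis = intervalMillis;
        this.maxDelayMillis = intervalMillis * expBackOffBound;
        this.delay = new AtomicLong(intervalMillis);
    }

    long currentDelay() {
        return delay.get();
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
            delay.set(timeoutMillis);                          // finished in time: reset back-off
        } catch (TimeoutException e) {
            long current = delay.get();
            delay.compareAndSet(current, Math.min(maxDelayMillis, current * 2)); // back off
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | RejectedExecutionException e) {
            // the task failed or was rejected: keep the current delay and try again later
        } finally {
            if (future != null) {
                future.cancel(true); // interrupt a run that overstayed its timeout
            }
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); // next round
            }
        }
    }
}
```

In normal use you start it once with scheduler.schedule(supervisor, intervalMillis, TimeUnit.MILLISECONDS) and let the finally block keep it going. Calling run() directly against a scheduler that has already been shut down executes a single round without rescheduling, which is handy for checking the back-off arithmetic.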

So why not simply call the following ScheduledExecutorService method?

ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                       long initialDelay,
                                       long period,
                                       TimeUnit unit);

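The point of this design is finer control over when the task runs next, so that a timeout, whether occasional or repeated, does not derail the schedule: every run is bounded by a timeout, a timed-out run doubles the delay before the next one (up to maxDelay), and the first run that finishes in time resets the delay to the normal interval. scheduleAtFixedRate, by contrast, uses a fixed period: runs never overlap, so a run that hangs blocks every run after it, and an exception thrown by any run suppresses all subsequent runs. The self-rescheduling approach makes the task more reliable, and it is a design worth borrowing when we write scheduled tasks in our own projects.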
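The javadoc of ScheduledExecutorService.scheduleAtFixedRate states that if any execution of the task encounters an exception, subsequent executions are suppressed. The sketch below runs an always-failing task both ways: under scheduleAtFixedRate it runs exactly once, while a copy that reschedules itself from a finally block, as TimedSupervisorTask does, keeps running because the next round is queued before the exception propagates. FixedRateVsSelfScheduling and its methods are hypothetical demo names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateVsSelfScheduling {

    // A fixed-rate task that always throws: returns how many times it actually ran.
    static int fixedRateRuns() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = ses.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            throw new IllegalStateException("boom");
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            future.get(5, TimeUnit.SECONDS); // completes exceptionally after the first failure
        } catch (ExecutionException expected) {
            // the first exception suppresses all later executions
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
        sleepQuietly(100); // leave time for a second run (none will happen)
        ses.shutdownNow();
        return runs.get();
    }

    // An always-failing task that reschedules itself in finally: true once it ran `target` times.
    static boolean selfSchedulingReaches(int target) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(target);
        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            try {
                latch.countDown();
                throw new IllegalStateException("boom"); // fails only this one-shot run
            } finally {
                if (!ses.isShutdown()) {
                    ses.schedule(self[0], 10, TimeUnit.MILLISECONDS); // next round already queued
                }
            }
        };
        ses.schedule(self[0], 0, TimeUnit.MILLISECONDS);
        boolean reached;
        try {
            reached = latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            reached = false;
        }
        ses.shutdownNow();
        return reached;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```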
