天天看點

Eureka Client端啟動過程源碼分析

作者:廣東高腳七

Client端啟動主流程

Eureka Client端啟動過程源碼分析
  1. SpringBoot的自動配置掃描spring-cloud-netflix-eureka-client.jar下的META-INF/spring.factories中的自動配置類org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
  2. EurekaClientAutoConfiguration根據條件初始化相關bean
  3. EurekaClientConfigBean:eureka用戶端向eureka伺服器注冊執行個體所需的配置資訊。大多數需要的資訊由預設配置DefaultEurekaClientConfig提供。使用者隻需要提供eureka伺服器服務url。Eureka伺服器的服務url可以通過兩種方式配置:1)通過在DNS中注冊資訊。2)在配置中指定。
  4. EurekaInstanceConfigBean:執行個體注冊到Eureka伺服器所需的配置資訊,如id,appname,ip等。
  5. EurekaServiceRegistry:提供執行個體注冊和登出等功能。
  6. EurekaHealthIndicatorConfiguration:Eureka健康檢查配置類。
  7. EurekaClientConfiguration:定義的一個配置類,配置EurekaClient(最核心)、ApplicationInfoManager(這個類初始化注冊Eureka Server所需的資訊,這些資訊将被其他元件發現。)、EurekaRegistration(服務執行個體的注冊資訊)。
  8. 自動配置類中,建立EurekaClient的Bean時,使用CloudEurekaClient建立執行個體,CloudEurekaClient中調用了Netflix原生的DiscoveryClient完成初始化流程。
  9. DiscoveryClient構造器中會調initScheduledTasks(),這是最核心的方法,它完成初始化排程任務(例如:叢集解析器、心跳、instanceInfo複制器、fetch)。
  10. 使用TimedSupervisorTask建立擷取服務和服務續約的定時任務,使用InstanceInfoReplicator建立服務注冊的定時任務,然後調Eureka Server端的接口完成相應功能。這些核心功能會分解到後續的文章分析。

源碼分析

  • EurekaClientAutoConfiguration
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
    CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
    "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
    "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
    "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
    "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {


      // 建立EurekaClient的用戶端配置
      // 如registryFetchIntervalSeconds多久發送續約心跳
      // 如instanceInfoReplicationIntervalSeconds:将執行個體更改複制到eureka伺服器的頻率(以秒為機關)。
      @Bean
      @ConditionalOnMissingBean(value = EurekaClientConfig.class,
          search = SearchStrategy.CURRENT)
      public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        return new EurekaClientConfigBean();
      }
      
      // 做服務注冊和服務登出
      @Bean
      public EurekaServiceRegistry eurekaServiceRegistry() {
        return new EurekaServiceRegistry();
      }
      
      @Configuration(proxyBeanMethods = false)
      @ConditionalOnMissingRefreshScope
      protected static class EurekaClientConfiguration {
       // 建立Eureka用戶端,CloudEurekaClient繼承了Netflix原生的DiscoveryClient
       // 會調DiscoveryClient的構造方法做核心的初始化任務。
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class,
            search = SearchStrategy.CURRENT)
        public EurekaClient eurekaClient(ApplicationInfoManager manager,
            EurekaClientConfig config) {
          return new CloudEurekaClient(manager, config, this.optionalArgs,
              this.context);
        } 
 }  
}           
  • CloudEurekaClient
public class CloudEurekaClient extends DiscoveryClient {
    
    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
      EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
      ApplicationEventPublisher publisher)
      // 調父類構造器做核心初始化任務
      super(applicationInfoManager, config, args);
      this.applicationInfoManager = applicationInfoManager;
      this.publisher = publisher;
      this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
          "eurekaTransport");
      ReflectionUtils.makeAccessible(this.eurekaTransportField);
   }  
}           
  • DiscoveryClient
@Singleton
public class DiscoveryClient implements EurekaClient { 
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer     ) {
       // 省略了非核心代碼 ...
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());


            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff


            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            
            // 初始化遠端調用Eureka Server端接口的用戶端
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);


            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
        
        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }
        
        //如果設定為true,如果向遠端伺服器的初始注冊不成功,eurekclient初始化應該在構造函數時抛出異常。
        // 注意,如果shouldRegisterWithEureka()設定為false,則此配置為無操作
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
            // 向遠端伺服器的初始注冊
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }
        
        // 最核心的代碼
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();
        
        // 省略了非核心代碼 ...
    }
    
    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
    
        // 擷取系統資料庫資訊
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            // 從eureka伺服器擷取系統資料庫資訊的頻率(以秒為機關)。預設為30s
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // 建立定時拉取系統資料庫的任務
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    // Runnable - 拉取系統資料庫核心邏輯
                    new CacheRefreshThread()
            );
            // 延遲30s開始執行拉取系統資料庫的任務
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }


        if (clientConfig.shouldRegisterWithEureka()) {
            // Eureka用戶端發送心跳續約的時間間隔,每30s
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);


            // Heartbeat timer
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    // Runnable-發送心跳的核心邏輯
                    new HeartbeatThread()
            );
            // 延遲30s開始執行發送心跳續約的任務
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);


            // InstanceInfo replicator
            // 實作了Runnable,表示更新本地執行個體資訊并将其複制到遠端伺服器的任務。
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize


            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }


                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };


            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
             // 啟動服務注冊任務
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}             
  • CacheRefreshThread 拉取服務的任務
/**
   * The task that fetches the registry information at specified intervals.
   */
  class CacheRefreshThread implements Runnable {
      public void run() {
          refreshRegistry();
      }
  }
    
    @VisibleForTesting
    void refreshRegistry() {
        try {
            // 省略非核心代碼...
            
            // 拉取系統資料庫核心邏輯
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
              logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                      allAppsHashCodes);
            }
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }
    
    // 用戶端拉取服務清單的邏輯
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();


        try {
            // 取出之前拉取的本地緩存中的服務清單資訊
            // 增量拉取(eureka.client.disableDelta 預設false)關閉或者
            // 第一次拉取應用時,這裡擷取所有應用。否則增量拉取
            Applications applications = getApplications();
            
            // 通過多個條件判斷是否觸發全量拉取,任意一個條件成立都會觸發:
            // 1、是否關閉了增量更新,預設是false
            // 2、用戶端是否隻關注單個VIP的系統資料庫資訊
            // 3、是否強制全量更新,即方法入參forceFullRegistryFetch
            // 4、本地緩存還未緩存過有效的服務清單資訊
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                // 全量拉取并存儲系統資料庫
                getAndStoreFullRegistry();
            } else {
                // 增量拉取并更新系統資料庫
                getAndUpdateDelta(applications);
            }
            // 重新計算和設定一緻性哈希碼
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }


        // Notify about cache refresh before updating the instance remote status
        // 将本地緩存更新的事件廣播給所有已注冊的監聽器
        onCacheRefreshed();


        // Update remote status based on refreshed data held in the cache
        // 确定此應用程式執行個體的狀态,如果沒有找到,則設定為UNKNOWN
        updateInstanceRemoteStatus();


        // registry was fetched successfully, so return true
        return true;
    }
    
    //從eureka伺服器擷取完整的系統資料庫資訊并将其存儲在本地。
     private void getAndStoreFullRegistry() throws Throwable {
        // 通過CAS做版本維護       
        long currentUpdateGeneration = fetchRegistryGeneration.get();


        logger.info("Getting all instance registry info from the eureka server");


        Applications apps = null;
        // 向服務端發起遠端調用
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());


        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            // 存儲到本地
            // filterAndShuffle(apps):在篩選應用程式中僅具有UP狀态
            // 的執行個體并對其進行重組之後,擷取應用程式。過濾依賴于配置
            // EurekaClientConfig.shouldFilterOnlyUpInstances()
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
    
    // 從eureka伺服器擷取增量系統資料庫資訊并在本地更新它。
     private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();


        Applications delta = null;
        // 遠端調用服務端擷取增量應用
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
       
        // 增量擷取不到資料時,全量擷取
        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    // 增量更新本地緩存
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }           
  • HeartbeatThread 續約任務
private class HeartbeatThread implements Runnable {
        public void run() {
        // REST調Eureka Server的接口發送心跳
            if (renew()) {
            // 更新最後一次發送心跳的時間
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }           
  • InstanceInfoReplicator 服務注冊
class InstanceInfoReplicator implements Runnable {


  public void run() {
        try {
            // 更新執行個體資訊
            discoveryClient.refreshInstanceInfo();


            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
            // 通過REST調Eureka Server端進行執行個體注冊
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
        // 使用Future開啟下一次任務執行
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }


}           

總結一些設計思想

Eureka Client端的初始化流程,核心是3個定時任務,如擷取服務的定時任務,代碼如下:

@Singleton
public class DiscoveryClient implements EurekaClient {
  private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
  }
  // ... ...
}             

執行scheduler.schedule(),這裡明明是一次性的,但實際為啥能定時執行呢?點進TimedSupervisorTask進去看它的run(),finally又做了一次scheduler.schedule(),進而達到循環執行的效果。

public class TimedSupervisorTask extends TimerTask {
    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            // 防止偶發執行任務逾時,重置為任務預設延遲時間
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();
      
            long currentDelay = delay.get();
            // 目前任務執行逾時時,延長2倍執行時間,但不能大于最大延遲時間(預設30s)
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);


        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }


            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }


            throwableCounter.increment();
        } finally {
            if (future != null) {
                future.cancel(true);
            }


            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
 }           

那為什麼不直接調下面這個方法呢?

scheduleAtFixedRate(Runnable command,
                    long initialDelay,
                    long period,
                    TimeUnit unit);           

其實這樣設計是為了更靈活地控制定時任務的執行時間,避免任務執行逾時或者偶爾逾時的場景,是任務執行更可靠。咱在開發中,做定時任務可參考這種設計方式。

繼續閱讀