天天看點

【源碼】Spring Cloud —— Eureka Client 2 DiscoveryClient前言版本DiscoveryClient總結參考

【源碼】Spring Cloud —— Eureka Client 2 DiscoveryClient

  • 前言
  • 版本
  • DiscoveryClient
    • 系統資料庫資訊拉取
    • 服務注冊
    • 定時任務
      • 定時拉取注冊清單資訊
      • 心跳續約
      • 按需注冊
    • 服務下線
  • 總結
  • 參考

前言

上一章節對 Spring Cloud Netflix Eureka Client 提供的 核心元件類 做了大緻的了解,本章節結合部分源碼解讀核心類 DiscoveryClient

該系列章節需要相關的内容做鋪墊,傳送門:
           

關于延時、周期任務排程 —— ScheduledExecutorService ScheduledThreadPoolExecutor

版本

Spring Cloud Netflix 版本:2.2.3.RELEASE

對應 Netflix-Eureka 版本:1.9.21

DiscoveryClient

DiscoveryClient 的核心業務邏輯發生在 構造方法 中,也就是說,在啟動 Spring 應用注冊對應的 Bean 時同時注冊對應的 服務執行個體

if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            
        // 略

        return;
    }
           

屬性

shouldRegisterWithEureka

shouldFetchRegistry

false

時,不進行 服務注冊、發現,即對應我們的配置項

eureka.client.register-with-eureka

eureka.client.fetch-register

scheduler = Executors.newScheduledThreadPool(2,
             new ThreadFactoryBuilder()
                     .setNameFormat("DiscoveryClient-%d")
                     .setDaemon(true)
                     .build());

     heartbeatExecutor = new ThreadPoolExecutor(
             1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
             new SynchronousQueue<Runnable>(),
             new ThreadFactoryBuilder()
                     .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                     .setDaemon(true)
                     .build()
     );

     cacheRefreshExecutor = new ThreadPoolExecutor(
             1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
             new SynchronousQueue<Runnable>(),
             new ThreadFactoryBuilder()
                     .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                     .setDaemon(true)
                     .build()
     );
           

如果允許 服務注冊、發現,則執行個體化用于 發送心跳、重新整理緩存 相關的 線程池

eurekaTransport = new EurekaTransport();
	// 建構
    scheduleServerEndpointTask(eurekaTransport, args);
           

EurekaTransport 封裝了 DiscoveryClient 與 Eureka Server 進行 HTTP 調用的 Jersey 用戶端

系統資料庫資訊拉取

// 允許拉取系統資料庫資訊,則進行拉取
	// 如果拉取失敗,則調用 fetchRegistryFromBackup
	if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }
           

如果允許系統資料庫資訊拉取,則調用

fetchRegistry

方法拉取,拉取失敗則調用交由成員屬性

backupRegistryProvider

擷取,其預設實作為 NotImplementedRegisteryImpl,即沒有實作

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                // 全量拉取
                getAndStoreFullRegistry();
            } else {
            	// 增量拉取
                getAndUpdateDelta(applications);
            }
    		
    		// hash 碼計算
            applications.setAppsHashCode(applications.getReconcileHashCode());
            // 列印執行個體數
            logTotalInstances();
        } catch (Throwable e) {
            // ...
        }

        // ...
        
        return true;
    }
           
  • 全量拉取:第一次拉取時,必然是 全量拉取 并緩存到本地,請求

    url

    eureka/apps

    ,拉取的執行個體經過過濾、調整後緩存在

    localRegionApps

    屬性
  • 增量拉取:之後的拉取則隻針對一段時間内發生的變更資訊,并根據變更的狀态進行 增量式更新,請求

    url

    /eureka/app/delta

服務注冊

if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
        	// 注冊
            if (!register() ) {
            	
            	// 注冊失敗
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }
           

注冊方法為

register

,PS:這種在判斷條件中執行邏輯的寫法,在 JUC 源碼中十分常用

boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }
           

請求

url

/eureka/${APP_NAME}

,傳遞參數為 InstanceInfo,如果伺服器傳回

204

狀态碼,則表明注冊成功

定時任務

// 初始化定時任務
	initScheduledTasks();
           

定時任務包括

  • 定時拉取注冊清單資訊,因為第一次全量拉取注冊清單之後緩存在本地,是以需要定時擷取 Eureka Server 的注冊清單資訊,進行更新
  • 心跳續約,需要定時向 Eureka Server 發送心跳以保證 服務執行個體 的健康
  • 按需注冊,需要定時或者在 執行個體狀态 發生改變時,重新注冊 對應的執行個體,以保證服務的 可用性

定時拉取注冊清單資訊

if (clientConfig.shouldFetchRegistry()) {
        // 拉取時間間隔,可由屬性 
        // eureka.client.registry-fetch-interval-seconds 設定
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        );
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
           

定時拉取注冊清單資訊,初始化對應的

cacheRefreshTask

後交由

scheduler

延時執行,對于 周期 執行并未由

scheduler

實作,而是委托給了

TimedSupervisorTask#run

@Override
    public void run() {
        Future<?> future = null;
        try {
        	// 任務執行
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
			
			// 結果擷取
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());

			// 成功計數
            successCounter.increment();
        } catch (...) {
            
            // 略
        
        } finally {
            if (future != null) {
                future.cancel(true);
            }

			// 此處再次将任務交給 scheduler,實作周期執行
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
           

拉取系統資料庫資訊的業務邏輯由 CacheRefreshThread 定義

class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

	refreshRegistry 略
           

心跳續約

// 續約周期,預設 30s
	int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
    int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
    logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

    heartbeatTask = new TimedSupervisorTask(
            "heartbeat",
            scheduler,
            heartbeatExecutor,
            renewalIntervalInSecs,
            TimeUnit.SECONDS,
            expBackOffBound,
            new HeartbeatThread()
    );
    scheduler.schedule(
            heartbeatTask,
            renewalIntervalInSecs, TimeUnit.SECONDS);
           

續約邏輯由 HeartbeatThread 提供

private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

	---------------------- renew ----------------------

	boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();

				// 對于 404 的執行個體,重新進行注冊
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }
           

請求的

url

apps/${APP_NAME}/${INSTANCE_INFO_ID}

,HTTP 方法為

put

,續約成功傳回

200

狀态碼

按需注冊

// 負責定時重新整理執行個體,檢測狀态按需注冊
    instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2);

	// 監聽器,負責監聽執行個體的狀态,按需注冊
    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
        @Override
        public String getId() {
            return "statusChangeListener";
        }

        @Override
        public void notify(StatusChangeEvent statusChangeEvent) {

            // ...

			// 按需注冊
            instanceInfoReplicator.onDemandUpdate();
        }
    };

	// 注冊監聽器 statusChangeListener
    if (clientConfig.shouldOnDemandUpdateStatusChange()) {
        applicationInfoManager.registerStatusChangeListener(statusChangeListener);
    }

	// 定時任務啟動    
    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
           

按需注冊 主要分兩部分

  • 啟動一個定時任務

    instanceInfoReplicator

    ,定時重新整理 服務執行個體 的資訊和檢查 應用狀态 的變化,在 服務執行個體 資訊發生變化的情況下向 Eureka Server 重新發起注冊
  • 注冊一個監聽器

    statusChangeListener

    ,在 應用狀态 發生變化向 Eureka Server 重新發起注冊

定時任務的執行邏輯由

InstanceInfoReplicator#run

提供

public void run() {
        try {
        	// 重新整理執行個體資訊
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
		
				// 如果發生更改,則重新注冊
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
        	// 周期執行
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
           

監聽器

statusChangeListener

監聽到事件 StatusChangeEvent,調用

instanceInfoReplicator#onDemandUpdate

public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            if (!scheduler.isShutdown()) {
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        
                        // ...
    
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            } else {
                // ...
            }
        } else {
            // ...
        }
    }
           

最終也是委托給了

InstanceInfoReplicator#run

服務下線

服務下線 交由 Spring 生命周期管理,DiscoveryClient 對象銷毀時會調用

unregister

方法進行服務下線,調用

url

apps/${APP_NAME}/${INSTANCE_INFO_ID}

,傳遞參數為服務名和服務執行個體

id

,HTTP 方法為

delete

總結

本章節結合部分源碼重點解讀了 DiscoveryClient 類,該類主要的業務邏輯:系統資料庫資訊拉取、服務注冊、心跳續約、按需注冊 等都在 構造方法 中實作,是以當我們引入對應的依賴并指定對應的 Eureka Server 時,就會注冊對應的服務到 注冊中心

上一篇:【源碼】Spring Cloud —— Eureka Client 1 核心元件

參考

《Spring Cloud 微服務架構進階》 —— 朱榮鑫 張天 黃迪璇

繼續閱讀