天天看點

Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊

Eureka Client 源碼解析 擷取系統資料庫、用戶端注冊

1. EurekaClient 構造器中的流程:

接下來我們準備開始真正分析Eureka Client 的源碼,上一章我們分析了Eureka Client的自動配置類都加載了哪些東西,其中最為核心的就是EurekaClient:

Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊

先簡單看一下CloudEurekaClient構造器中的流程,大概看一下都做了哪些事,接下來我們會一一分析:

Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊

2. EurekaClient 構造器跟蹤

入口:

Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊

跟CloudEurekaClient的構造:

//CloudEurekaClient.java
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
		EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
		ApplicationEventPublisher publisher) {
	//繼續跟super方法 
	//CloudEurekaClient 繼承自 DiscoveryClient
	super(applicationInfoManager, config, args);
	this.applicationInfoManager = applicationInfoManager;
	this.publisher = publisher;
	this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
			"eurekaTransport");
	ReflectionUtils.makeAccessible(this.eurekaTransportField);
}

//DiscoveryClient.java
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
	//繼續跟this方法
    this(applicationInfoManager, config, args, ResolverUtils::randomize);
}

//DiscoveryClient.java
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
	//此時一共5個參數
	//第四個參數是個匿名内部類執行個體,主要是提供一個備用系統資料庫的功能(當遠端系統資料庫擷取失敗的時候)
    this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
        private volatile BackupRegistry backupRegistryInstance;

        @Override
        //傳回BackupRegistry,備份系統資料庫
        //Eureka是AP的,高可用,當Eureka Server全部挂掉了,這個備份系統資料庫執行個體就起作用了
        //當然這個備份系統資料庫需要自己實作并配置才能用
        public synchronized BackupRegistry get() {
            if (backupRegistryInstance == null) {
                String backupRegistryClassName = config.getBackupRegistryImpl();
                if (null != backupRegistryClassName) {
                    try {
                        backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                        logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                    } catch (InstantiationException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    } catch (IllegalAccessException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    } catch (ClassNotFoundException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    }
                }

                if (backupRegistryInstance == null) {
                    logger.warn("Using default backup registry implementation which does not do anything.");
                    backupRegistryInstance = new NotImplementedRegistryImpl();
                }
            }

            return backupRegistryInstance;
        }
    }, randomizer);
}
           

繼續跟this構造,這個方法很長,我們隻關注最核心的:

//DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
	
	...//省略了很多代碼

	//這裡fetchRegistry方法做的事情就是我們第一個需要關注的任務
	//擷取系統資料庫
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
	        //第二個任務,register方法,進行注冊
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    //第三個任務,初始化定時任務
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}
           
Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊

3. 擷取用戶端系統資料庫

Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊
Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊
Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊

3.1 擷取備用系統資料庫:

現在看擷取系統資料庫的方法fetchRegistry,該方法傳回值true/false代表是否擷取成功

先看擷取失敗的情況,如果需要擷取,但擷取失敗,就會擷取備用的系統資料庫:

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
	//如果應該擷取,但擷取失敗,就會擷取備用的系統資料庫
    fetchRegistryFromBackup();
}

//DiscoveryClient.java
private void fetchRegistryFromBackup() {
    try {
    	//先直接擷取備用系統資料庫執行個體,預設傳回null,除非子類重寫該方法
        @SuppressWarnings("deprecation")
        BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
        if (null == backupRegistryInstance) { // backward compatibility with the old protected method, in case it is being used.
        
	        //這個backupRegistryProvider就是之前構造中傳入的匿名内部類
	        //備用系統資料庫的提供者,通過提供者擷取備用系統資料庫執行個體
	        //沒有專門實作,這裡傳回的就是NotImplementedRegistryImpl(之前構造中可以看到)
            backupRegistryInstance = backupRegistryProvider.get();
        }

        if (null != backupRegistryInstance) {
	        //如果這個備用系統資料庫執行個體不為空
            Applications apps = null;
            if (isFetchingRemoteRegionRegistries()) {//判斷是否可以從遠端region擷取
	            //擷取配置的遠端region清單
                String remoteRegionsStr = remoteRegionsToFetch.get();
                if (null != remoteRegionsStr) {
	                //從遠端region擷取系統資料庫,apps就是Applications
	                //Applications就是系統資料庫,map結構,key是微服務名稱,value是Application
	                //Application也是map結構,key是InstanceInfo的Id,value是InstanceInfo
	                //一個InstanceInfo對應的就是一個主機資訊
                    apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
                }
            } else {
	            //不可以從遠端region擷取,則擷取本地備用系統資料庫
                apps = backupRegistryInstance.fetchRegistry();
            }
            if (apps != null) {
	            //如果apps不空,則将apps打散
                final Applications applications = this.filterAndShuffle(apps);
                applications.setAppsHashCode(applications.getReconcileHashCode());
                //将shuffle過的applications放到本地緩存
                localRegionApps.set(applications);
                logTotalInstances();
                logger.info("Fetched registry successfully from the backup");
            }
        } else {
            logger.warn("No backup registry instance defined & unable to find any discovery servers.");
        }
    } catch (Throwable e) {
        logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
    }
}
           
判斷是否可以從遠端region擷取:
Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊
該值我們可以通過配置檔案進行配置,指定擷取eureka系統資料庫資訊的遠端region清單:
Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊
備用系統資料庫Provider是之前構造中傳入的一個匿名内部類:
Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊

3.2 從注冊中心擷取系統資料庫

現在看從注冊中心擷取系統資料庫的具體實作:

Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊
//DiscoveryClient.java
/**
 * Fetches the registry information.
 *
 * <p>
 * This method tries to get only deltas after the first fetch unless there
 * is an issue in reconciling eureka server and client registry information.
 * 除非在協調eureka伺服器和用戶端系統資料庫資訊時出現問題,否則此方法嘗試在第一次擷取後隻擷取delta。
 * 
 * 翻譯一下:第一次是全量下載下傳,後面就是增量下載下傳
 * </p>
 *
 * @param forceFullRegistryFetch Forces a full registry fetch. 是否強制全量下載下傳
 *
 * @return true if the registry was fetched 擷取成功傳回true,否則false
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
	//這裡入參forceFullRegistryFetch,代表含義是:true則代表強制全量下載下傳
	//false就是有可能全量,也有可能是增量擷取,視情況而定。
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        // 先擷取目前本地緩存的系統資料庫資訊,剛啟動這裡肯定擷取為空
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
        	//隻要滿足以上各種情況的任何一個條件,就會進行全量下載下傳:
        	//	配置檔案中關閉了增量下載下傳
        	//	配置了VIP的系統資料庫位址
        	//	強制進行全量下載下傳
        	//	本地緩存的系統資料庫資訊為空
        	
			...//省略了各種info日志列印
			
            // 全量擷取,擷取存儲全部系統資料庫資訊
            getAndStoreFullRegistry();
        } else {
	        // 增量擷取,擷取更新增量資料
	        //(所謂增量下載下傳其實是擷取服務端的最近更新隊列的資料,以後會看到)
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
           

3.2.1 全量下載下傳

看getAndStoreFullRegistry方法:

//DiscoveryClient.java
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    //先嘗試擷取registry-refresh-single-vip-address配置
    //如果不為null走的是vip位址擷取
    //為null,則走預設的
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
	    //擷取響應體,就是Applications
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
	    //将系統資料庫資訊打亂放入本地緩存
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}
           
registry-refresh-single-vip-address配置:可以專門指定位址進行系統資料庫下載下傳更新
Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊

這裡我們沒有指定vip位址,是以走eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()):

Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊

可以看到實作中,通過Jersey 架構送出了get請求:

//AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
    return getApplicationsInternal("apps/", regions);
}

//AbstractJerseyEurekaHttpClient.java
//可以看到實作中,通過Jersey 架構送出了get請求
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
	    //SpringMVC的處理器是Controller,Jersey架構的處理器是Resource
	    //這裡WebResource就是代表一個web資源
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            //從這個web資源建立一個新的WebResource,并将一個附加的查詢參數添加到這個web資源的URI中。
	        //建構查詢參數到URI中
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        //擷取請求建構者
        Builder requestBuilder = webResource.getRequestBuilder();
        //添加請求頭,用戶端來說,沒有特殊配置,這裡不會添加任何請求頭
        addExtraHeaders(requestBuilder);
        //指定接受json資料格式,并發起get請求
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
	        //擷取響應,将響應體轉成 系統資料庫的資料結構
            applications = response.getEntity(Applications.class);
        }
        // 将結果封裝成需要的資料結構傳回
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                    serviceUrl, urlPath,
                    regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                    response == null ? "N/A" : response.getStatus()
            );
        }
        if (response != null) {
            response.close();
        }
    }
}
           

看requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class)的get方法:

//WebResource.java
@Override
public <T> T get(Class<T> c) throws UniformInterfaceException, ClientHandlerException {
	//看到這裡送出的就是"GET"請求,下面就是Jersey 架構的東西了
	//當然還涉及到一些url的處理,就不跟了
    return handle(c, build("GET"));
}
           

請求發了,至于Eureka Server端怎麼處理并響應,等将Eureka Server時在講,看到全量下載下傳還是比較簡單的。

3.2.2 增量下載下傳

下一章講Eureka Client端的一些定時任務,其中定時更新系統資料庫的任務也會用到增量下載下傳,下一章說。

4. 用戶端注冊

現在看用戶端注冊:

Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊
  • shouldRegisterWithEureka

    指定是否需要向Eureka進行注冊,該配置預設為true,一般Server端會設定為false

    Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊
  • shouldEnforceRegistrationAtInit

    在初始化時候是否注冊,預設false

    Spring Cloud Eureka Client 源碼解析(二)擷取系統資料庫、用戶端注冊1. EurekaClient 構造器中的流程:2. EurekaClient 構造器跟蹤3. 擷取用戶端系統資料庫4. 用戶端注冊

可以看到預設情況下這裡的注冊方法不會執行,即初始化的時候不會進行注冊,那麼這裡不注冊,在哪裡注冊呢?其實是在定時任務裡注冊的(心跳續約),但是注冊邏輯是一樣的,是以這裡我們先分析這個方法:

PS:從代碼中可以看到,如果設定為初始化時注冊,會一個問題:如果注冊失敗了會抛異常,導緻整個用戶端啟動就失敗了,也不會啟動後面的定時任務,如果不強制初始化時進行注冊,會通過心跳續約的定時任務去注冊,即使注冊失敗了也不影響用戶端啟動,并會定時多次嘗試進行注冊。

看register方法:

//DiscoveryClient.java
/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
	    //将目前instanceInfo(主機資訊)作為參數進行注冊
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    //204(No Content), 表示執行成功, 但是沒有資料,
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
           

繼續看eurekaTransport.registrationClient.register方法,可以看到通過Jersey 架構送出post注冊請求:

//AbstractJerseyEurekaHttpClient.java
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
	    //通過Jersey 架構送出post注冊請求
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                //攜帶的參數就是InstanceInfo
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

//WebResource.java
@Override
public <T> T post(Class<T> c, Object requestEntity) throws UniformInterfaceException, ClientHandlerException {
    return handle(c, build("POST", requestEntity));
}
           

可以看到注冊也是比較簡單的。

繼續閱讀