【源碼】Spring Cloud —— Eureka Client 2 DiscoveryClient
- 前言
- 版本
- DiscoveryClient
-
- 系統資料庫資訊拉取
- 服務注冊
- 定時任務
-
- 定時拉取注冊清單資訊
- 心跳續約
- 按需注冊
- 服務下線
- 總結
- 參考
前言
上一章節對 Spring Cloud Netflix Eureka Client 提供的 核心元件類 做了大緻的了解,本章節結合部分源碼解讀核心類 DiscoveryClient
該系列章節需要相關的内容做鋪墊,傳送門:
關于延時、周期任務排程 —— ScheduledExecutorService ScheduledThreadPoolExecutor
版本
Spring Cloud Netflix 版本:2.2.3.RELEASE
對應 Netflix-Eureka 版本:1.9.21
DiscoveryClient
DiscoveryClient 的核心業務邏輯發生在 構造方法 中,也就是說,在啟動 Spring 應用注冊對應的 Bean 時同時注冊對應的 服務執行個體
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
// 略
return;
}
屬性
shouldRegisterWithEureka
和
shouldFetchRegistry
為
false
時,不進行 服務注冊、發現,即對應我們的配置項
eureka.client.register-with-eureka
和
eureka.client.fetch-register
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
);
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
);
如果允許 服務注冊、發現,則執行個體化用于 發送心跳、重新整理緩存 相關的 線程池
eurekaTransport = new EurekaTransport();
// 建構
scheduleServerEndpointTask(eurekaTransport, args);
EurekaTransport 封裝了 DiscoveryClient 與 Eureka Server 進行 HTTP 調用的 Jersey 用戶端
系統資料庫資訊拉取
// 允許拉取系統資料庫資訊,則進行拉取
// 如果拉取失敗,則調用 fetchRegistryFromBackup
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
如果允許系統資料庫資訊拉取,則調用
fetchRegistry
方法拉取,拉取失敗則調用交由成員屬性
backupRegistryProvider
擷取,其預設實作為 NotImplementedRegisteryImpl,即沒有實作
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
// 全量拉取
getAndStoreFullRegistry();
} else {
// 增量拉取
getAndUpdateDelta(applications);
}
// hash 碼計算
applications.setAppsHashCode(applications.getReconcileHashCode());
// 列印執行個體數
logTotalInstances();
} catch (Throwable e) {
// ...
}
// ...
return true;
}
- 全量拉取:第一次拉取時,必然是 全量拉取 并緩存到本地,請求
為url
,拉取的執行個體經過過濾、調整後緩存在eureka/apps
屬性localRegionApps
- 增量拉取:之後的拉取則隻針對一段時間内發生的變更資訊,并根據變更的狀态進行 增量式更新,請求
為url
/eureka/app/delta
服務注冊
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
// 注冊
if (!register() ) {
// 注冊失敗
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
注冊方法為
register
,PS:這種在判斷條件中執行邏輯的寫法,在 JUC 源碼中十分常用
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
請求
url
為
/eureka/${APP_NAME}
,傳遞參數為 InstanceInfo,如果伺服器傳回
204
狀态碼,則表明注冊成功
定時任務
// 初始化定時任務
initScheduledTasks();
定時任務包括
- 定時拉取注冊清單資訊,因為第一次全量拉取注冊清單之後緩存在本地,是以需要定時擷取 Eureka Server 的注冊清單資訊,進行更新
- 心跳續約,需要定時向 Eureka Server 發送心跳以保證 服務執行個體 的健康
- 按需注冊,需要定時或者在 執行個體狀态 發生改變時,重新注冊 對應的執行個體,以保證服務的 可用性
定時拉取注冊清單資訊
if (clientConfig.shouldFetchRegistry()) {
// 拉取時間間隔,可由屬性
// eureka.client.registry-fetch-interval-seconds 設定
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
定時拉取注冊清單資訊,初始化對應的
cacheRefreshTask
後交由
scheduler
延時執行,對于 周期 執行并未由
scheduler
實作,而是委托給了
TimedSupervisorTask#run
@Override
public void run() {
Future<?> future = null;
try {
// 任務執行
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
// 結果擷取
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
// 成功計數
successCounter.increment();
} catch (...) {
// 略
} finally {
if (future != null) {
future.cancel(true);
}
// 此處再次将任務交給 scheduler,實作周期執行
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
拉取系統資料庫資訊的業務邏輯由 CacheRefreshThread 定義
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
refreshRegistry 略
心跳續約
// 續約周期,預設 30s
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
續約邏輯由 HeartbeatThread 提供
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
---------------------- renew ----------------------
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
// 對于 404 的執行個體,重新進行注冊
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
請求的
url
為
apps/${APP_NAME}/${INSTANCE_INFO_ID}
,HTTP 方法為
put
,續約成功傳回
200
狀态碼
按需注冊
// 負責定時重新整理執行個體,檢測狀态按需注冊
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2);
// 監聽器,負責監聽執行個體的狀态,按需注冊
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
// ...
// 按需注冊
instanceInfoReplicator.onDemandUpdate();
}
};
// 注冊監聽器 statusChangeListener
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 定時任務啟動
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
按需注冊 主要分兩部分
- 啟動一個定時任務
,定時重新整理 服務執行個體 的資訊和檢查 應用狀态 的變化,在 服務執行個體 資訊發生變化的情況下向 Eureka Server 重新發起注冊instanceInfoReplicator
- 注冊一個監聽器
,在 應用狀态 發生變化向 Eureka Server 重新發起注冊statusChangeListener
定時任務的執行邏輯由
InstanceInfoReplicator#run
提供
public void run() {
try {
// 重新整理執行個體資訊
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 如果發生更改,則重新注冊
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// 周期執行
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
監聽器
statusChangeListener
監聽到事件 StatusChangeEvent,調用
instanceInfoReplicator#onDemandUpdate
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
scheduler.submit(new Runnable() {
@Override
public void run() {
// ...
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
// ...
}
} else {
// ...
}
}
最終也是委托給了
InstanceInfoReplicator#run
服務下線
服務下線 交由 Spring 生命周期管理,DiscoveryClient 對象銷毀時會調用
unregister
方法進行服務下線,調用
url
為
apps/${APP_NAME}/${INSTANCE_INFO_ID}
,傳遞參數為服務名和服務執行個體
id
,HTTP 方法為
delete
總結
本章節結合部分源碼重點解讀了 DiscoveryClient 類,該類主要的業務邏輯:系統資料庫資訊拉取、服務注冊、心跳續約、按需注冊 等都在 構造方法 中實作,是以當我們引入對應的依賴并指定對應的 Eureka Server 時,就會注冊對應的服務到 注冊中心
上一篇:【源碼】Spring Cloud —— Eureka Client 1 核心元件
參考
《Spring Cloud 微服務架構進階》 —— 朱榮鑫 張天 黃迪璇