
前言
之前寫了幾篇Spring Cloud的小白教程,相信看過的朋友對Spring Cloud中的一些應用有了簡單的了解,寫小白篇的目的就是為初學者建立一個基本概念,讓初學者在學習的道路上建立一定的基礎。
從今天開始,我會持續更新幾篇Spring Cloud的進階教程。
Eureka簡介
Eureka是Netflix開發的服務發現架構,本身就是一個基于REST的服務。Spring Cloud将它內建在其子項目spring-cloud-netflix中,用來實作服務的注冊與發現功能。
Eureka總體架構圖
Eureka元件介紹
- 服務注冊中心叢集
分别部署在IDC1、IDC2、IDC3中心
- 服務提供者
服務提供者一個部署在IDC1,一個部署在IDC3
- 服務消費者
服務消費者一個部署在IDC1,一個部署在IDC2
元件之間的調用關系
- 啟動服務:服務提供者會向服務注冊中心發起Register請求,注冊服務。
- 運作過程中:服務提供者會定時向注冊中心發送Renew心跳,告訴它“我還活着”。
- 停止服務提供:服務提供者會向服務注冊中心發送Cancel請求,告訴它清空目前服務注冊資訊。
- 啟動後:從服務注冊中心拉取服務注冊資訊。
- 運作過程中:定時更新服務注冊資訊。
- 發起遠端調用:
- 服務消費者會從服務注冊中心選擇同機房的服務提供者,然後發起遠端調用,隻有同機房的服務提供者當機才會去選擇其他機房的服務提供者。
- 如果服務消費者發現同機房沒有服務提供者,則會按照負載均衡算法 選擇其他機房的服務提供者,然後發起遠端調用。
注冊中心
- 啟動後:從其他節點拉取服務注冊資訊
- 運作過程中:
- 定時運作Evict任務,定時清理沒有按時發送Renew的服務提供者,這裡的清理會将非常正常停止、網絡異常等其他因素引起的所有服務。
- 接收到的Register、Renew、Cancel請求,都會同步到其他的注冊中心節點。
Eureka Server會通過Register、Renew、Get Registry等接口提供服務的注冊、發現和心跳檢測等。
Eureka Client是一個java用戶端,用于簡化與Eureka Server的互動,用戶端本身也内置了負載均衡器(預設使用round-robin方式),在啟動後會向Eureka Server發送心跳檢測,預設周期為30s,Eureka Server如果在多個心跳周期内沒有接收到Eureka client的某一個節點的心跳請求,Eureka Server會從服務注冊中心清理到對應的Eureka Client的服務節點(預設90s)。
資料結構
服務存儲的資料結構可以簡單的了解為是一個兩層的HashMap結構(為了保證線程安全使用的ConcurrentHashMap),具體的我們可以檢視源碼中的AbstractInstanceRegistry類:
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
第一層ConcurrentHashMap的key=spring.application.name,也就是應用名稱,value為ConcurrentHashMap。
第二層ConcurrentHashMap的key=instanceId,也就是服務的唯一執行個體id,value為Lease對象,也就是具體的服務。Lease其實就是對InstanceInfo的包裝,裡面儲存了執行個體資訊、服務注冊的時間等。具體的我們可以檢視InstanceInfo源碼。
資料存儲過程
Eureka是通過REST接口對外提供服務的。
這裡我以注冊為例(ApplicationResource),首先将PeerAwareInstanceRegistry的執行個體注入到ApplicationResource的成員變量的registry裡。
- ApplicationResource接收到請求後,對調用registry.register()方法。
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
- AbstractInstanceRegistry在register方法裡完成對服務資訊的存儲。
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
從源碼中不難看出存儲的資料結構是雙層的HashMap。
Eureka還實作了二級緩存來保證即将對外傳輸的服務資訊,
- 一級緩存:本質還是HashMap,沒有過期時間,儲存服務資訊的對外輸出的資料結構。
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
- 二級緩存:是guava的緩存,包含失效機制,儲存服務資訊的對外輸出的資料結構。
private final LoadingCache<Key, Value> readWriteCacheMap;
- 緩存的更新:
緩存的機制可以檢視ResponseCacheImpl源碼。
- 删除二級緩存:
- client端發送register、renew、cancel請求并更新register系統資料庫之後會删除二級緩存;
- server端自身的Evict任務剔除服務後會删除二級緩存;
- 二級緩存本事設定的失效機制(指的是guava實作的readWriteCacheMap),
- 加載二級緩存:
- client發送Get registry請求後,如果二級緩存中沒有,就會觸發guava的load機制,從registry中擷取原始的服務資訊後進行加工處理,然後放入二級緩存中;
- server端更新一級緩存的時候,如果二級緩存沒有資料也會觸發guava的load機制;
- 更新一級緩存:
- server端内置了一個time task會定時将二級緩存中的資料同步到一級緩存中,這其中包括了删除和更新。
Eureka的資料結構簡單總結為:
服務注冊機制
服務注冊中心、服務提供者、服務消費者在啟動後都會向服務注冊中心發起注冊服務的請求(前提是配置了注冊服務)。
注冊中心接到register請求後:
- 将服務資訊儲存到registry中;
- 更新隊列,将該事件添加到更新隊列中,給Eureka client增量同步服務資訊使用;
- 清空二級緩存,用于保證資料的一緻性;(即清空的是:readWriteCacheMap)
- 更新門檻值;
- 同步服務資訊;
服務續約
服務注冊後,要定時發送續約請求(心跳檢查),證明我還活着,不要清空我的服務資訊,定時時間預設30s,可以通過配置:eureka.instance.lease-renewal-interval-in-seconds來修改。
注冊中心接收到續約請求後(renew):
- 更新服務對象的最近續約時間(lastUpdateTimestamp);
- 将資訊同步給其他的節點;
服務登出
正常的服務停止之前會發送登出服務請求,通知注冊中心我要下線了。
注冊中心接收到登出請求後(cancel):
- 将服務資訊從registry中删除;
- 更新隊列;
- 清空二級緩存;
- 同步資訊給其他節點;
說明:隻有服務正常停止才會發送cancel請求,非正常停止的會通過Eureka Server的主動剔除機制進行删除。
服務剔除
服務剔除其實是一個兜底的方案,目的就是解決非正常情況下的服務當機或其他因素導緻不能發送cancel請求的服務資訊清理的政策。
服務剔除分為:
- 判斷剔除條件
- 找出過期服務
- 清理過期服務
剔除條件:
- 關閉自我保護
- 自我保護如果開啟,會先判斷是server還是client出現問題,如果是client的問題就會進行删除;
自我保護機制:Eureka的自我保護機制是為了防止誤殺服務提供的一種保護機制。Eureka的自我保護機制認為如果有大量的服務都續約失敗,則認為自己出現了問題(例如:自己斷網了),也就不剔除了。反之,則是它人的問題,就進行剔除。
自我保護的門檻值分為server和client,如果超出門檻值就是表示大量服務可用,部分服務不可用,這判定為client端出現問題。如果未超出門檻值就是表示大量服務不可用,則判定是自己出現了問題。
門檻值的計算:
- 自我保護門檻值 = 服務總數 每分鐘續約數 自我保護門檻值因子;
- 每分鐘續約數 = (60s / 用戶端續約時間);
過期服務:
找出過期服務會周遊所有的服務,判斷上次續約時間距離目前時間大于門檻值就标記為過期,同時會将這些過期的服務儲存的過期的服務集合中。
剔除服務:
剔除服務之前會先計算要是剔除的服務數量,然後周遊過期服務,通過洗牌算法確定每次都公平的選擇出要剔除的服務,然後進行剔除。
執行剔除服務後:
- 從register中删除服務資訊;
- 清空二級緩存,保證資料的一緻性;
服務擷取
Eureka Client服務的擷取都是從緩存中擷取,如果緩存中沒有,就加載資料到緩存中,然後在從緩存中取。服務的擷取方式分為全量同步和增量同步兩種。
registry中隻儲存資料結構,緩存中存ready的服務資訊
- 先讀取一級緩存
- 先判斷是否開啟一級緩存
- 如果開啟一級緩存,就從一級緩存中取,如果一級緩存中沒有,則從二級緩存中取;
- 如果沒有開啟一級緩存,則直接從二級緩存中取;
- 再讀取二級緩存
- 如果二級緩存中存在,則直接傳回;
- 如果二級緩存中不存在,則先将資料加載到二級緩存中,然後再讀取二級緩存中的資料。
注意:加載二級緩存的時候需要判斷是全量還是增量,如果是增量的話,就從recentlyChangedQueue中加載,如果是全量的話就從registry中加載。
服務同步
服務同步是Server節點之間的資料同步。分為啟動時同步,運作時同步。
- 啟動同步
啟動同步時,會先周遊Applications中擷取的服務資訊,并将服務資訊注冊到registry中。可以參考PeerAwareInstanceRegistryImpl類中的syncUp方法:注意這個方法使用類兩層for循環,第一次循環時保證自己已經拉取到服務資訊,第二層循環是周遊拉取到服務注冊資訊。public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; }
- 運作時同步
server端當有reigster、renew、cancel請求進來時,會将這些請求封裝到一個task中,然後放到一個隊列當中,然後經過一系列的處理後,在放到另一個隊列中。 可以檢視PeerAwareInstanceRegistryImpl類中的BatchWorkerRunnable類,這裡就不再貼源碼了。
總結
Eureka的原理接介紹到這裡,從整體上看似簡單,但實作細節相關複雜。得多看幾遍源碼才能猜透他們的設計思路。
Eureka作為服務的注冊與發現,它實際的設計原則是遵循AP原則,也就是“資料的最終一緻性”。現在還有好多公司使用zk、nacos來作為服務的注冊中心,後續會簡單更新一篇關于服務注冊中心的對比,這裡就不過多闡述。
- 寫作不易,轉載請注明出處,喜歡的小夥伴可以關注公衆号檢視更多喜歡的文章。
- 聯系方式:[email protected]
- QQ:95472323