Table of contents
- 1. Overview
- 1.1 Eureka concepts
- 2. Source code analysis
- 2.1 Eureka Server source
- 2.1.1 The `@EnableEurekaServer` annotation
- 2.1.2 EurekaServerInitializerConfiguration
- 2.1.2.1 contextInitialized()
- 2.1.2.1.1 initEurekaEnvironment
- 2.1.2.1.2 initEurekaServerContext
- 2.1.2.1.2.1 registry.syncUp()
- 2.1.2.1.2.2 eurekaClient.getApplications()
- 2.1.2.1.2.3 The fetchRegistry method
- 2.2 Eureka Client source
- 2.3 Lease renewal
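1. Overview
Learn it by using it first: [SpringCloud] Spring Cloud Eureka standalone and cluster setup
Spring Cloud Eureka consists of a Server side and a Client side. The Server acts as the service registry, and each Client registers its own services with the Server.
For a deeper treatment of the source, the "Eureka source code analysis" tutorial series is well worth reading.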
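1.1 Eureka concepts
Service governance in Eureka involves the following concepts:
- Service registration: the Eureka Client registers its service with the Eureka Server by sending a REST request that carries its own metadata, such as IP address, port, health check URL, and home page URL. When the Eureka Server receives the registration request, it stores this metadata in a two-level Map.
- Service renewal: after registering, the Eureka Client keeps sending heartbeats to tell the Eureka Server that the service is still available, so that it is not evicted. By default the Eureka Client sends one heartbeat every 30 seconds to renew its lease.
- Service synchronization: Eureka Servers register with one another to form a Eureka Server cluster, and they replicate registrations among themselves to keep the service information consistent.
- Service fetching: when a service consumer (a Eureka Client) starts, it sends a REST request to the Eureka Server to fetch the list of registered services and caches it locally, by default for 30 seconds. For performance reasons the Eureka Server also maintains a read-only cache of the service list, which is refreshed every 30 seconds.
- Service invocation: once the consumer has the service list, it can look up the addresses of other services in it and call them remotely. Eureka has the notions of Region and Zone: a Region can contain several Zones, and a call prefers service providers in the same Zone as the caller.
- Service cancellation: when a Eureka Client is about to shut down or restart, it should not receive further requests during that window, so it first sends a REST request telling the Eureka Server that it is going offline. On receiving the request, the Eureka Server marks the service as offline (DOWN) and propagates the cancellation event.
- Service eviction: an instance may stop serving because of a network failure or a similar fault without ever sending a cancellation request, so an eviction mechanism is needed as well. At startup the Eureka Server creates a scheduled task that runs periodically (every 60 seconds by default) and removes from the current service list every instance whose lease has not been renewed in time (90 seconds by default).
- Self-preservation: because the Eureka Server periodically evicts instances that have not renewed in time, a network problem lasting for a while could prevent every service from renewing, and the Eureka Server would then evict all of them, which is clearly unreasonable. The self-preservation mechanism addresses this: the server tracks the proportion of failed renewals over a short period, and when it reaches a threshold, self-preservation is triggered. While it is active the Eureka Server does not evict any service; it leaves self-preservation once renewals are back to normal.
These concepts outline the overall flow. A Eureka Client registers with the Eureka Server and keeps its lease alive with heartbeats; if it goes too long without renewing, it is evicted. Eureka Servers synchronize data with one another to form a cluster. A Eureka Client fetches the service list from the Eureka Server in order to call other services, and before restarting it calls the Eureka Server's API to take itself offline.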
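The renewal, eviction, and self-preservation rules described above can be modeled in a few lines. This is a minimal sketch and not Eureka's implementation: the class `LeaseSketch`, its method names, and the 0.85 threshold ratio are assumptions made for illustration, and time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of lease renewal and eviction; not Eureka code.
final class LeaseSketch {
    static final long LEASE_DURATION_MS = 90_000L;         // lease expires 90 s after the last renewal
    static final double RENEWAL_PERCENT_THRESHOLD = 0.85;  // assumed ratio for self-preservation

    // instanceId -> timestamp of the last registration or heartbeat
    private final Map<String, Long> lastRenewal = new ConcurrentHashMap<>();

    void register(String instanceId, long nowMs) {
        lastRenewal.put(instanceId, nowMs);
    }

    // Returns false when the instance is unknown, in which case a client would register again.
    boolean renew(String instanceId, long nowMs) {
        return lastRenewal.replace(instanceId, nowMs) != null;
    }

    // Removes every instance whose lease has expired and returns the evicted ids.
    List<String> evict(long nowMs) {
        List<String> evicted = new ArrayList<>();
        lastRenewal.entrySet().removeIf(entry -> {
            boolean expired = nowMs - entry.getValue() > LEASE_DURATION_MS;
            if (expired) {
                evicted.add(entry.getKey());
            }
            return expired;
        });
        return evicted;
    }

    // Two heartbeats per instance per minute are expected (one every 30 s).
    // Too few renewals in the last minute suggests a network problem, so eviction should pause.
    static boolean inSelfPreservation(int instances, int renewsLastMinute) {
        int threshold = (int) (instances * 2 * RENEWAL_PERCENT_THRESHOLD);
        return renewsLastMinute <= threshold;
    }

    int size() {
        return lastRenewal.size();
    }

    public static void main(String[] args) {
        LeaseSketch registry = new LeaseSketch();
        registry.register("orders-1", 0L);
        registry.register("orders-2", 0L);
        registry.renew("orders-1", 60_000L);
        System.out.println("evicted: " + registry.evict(100_000L));
        System.out.println("self-preservation: " + inSelfPreservation(10, 5));
    }
}
```

In this model an instance that stops sending heartbeats disappears at the first eviction pass after its 90 second lease runs out, while a caller would skip `evict` altogether whenever `inSelfPreservation` returns true.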
2. Source code analysis
2.1 Eureka Server source
On the server side we add the annotation to the application class:
@SpringBootApplication
@EnableEurekaServer
@EnableDiscoveryClient
public class EurekaServerNodeApplication {
public static void main(String[] args) {
System.out.println("xx");
SpringApplication.run(EurekaServerNodeApplication.class, args);
}
}
The annotation itself is defined as follows:
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
/**
* Annotation to activate Eureka Server related configuration {@link EurekaServerAutoConfiguration}
*
* @author Dave Syer
* @author Biju Kunjummen
*
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
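2.1.1 The @EnableEurekaServer annotation
@EnableEurekaServer imports EurekaServerMarkerConfiguration, which does nothing more than register a Marker bean: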
@Configuration
public class EurekaServerMarkerConfiguration {
@Bean
public Marker eurekaServerMarkerBean() {
return new Marker();
}
class Marker {
}
}
The spring.factories file in spring-cloud-netflix-eureka-server.jar, which Spring Boot loads for auto-configuration, contains:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
@Configuration
@Import({EurekaServerInitializerConfiguration.class})
@ConditionalOnBean({Marker.class})
@EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})
@PropertySource({"classpath:/eureka/server.properties"})
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
// @Bean methods that register a number of beans in the Spring container
}
2.1.2 EurekaServerInitializerConfiguration
EurekaServerInitializerConfiguration.start()
@Override
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
2.1.2.1 contextInitialized()
// Initialization
public void contextInitialized(ServletContext context) {
try {
// initialize the runtime environment
this.initEurekaEnvironment();
// initialize the Eureka server context
this.initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
} catch (Throwable var3) {
log.error("Cannot bootstrap eureka server :", var3);
throw new RuntimeException("Cannot bootstrap eureka server :", var3);
}
}
2.1.2.1.1 initEurekaEnvironment
protected void initEurekaEnvironment() throws Exception {
log.info("Setting the eureka configuration..");
String dataCenter = ConfigurationManager.getConfigInstance()
.getString(EUREKA_DATACENTER);
if (dataCenter == null) {
log.info(
"Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}
String environment = ConfigurationManager.getConfigInstance()
.getString(EUREKA_ENVIRONMENT);
if (environment == null) {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
log.info(
"Eureka environment value eureka.environment is not set, defaulting to test");
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
}
}
2.1.2.1.2 initEurekaServerContext
protected void initEurekaServerContext() throws Exception {
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
if (this.isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
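// sync registry data from the other Eureka nodes
int registryCount = this.registry.syncUp();
// open for traffic; this also starts the task that evicts expired instances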
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
EurekaMonitors.registerAllStats();
}
2.1.2.1.2.1 registry.syncUp()
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
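// Retries up to 5 times by default. Data is fetched from the other nodes only when
// registration with Eureka and registry fetching are both enabled,
// i.e. registerWithEureka and fetchRegistry must both be true.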
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// fetch the registry information
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
// register the instances from the other nodes in the local registry
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
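The control flow of syncUp() is a bounded retry loop: try again, with a pause, until something was copied or the retry budget runs out. Here is a minimal sketch of that pattern in isolation. The class `SyncRetrySketch` and the `IntSupplier` standing in for "copy the peer's registry and return the count" are hypothetical names, not part of Eureka.

```java
import java.util.function.IntSupplier;

// Hypothetical illustration of the retry loop used by syncUp(); not Eureka code.
final class SyncRetrySketch {

    // Calls fetch up to maxRetries times, sleeping between attempts,
    // and stops as soon as an attempt copies at least one instance.
    static int syncWithRetry(IntSupplier fetch, int maxRetries, long waitMs) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    break;
                }
            }
            count = fetch.getAsInt();
        }
        return count;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // The simulated peer has no data for the first two attempts.
        int copied = syncWithRetry(() -> ++attempts[0] < 3 ? 0 : 7, 5, 10L);
        System.out.println("copied " + copied + " instances after " + attempts[0] + " attempts");
    }
}
```

A result of 0 after all retries means the node starts with an empty registry, which is the normal case for the first server of a cluster.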
2.1.2.1.2.2 eurekaClient.getApplications()
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
...
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// scheduled tasks:
// periodically refresh the registered services
// periodically renew the lease
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// executor for the heartbeat (renewal) task
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// executor for the registry cache refresh task
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
...
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// fetch the registry
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
...
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
// initialize the scheduled tasks
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
}
2.1.2.1.2.3 The fetchRegistry method
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta() // delta fetching is disabled
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null) // the local registry is null
|| (applications.getRegisteredApplications().size() == 0) // the local registry has no applications
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
// full fetch
getAndStoreFullRegistry();
} else {
// delta fetch
getAndUpdateDelta(applications);
}
// update the hash code
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
getAndStoreFullRegistry: full fetch (over an HTTP request)
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
getAndUpdateDelta: delta (incremental) fetch
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
// fetch the delta
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// no delta available: fall back to a full fetch
if (delta == null) {
getAndStoreFullRegistry();
}
// CAS on the update generation counter
else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
String reconcileHashCode = "";
// acquire the lock
if (fetchRegistryUpdateLock.tryLock()) {
try { // apply the delta to the local registry
updateDelta(delta);
// compute the reconcile hash code
reconcileHashCode = getReconcileHashCode(applications);
} finally {
// release the lock
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
// compare with the server-side hash code; if they differ, reconcile with a full fetch
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
}
}
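The idea behind getAndUpdateDelta is: apply the changes, summarize the local state as a hash, and compare it with the hash the server sent; a mismatch means the local copy has drifted and a full fetch is needed. The sketch below shows that idea under simplifying assumptions. The class `DeltaSketch`, the `Change` type, and the "STATUS_count_" hash format are illustrative choices here and not a statement of how Eureka encodes its hash.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of delta fetch plus hash reconciliation; not Eureka code.
final class DeltaSketch {
    enum Action { ADDED, MODIFIED, DELETED }

    // One change record: which instance changed, how, and its status afterwards.
    static final class Change {
        final Action action;
        final String instanceId;
        final String status;

        Change(Action action, String instanceId, String status) {
            this.action = action;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    // Applies the delta to the local cache (instanceId -> status).
    static void applyDelta(Map<String, String> local, List<Change> delta) {
        for (Change change : delta) {
            if (change.action == Action.DELETED) {
                local.remove(change.instanceId);
            } else {
                local.put(change.instanceId, change.status);
            }
        }
    }

    // Summarizes the cache as sorted "STATUS_count_" pairs (an assumed format).
    static String reconcileHash(Map<String, String> local) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : local.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder hash = new StringBuilder();
        counts.forEach((status, n) -> hash.append(status).append('_').append(n).append('_'));
        return hash.toString();
    }

    // Returns true if the delta was sufficient, false if a full fetch is required.
    static boolean update(Map<String, String> local, List<Change> delta, String serverHash) {
        applyDelta(local, delta);
        return reconcileHash(local).equals(serverHash);
    }

    public static void main(String[] args) {
        Map<String, String> local = new HashMap<>();
        local.put("a", "UP");
        local.put("b", "UP");
        List<Change> delta = List.of(
                new Change(Action.ADDED, "c", "UP"),
                new Change(Action.MODIFIED, "b", "DOWN"),
                new Change(Action.DELETED, "a", null));
        System.out.println("delta sufficient: " + update(local, delta, "DOWN_1_UP_1_"));
    }
}
```

The hash is cheap to compute and compare, which is why it works well as a consistency check after every incremental update.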
On the server, Jersey exposes the REST endpoint that returns all applications:
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
...
// build the cache key
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
// read the payload via responseCache.getGZIP(cacheKey)
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
}
The payload itself comes from responseCache.getGZIP(cacheKey), which reads it from the server's response cache.
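Behind responseCache sits a three-level cache: a read-only cache, a read-write cache, and the registry itself.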
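To make the three levels concrete, here is a minimal sketch of a layered read path: reads hit a read-only map first, fall back to a read-write map, and only then build the payload from the registry. The class `LayeredCacheSketch`, its method names, and the `Function` used as the registry loader are assumptions for illustration; Eureka's own response cache is more involved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model of a three-level response cache; not Eureka code.
final class LayeredCacheSketch {
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();   // level 1
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();  // level 2
    private final Function<String, String> registryLoader;                    // level 3: builds the payload

    LayeredCacheSketch(Function<String, String> registryLoader) {
        this.registryLoader = registryLoader;
    }

    // Read path: read-only first, then read-write, then the registry.
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = readWrite.computeIfAbsent(key, registryLoader);
            if (value != null) {
                readOnly.put(key, value);
            }
        }
        return value;
    }

    // Called when the registry changes: only the read-write level is cleared.
    void invalidate(String key) {
        readWrite.remove(key);
    }

    // Called periodically (for example every 30 s): copy fresh values into the read-only level.
    void syncReadOnly() {
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.computeIfAbsent(key, registryLoader);
            if (fresh != null) {
                readOnly.put(key, fresh);
            }
        }
    }

    public static void main(String[] args) {
        int[] version = {1};
        LayeredCacheSketch cache = new LayeredCacheSketch(key -> key + "-v" + version[0]);
        System.out.println(cache.get("ALL_APPS"));
        version[0] = 2;               // the registry changed
        cache.invalidate("ALL_APPS");
        System.out.println(cache.get("ALL_APPS"));   // still served from the read-only level
        cache.syncReadOnly();
        System.out.println(cache.get("ALL_APPS"));
    }
}
```

The trade-off is visible in the usage: reads stay cheap and lock-free, at the cost of clients seeing a registry change only after the next read-only sync.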
Next, look at the Eureka Server source itself. Most of the code lives in the com.netflix.eureka:eureka-core-xxx.jar artifact.
Start with the EurekaBootStrap class in this jar. It implements ServletContextListener, the Servlet API interface that observes the life cycle of the ServletContext object. When the servlet container starts or stops a web application it fires a ServletContextEvent, which is handled by the ServletContextListener. The interface defines two methods for handling ServletContextEvent: contextInitialized and contextDestroyed.
EurekaBootStrap implements both methods, so its code runs when the container initializes.
@Override
public void contextInitialized(ServletContextEvent event) {
try {
initEurekaEnvironment();
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
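contextInitialized initializes the Eureka environment and the Eureka server context through initEurekaEnvironment() and initEurekaServerContext(). Stepping into initEurekaServerContext() shows that it creates several objects, including PeerAwareInstanceRegistryImpl and PeerEurekaNodes.
The Eureka Server receives REST requests from Eureka Clients for service registration, renewal, cancellation, and so on. This code lives in the resources package of com.netflix.eureka:eureka-core-xx.jar.
The resources package contains the ApplicationResource class, whose addInstance method receives registration requests. The method looks like this:
com.netflix.eureka.resources.ApplicationResource#addInstance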
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
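The method takes an InstanceInfo info parameter, which describes the Eureka Client instance being registered. After a series of validations on the InstanceInfo, it calls registry.register(info, "true".equals(isReplication)) to perform the registration. Stepping into that method: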
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
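This method belongs to PeerAwareInstanceRegistryImpl, the class initialized in EurekaBootStrap. It reads the lease duration from the InstanceInfo, falling back to the default of 90 seconds, and then calls the parent class's register method. Once registration completes, it calls replicateToPeers to pass this instance's registration on to the other Eureka Server nodes.
First, the parent class's register method: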
/**
* Registers a new instance with a given duration.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo r, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(r.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(r.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(r.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = r.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is " +
"greater than the one that is being registered {}",
existingLastDirtyTimestamp,
registrationLastDirtyTimestamp);
r.setLastDirtyTimestamp(existingLastDirtyTimestamp);
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(r, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(r.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
r.getAppName() + "(" + r.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(r.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", r.getOverriddenStatus(), r.getId());
if (!overriddenInstanceStatusMap.containsKey(r.getId())) {
logger.info("Not found overridden id {} and hence adding it", r.getId());
overriddenInstanceStatusMap.put(r.getId(), r.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(r.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
r.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(r, existingLease, isReplication);
r.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(r.getStatus())) {
lease.serviceUp();
}
r.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
r.setLastUpdatedTimestamp();
invalidateCache(r.getAppName(), r.getVIPAddress(), r.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
r.getAppName(), r.getId(), r.getStatus(), isReplication);
} finally {
read.unlock();
}
}
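The method is long, and interested readers can go through it in detail, but its job is essentially to store the registration. The data is kept in a two-level map, ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>: the key of the outer map is the appName (the service name) and the key of the inner map is the instanceId (the instance name). After the map is updated, the related cache entries are invalidated as well (invalidateCache).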
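The two-level map and the putIfAbsent idiom used at the top of register() can be isolated in a small example. This is a minimal sketch in which the lease is reduced to its duration in seconds; the class `TwoLevelRegistrySketch` and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the appName -> instanceId -> lease structure; not Eureka code.
final class TwoLevelRegistrySketch {
    // outer key: application name, inner key: instance id, value: lease duration in seconds
    private final ConcurrentHashMap<String, Map<String, Integer>> registry = new ConcurrentHashMap<>();

    // Returns true for a new instance, false when an existing lease was replaced.
    boolean register(String appName, String instanceId, int leaseDurationSecs) {
        Map<String, Integer> instances = registry.get(appName);
        if (instances == null) {
            Map<String, Integer> created = new ConcurrentHashMap<>();
            // putIfAbsent returns the map another thread may have installed in the meantime
            instances = registry.putIfAbsent(appName, created);
            if (instances == null) {
                instances = created;
            }
        }
        return instances.put(instanceId, leaseDurationSecs) == null;
    }

    // Returns the lease duration, or null when the application or instance is unknown.
    Integer leaseOf(String appName, String instanceId) {
        Map<String, Integer> instances = registry.get(appName);
        return instances == null ? null : instances.get(instanceId);
    }

    int instanceCount(String appName) {
        Map<String, Integer> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }

    public static void main(String[] args) {
        TwoLevelRegistrySketch registry = new TwoLevelRegistrySketch();
        registry.register("ORDERS", "orders-1", 90);
        registry.register("ORDERS", "orders-2", 30);
        System.out.println("ORDERS instances: " + registry.instanceCount("ORDERS"));
    }
}
```

Grouping by application first keeps "all instances of service X" a single lookup, which is the query consumers issue most often.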
After registering, the method calls replicateToPeers to forward the registration to the other Eureka Servers so that they stay in sync. Inside that method:
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMe(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
The method loops over every PeerEurekaNode and calls replicateInstanceActionsToPeers to copy the information to the other Eureka Server nodes. Here is replicateInstanceActionsToPeers:
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
The method switches on the action; for Register it calls node.register(info):
/**
* Sends the registration information of {@link InstanceInfo} receiving by
* this node to the peer node represented by this class.
*
* @param info
* the instance information {@link InstanceInfo} of any instance
* that is send to this instance.
* @throws Exception
*/
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
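This method does not call the peer directly. It hands a task to batchingDispatcher, so replication to the other nodes is asynchronous rather than real-time.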
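Two points of the replication path are easy to miss: a request that is itself a replication is never forwarded again, and forwarding is queued rather than executed inline. Here is a minimal sketch of both, assuming a plain in-memory queue in place of the batching dispatcher. The `ReplicationSketch.Node` class and its `flush` method are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of peer replication; not Eureka code.
final class ReplicationSketch {

    static final class Node {
        final String name;
        final Set<String> instances = new HashSet<>();
        final List<Node> peers = new ArrayList<>();
        final Queue<Runnable> pending = new ArrayDeque<>(); // stands in for the batching dispatcher

        Node(String name) {
            this.name = name;
        }

        void register(String instanceId, boolean isReplication) {
            instances.add(instanceId);
            if (isReplication) {
                return; // replicated traffic is never forwarded again, which prevents loops
            }
            for (Node peer : peers) {
                pending.add(() -> peer.register(instanceId, true));
            }
        }

        // Runs the queued replication tasks; a background thread would do this in a real system.
        void flush() {
            Runnable task;
            while ((task = pending.poll()) != null) {
                task.run();
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node("a");
        Node b = new Node("b");
        a.peers.add(b);
        b.peers.add(a);
        a.register("orders-1", false);
        System.out.println("b before flush: " + b.instances);
        a.flush();
        System.out.println("b after flush: " + b.instances);
    }
}
```

Between `register` and `flush` the peers disagree about the registry, which is the window of inconsistency that asynchronous replication accepts.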
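2.2 Eureka Client source
Start with service registration: a Eureka Client registers with the Eureka Server when it starts. Adding the @EnableDiscoveryClient annotation to the application class declares the application as a Eureka Client, so look at this annotation first: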
/**
* Annotation to enable a DiscoveryClient implementation.
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
}
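The comment on the annotation says that it enables a DiscoveryClient implementation.
Searching for DiscoveryClient turns up both a class and an interface. The class hierarchy shows that the class implements the EurekaClient interface, which in turn extends the LookupService interface. Both interfaces come from the Netflix open source packages and mainly define the abstract methods for discovering services through Eureka. The DiscoveryClient class is therefore primarily about service discovery.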
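Now look at the DiscoveryClient class in detail. Its class comment explains that it is used to interact with the Eureka Server: it can register the service, renew the lease, cancel the lease, and fetch the service list. It needs to be configured with a list of Eureka Server URLs.
That list is what the eureka.client.service-url.defaultZone option in our configuration file sets. It is the address of the Eureka Server, and registration, renewal, and all other operations send their requests to it.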
DiscoveryClient has many methods, including register(), renew(), shutdown(), and unregister().
Since a Eureka Client has to initialize a DiscoveryClient instance first, look at the DiscoveryClient constructor.
The constructor is fairly long and initializes a large number of objects, but after creating them all it calls initScheduledTasks(), so step into that method next.
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
...
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
....
}
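initScheduledTasks initializes several tasks.
It begins with an if check on whether the registry should be fetched from the Eureka Server; if so, it sets up a scheduled task that refreshes the service list.
There is also a check if (clientConfig.shouldRegisterWithEureka()), so the logic inside it runs when the Eureka Client sets this option to true. That branch initializes a heartbeat timer and an InstanceInfoReplicator. The heartbeat timer keeps sending requests to maintain the heartbeat, which is the service renewal task. InstanceInfoReplicator implements the Runnable interface, so look at its run method: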
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
The run method calls the discoveryClient.register() method mentioned earlier to register the service.
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
register() sends the instanceInfo to the Eureka Server in a REST request. instanceInfo is the metadata of the client's service.
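InstanceInfoReplicator.run() only registers when the local instance information is marked dirty, and it clears the mark using the timestamp it read before sending. The sketch below models that dirty-flag handshake. The class `DirtyFlagSketch`, its fields, and the counter standing in for the REST call are assumptions made for illustration.

```java
// Hypothetical model of the dirty-flag check in the replicator; not Eureka code.
final class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;
    int registerCalls; // counts simulated REST registrations

    // Called whenever the local instance information changes.
    void markDirty(long nowMs) {
        dirty = true;
        lastDirtyTimestamp = nowMs;
    }

    // Returns the timestamp of the last change, or null when nothing needs to be sent.
    Long dirtyTimestamp() {
        return dirty ? lastDirtyTimestamp : null;
    }

    // Clears the flag only if nothing changed after the snapshot that was sent.
    void clearDirty(long sentTimestamp) {
        if (lastDirtyTimestamp <= sentTimestamp) {
            dirty = false;
        }
    }

    // One replication cycle: register only when the local information has changed.
    void runOnce() {
        Long timestamp = dirtyTimestamp();
        if (timestamp != null) {
            registerCalls++; // stands in for the REST call to the server
            clearDirty(timestamp);
        }
    }

    public static void main(String[] args) {
        DirtyFlagSketch replicator = new DirtyFlagSketch();
        replicator.markDirty(1L);
        replicator.runOnce();
        replicator.runOnce(); // nothing changed, so no second registration
        System.out.println("register calls: " + replicator.registerCalls);
    }
}
```

Comparing timestamps in `clearDirty` matters when a change arrives while a registration is in flight: the flag stays set and the newer state is sent on the next cycle.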
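So initScheduledTasks does three things: it registers the service with the Eureka Server and, when the conditions are met, creates two scheduled tasks, one for fetching the service list and one for renewing the lease.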
The Eureka Client configuration file also sets eureka.client.service-url.defaultZone, so search the DiscoveryClient class for the keyword serviceUrl. There is a corresponding method:
/**
* @deprecated use {@link #getServiceUrlsFromConfig(String, boolean)} instead.
*/
@Deprecated
public static List<String> getEurekaServiceUrlsFromConfig(String instanceZone, boolean preferSameZone) {
return EndpointUtils.getServiceUrlsFromConfig(staticClientConfig, instanceZone, preferSameZone);
}
The method ends up calling EndpointUtils.getServiceUrlsFromConfig. Stepping into it:
/**
* Get the list of all eureka service urls from properties file for the eureka client to talk to.
*
* @param clientConfig the clientConfig to use
* @param instanceZone The zone in which the client resides
* @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
* @return The list of all eureka service urls for the eureka client to talk to
*/
public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
List<String> orderedUrls = new ArrayList<String>();
String region = getRegion(clientConfig);
String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
if (availZones == null || availZones.length == 0) {
availZones = new String[1];
availZones[0] = DEFAULT_ZONE;
}
logger.debug("The availability zone for the given region {} are {}", region, Arrays.toString(availZones));
int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);
if (serviceUrls != null) {
orderedUrls.addAll(serviceUrls);
}
int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
while (currentOffset != myZoneOffset) {
serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]);
if (serviceUrls != null) {
orderedUrls.addAll(serviceUrls);
}
if (currentOffset == (availZones.length - 1)) {
currentOffset = 0;
} else {
currentOffset++;
}
}
if (orderedUrls.size() < 1) {
throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
}
return orderedUrls;
}
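The method first reads the Region configured for the project and then uses the region to get the list of available zones. This shows that a project can belong to only one region, while one region can be configured with several zones.
getZoneOffset then picks the index of one of those zones. The serviceUrls of that zone are loaded first, followed by those of the remaining zones in wrap-around order.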
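The ordering that getServiceUrlsFromConfig produces, own zone first and then the other zones in wrap-around order, can be reproduced with modular arithmetic. This is a minimal sketch of that ordering only: the class `ZoneOrderSketch`, the zone names, and the URLs are made up for illustration, and the choice of `myZoneOffset` (the job of getZoneOffset) is left to the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of zone-ordered service URL lookup; not Eureka code.
final class ZoneOrderSketch {

    // Returns the service URLs starting with the zone at myZoneOffset, then wrapping around.
    static List<String> orderedUrls(String[] zones, int myZoneOffset, Map<String, List<String>> urlsByZone) {
        List<String> ordered = new ArrayList<>();
        for (int i = 0; i < zones.length; i++) {
            String zone = zones[(myZoneOffset + i) % zones.length];
            List<String> urls = urlsByZone.get(zone);
            if (urls != null) {
                ordered.addAll(urls);
            }
        }
        if (ordered.isEmpty()) {
            throw new IllegalArgumentException("no service URL configured");
        }
        return ordered;
    }

    public static void main(String[] args) {
        String[] zones = {"zone-a", "zone-b", "zone-c"};
        Map<String, List<String>> urlsByZone = new HashMap<>();
        urlsByZone.put("zone-a", List.of("http://eureka-a:8761/eureka/"));
        urlsByZone.put("zone-b", List.of("http://eureka-b:8761/eureka/"));
        urlsByZone.put("zone-c", List.of("http://eureka-c:8761/eureka/"));
        // A client located in zone-b talks to its own zone first.
        System.out.println(orderedUrls(zones, 1, urlsByZone));
    }
}
```

Because the client tries the URLs in list order, putting its own zone first is what makes same-zone servers the preferred targets.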
2.3 Lease renewal
DiscoveryClient's renew method performs service renewal; by default it runs once every 30 seconds.
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
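The notable branch in renew() is the 404 case: if the server no longer knows the instance, for example because it was evicted, the client registers again instead of failing. Here is a minimal sketch of that flow against an in-memory stand-in for the server. `RenewSketch` and `FakeServer` are hypothetical, and the status codes follow the ones visible in the code above (200 for a successful heartbeat, 404 for an unknown instance, 204 for a successful registration).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of heartbeat with re-registration on 404; not Eureka code.
final class RenewSketch {

    // In-memory stand-in for the server's REST endpoints.
    static final class FakeServer {
        final Set<String> leases = new HashSet<>();

        int heartbeat(String instanceId) {
            return leases.contains(instanceId) ? 200 : 404;
        }

        int register(String instanceId) {
            leases.add(instanceId);
            return 204;
        }
    }

    // Sends one heartbeat; on 404 the client registers again.
    static boolean renew(FakeServer server, String instanceId) {
        int status = server.heartbeat(instanceId);
        if (status == 404) {
            return server.register(instanceId) == 204;
        }
        return status == 200;
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        System.out.println("first renew (unknown instance): " + renew(server, "orders-1"));
        server.leases.clear(); // simulate an eviction on the server
        System.out.println("renew after eviction: " + renew(server, "orders-1"));
    }
}
```

This is what lets an evicted but healthy instance reappear in the registry within one heartbeat interval.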
References: https://blog.csdn.net/u010883443/article/details/108470758
https://segmentfault.com/a/1190000011668299