
【SpringCloud】Spring Cloud Eureka Source Code Analysis

Contents

  • 1. Overview
    • 1.1 Eureka concepts
  • 2. Source code analysis
    • 2.1 Eureka Server source code
      • 2.1.1 The `@EnableEurekaServer` annotation
      • 2.1.2 EurekaServerInitializerConfiguration
        • 2.1.2.1 contextInitialized()
          • 2.1.2.1.1 initEurekaEnvironment
          • 2.1.2.1.2 initEurekaServerContext
            • 2.1.2.1.2.1 registry.syncUp()
            • 2.1.2.1.2.2 eurekaClient.getApplications()
            • 2.1.2.1.2.3 The fetchRegistry method
    • 2.2 Eureka Client source code
    • 2.3 Lease renewal

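1. Overview

To understand it, first learn to use it: see 【SpringCloud】Spring Cloud Eureka standalone and cluster setup.

Spring Cloud Eureka has a Server side and a Client side. The Server acts as the service registry for applications, and each Client registers its own service with the Server.

For a deeper source walkthrough, be sure to read the "Eureka source code analysis" tutorial series.

1.1 Eureka concepts

Service governance in Eureka involves the following concepts: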

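  1. Service registration: A Eureka Client registers its service with Eureka Server by sending a REST request that carries its own metadata, such as IP address, port, health-check URL, and home page URL. When Eureka Server receives the request, it stores this metadata in a two-level Map.
  2. Service renewal: After registering, the Eureka Client maintains a heartbeat that keeps telling Eureka Server the service is still available, so that it is not evicted. By default the client sends a heartbeat every 30 seconds to renew its lease.
  3. Service synchronization: Eureka Servers register with one another to form a cluster, and they replicate registrations among themselves to keep the service information consistent.
  4. Fetching services: When a service consumer (Eureka Client) starts, it sends a REST request to Eureka Server to fetch the list of registered services and caches it locally, by default for 30 seconds. For performance reasons, Eureka Server also maintains a read-only cache of the service list, which is refreshed every 30 seconds.
  5. Service invocation: Once the consumer has the service list, it can look up the addresses of other services in it and call them remotely. Eureka has the concepts of Region and Zone: a Region can contain several Zones, and a call prefers service providers in the same Zone.
  6. Service shutdown: When a Eureka Client needs to shut down or restart, it should not receive new requests during that window. It therefore sends a REST request to Eureka Server in advance to announce that it is going offline. On receiving the request, Eureka Server marks the service as offline (DOWN) and propagates the event.
  7. Service eviction: Sometimes an instance stops serving because of a network failure or a similar problem, without ever sending a shutdown request to Eureka Server, so an eviction mechanism is also needed. At startup Eureka Server creates a scheduled task that runs at a fixed interval (60 seconds by default) and removes from the current service list every instance whose lease has not been renewed in time (90 seconds by default).
  8. Self-preservation: Because Eureka Server periodically evicts instances that have not renewed, a network outage could prevent every service from renewing for a while, and Eureka Server would then evict all of them, which is clearly unreasonable. This is what self-preservation is for: Eureka Server tracks the proportion of failed renewals over a short window, and once it reaches a threshold, self-preservation is triggered. In this mode Eureka Server evicts no instances at all, and it leaves the mode once renewals are back to normal.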

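These concepts outline the overall flow. A Eureka Client registers with Eureka Server and keeps renewing through heartbeats; if it fails to renew for too long, it is evicted. Eureka Servers replicate data among themselves to form a cluster. A Eureka Client fetches the service list from Eureka Server to make service calls, and before restarting it calls the Eureka Server API to take itself offline.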
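The renew-or-be-evicted cycle described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: `LeaseTableSketch` and its methods are hypothetical names, not Eureka classes, and time is passed in explicitly so the behavior is deterministic.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory lease table; not one of Eureka's actual classes. */
class LeaseTableSketch {
    // Default lease length mentioned in the text (90 seconds).
    static final long LEASE_DURATION_MS = 90_000;

    // instanceId -> timestamp of the last registration or heartbeat
    private final Map<String, Long> lastRenewal = new ConcurrentHashMap<>();

    void register(String instanceId, long nowMs) {
        lastRenewal.put(instanceId, nowMs);
    }

    /** A heartbeat only counts if the instance is still registered. */
    boolean renew(String instanceId, long nowMs) {
        return lastRenewal.computeIfPresent(instanceId, (id, old) -> nowMs) != null;
    }

    /** Explicit shutdown: the client asks to be removed. */
    void cancel(String instanceId) {
        lastRenewal.remove(instanceId);
    }

    /** Periodic eviction: drops every expired lease and returns how many were dropped. */
    int evict(long nowMs) {
        int evicted = 0;
        Iterator<Map.Entry<String, Long>> it = lastRenewal.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > LEASE_DURATION_MS) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    boolean isRegistered(String instanceId) {
        return lastRenewal.containsKey(instanceId);
    }
}
```

An instance that keeps calling `renew` survives every `evict` pass, while one that stays silent for more than the lease duration is removed on the next pass.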

2. Source code analysis

2.1 Eureka Server source code

On the server side, we add the annotations to the startup class:

@SpringBootApplication
@EnableEurekaServer
@EnableDiscoveryClient
public class EurekaServerNodeApplication {

	public static void main(String[] args) {
		System.out.println("xx");
		SpringApplication.run(EurekaServerNodeApplication.class, args);
	}

}
           

The annotation itself is defined as follows:

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.Import;

/**
 * Annotation to activate Eureka Server related configuration {@link EurekaServerAutoConfiguration}
 *
 * @author Dave Syer
 * @author Biju Kunjummen
 *
 */

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

           

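2.1.1 The `@EnableEurekaServer` annotation

The annotation imports EurekaServerMarkerConfiguration, which only registers a Marker bean: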

@Configuration
public class EurekaServerMarkerConfiguration {

	@Bean
	public Marker eurekaServerMarkerBean() {
		return new Marker();
	}

	class Marker {
	}
}

           

Spring Boot auto-configuration then loads the spring.factories file in spring-cloud-netflix-eureka-server.jar, whose content is:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

@Configuration
@Import({EurekaServerInitializerConfiguration.class})
@ConditionalOnBean({Marker.class})
@EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})
@PropertySource({"classpath:/eureka/server.properties"})
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    // @Bean methods here register a set of beans in the Spring container
}


           

2.1.2 EurekaServerInitializerConfiguration

EurekaServerInitializerConfiguration.start()

@Override
	public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//TODO: is this class even needed now?
					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");

					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
					EurekaServerInitializerConfiguration.this.running = true;
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context", ex);
				}
			}
		}).start();
	}
           

2.1.2.1 contextInitialized()

// Initialization
public void contextInitialized(ServletContext context) {
    try {
        // Initialize the runtime environment
        this.initEurekaEnvironment();
        // Initialize the Eureka server context
        this.initEurekaServerContext();
        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    } catch (Throwable var3) {
        log.error("Cannot bootstrap eureka server :", var3);
        throw new RuntimeException("Cannot bootstrap eureka server :", var3);
    }
}

           
2.1.2.1.1 initEurekaEnvironment
protected void initEurekaEnvironment() throws Exception {
		log.info("Setting the eureka configuration..");

		String dataCenter = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_DATACENTER);
		if (dataCenter == null) {
			log.info(
					"Eureka data center value eureka.datacenter is not set, defaulting to default");
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
		}
		String environment = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_ENVIRONMENT);
		if (environment == null) {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
			log.info(
					"Eureka environment value eureka.environment is not set, defaulting to test");
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
		}
	}
           
2.1.2.1.2 initEurekaServerContext
protected void initEurekaServerContext() throws Exception {
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    if (this.isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }

    EurekaServerContextHolder.initialize(this.serverContext);
    log.info("Initialized server context");
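    // Sync registry data from the other Eureka nodes
    int registryCount = this.registry.syncUp();
    // Open for traffic: sets the renewal threshold and starts the eviction task for expired leases
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);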
    EurekaMonitors.registerAllStats();
}

           

2.1.2.1.2.1 registry.syncUp()

public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;
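    // Retries up to 5 times by default. Data is only pulled from other nodes
    // when this server registers with Eureka and fetches the registry, i.e.
    // registerWithEureka and fetchRegistry must both be set to true.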
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Fetch the registry
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        // Register the peer's instance in the local registry
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
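The retry loop in syncUp() is easier to see with the registry details stripped away. The following is a minimal sketch, not the real implementation: `SyncUpSketch`, the `peerFetch` supplier (standing in for `eurekaClient.getApplications()`), and the use of plain strings as instances are all assumptions made for illustration.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-alone version of the syncUp() retry loop. */
class SyncUpSketch {
    /**
     * Pulls instances from a peer until at least one has been copied
     * or the retries run out, sleeping between attempts.
     */
    static int syncUp(Supplier<List<String>> peerFetch, List<String> localRegistry,
                      int maxRetries, long retryWaitMs) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(retryWaitMs); // wait before asking the peer again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            for (String instance : peerFetch.get()) {
                localRegistry.add(instance); // copy the peer's instance locally
                count++;
            }
        }
        return count;
    }
}
```

The loop stops as soon as one attempt copies anything, so a server that starts alongside an empty peer pays the full retry delay, while one that joins a populated cluster returns after the first fetch.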

           

2.1.2.1.2.2 eurekaClient.getApplications()

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    ...
    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        // Scheduler for the periodic tasks:
        // refreshing the registry cache
        // and renewing the lease (heartbeat)
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());
        // Lease renewal (heartbeat) executor
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff
        // Registry cache refresh executor
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff
		...
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }
    // Fetch the registry
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }
	...
    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    // Initialize the scheduled tasks
    initScheduledTasks();
    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }
    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);
    initTimestampMs = System.currentTimeMillis();
}
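The constructor above sets up one scheduler with two threads, one for heartbeats and one for cache refresh. A simplified sketch of that arrangement using only the JDK is shown below. `ClientTimersSketch` is a hypothetical name, and the fixed-delay scheduling is an assumption chosen for brevity; it is not a copy of how DiscoveryClient schedules its tasks.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical two-task scheduler: one slot for heartbeat, one for cache refresh. */
class ClientTimersSketch {
    static ScheduledExecutorService start(Runnable heartbeat, Runnable cacheRefresh, long periodMs) {
        // Two daemon threads, so the timers never keep the JVM alive on their own.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2, r -> {
            Thread t = new Thread(r, "client-timer");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleWithFixedDelay(heartbeat, periodMs, periodMs, TimeUnit.MILLISECONDS);
        scheduler.scheduleWithFixedDelay(cacheRefresh, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

With the defaults described in section 1.1, both tasks would run with a period of 30 seconds; the caller is responsible for calling `shutdownNow()` on the returned scheduler.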

           

2.1.2.1.2.3 The fetchRegistry method

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();
        if (clientConfig.shouldDisableDelta() // delta fetch is disabled
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null) // no local registry yet
                || (applications.getRegisteredApplications().size() == 0) // local registry is empty
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            // Full fetch
            getAndStoreFullRegistry();
        } else {
            // Delta fetch
            getAndUpdateDelta(applications);
        }
        // Update the hash code
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}

           

getAndStoreFullRegistry: full fetch (over HTTP)

private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}
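The `fetchRegistryGeneration.compareAndSet(...)` check above keeps a slow fetch from overwriting a newer result. Here is a minimal sketch of that guard in isolation; `GenerationGuardSketch` and its method names are hypothetical, and the registry is reduced to a single string.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical generation guard: a stale fetch result is dropped instead of stored. */
class GenerationGuardSketch {
    private final AtomicLong generation = new AtomicLong();
    private final AtomicReference<String> snapshot = new AtomicReference<>("");

    /** Read the generation before the (slow) remote fetch starts. */
    long begin() {
        return generation.get();
    }

    /** Stores the result only if no other fetch completed since begin(). */
    boolean store(long seenGeneration, String fetched) {
        if (generation.compareAndSet(seenGeneration, seenGeneration + 1)) {
            snapshot.set(fetched);
            return true;
        }
        return false; // another fetch won the race; this result is stale
    }

    String current() {
        return snapshot.get();
    }
}
```

If two fetches start from the same generation, only the first one to finish is stored; the second sees a changed counter and is discarded, which matches the "another thread is updating it already" branch in the listing.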

           

getAndUpdateDelta: delta fetch

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications delta = null;
    // Fetch the delta
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    // No delta available: fall back to a full fetch
    if (delta == null) {
        getAndStoreFullRegistry();
    }
    // CAS on the fetch generation counter
    else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        String reconcileHashCode = "";
        // Acquire the lock
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // Apply the delta to the local registry
                updateDelta(delta);
                // Compute the hash code of the updated local registry
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                // Release the lock
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        // Compare with the server's hash code; on a mismatch, do a full refresh
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } 
}
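The delta path boils down to "apply the changes, then check a hash against the server's". The sketch below shows that idea with a deliberately simplified model: instances are a map from ID to status, a `null` status in the delta means deletion, and the hash is a string of per-status counts. `DeltaReconcileSketch` and these conventions are assumptions for illustration, not Eureka's data model.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical delta application with a status-count hash as the consistency check. */
class DeltaReconcileSketch {
    /** Builds a hash such as "DOWN_1_UP_2_" from instanceId -> status. */
    static String reconcileHash(Map<String, String> instances) {
        Map<String, Integer> countByStatus = new TreeMap<>(); // sorted for a stable result
        for (String status : instances.values()) {
            countByStatus.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        countByStatus.forEach((status, n) -> sb.append(status).append('_').append(n).append('_'));
        return sb.toString();
    }

    /**
     * Applies the delta to the local copy. If the resulting hash differs from the
     * server's, the local copy is replaced by the full registry.
     * Returns true when the delta alone was sufficient.
     */
    static boolean applyDelta(Map<String, String> local, Map<String, String> delta,
                              String serverHash, Map<String, String> fullRegistry) {
        delta.forEach((id, status) -> {
            if (status == null) {
                local.remove(id);      // instance was deleted on the server
            } else {
                local.put(id, status); // instance was added or modified
            }
        });
        if (reconcileHash(local).equals(serverHash)) {
            return true;
        }
        local.clear();
        local.putAll(fullRegistry);    // hashes disagree: fall back to the full registry
        return false;
    }
}
```

A count-based hash is cheap to compute and compare, at the cost of missing changes that leave the counts unchanged.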

           

On the server, a Jersey resource serves the request for all applications:

@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {

    ...
    // Build the cache key
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );
    Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        // Read the payload via responseCache.getGZIP(cacheKey)
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        response = Response.ok(responseCache.get(cacheKey))
                .build();
    }
    return response;
}
           

responseCache.getGZIP(cacheKey)


           

Three-level cache definition
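As a rough illustration of the three levels (a read-only cache, a read-write cache, and the registry itself), here is a minimal sketch. It is not the server's cache implementation: `TieredCacheSketch`, its method names, and the assumption that the loader never returns null are all made up for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical three-level lookup: read-only cache -> read-write cache -> registry. */
class TieredCacheSketch {
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    // Level 3: builds the payload from the registry; assumed never to return null.
    private final Function<String, String> registryLoader;

    TieredCacheSketch(Function<String, String> registryLoader) {
        this.registryLoader = registryLoader;
    }

    /** Reads fall through the levels and fill the upper ones on the way back. */
    String get(String key) {
        return readOnly.computeIfAbsent(key,
                k -> readWrite.computeIfAbsent(k, registryLoader));
    }

    /** Called when the registry changes: only the read-write level is invalidated. */
    void invalidate(String key) {
        readWrite.remove(key);
    }

    /** Periodic task (every 30 seconds per section 1.1): refresh the read-only level. */
    void syncReadOnly() {
        for (String key : readOnly.keySet()) {
            readOnly.put(key, readWrite.computeIfAbsent(key, registryLoader));
        }
    }
}
```

Because a registry change only invalidates the read-write level, clients can keep reading a stale entry from the read-only level until the next periodic sync.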

           

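Next, let's look at the Eureka Server side of the source code, most of which lives in the com.netflix.eureka:eureka-core-xxx.jar artifact.

Start with the EurekaBootStrap class in this artifact, which implements the ServletContextListener interface. In the Servlet API, a ServletContextListener observes the lifecycle of the ServletContext object: when the servlet container starts or stops a web application, it fires a ServletContextEvent, which the ServletContextListener handles. The interface defines two methods for handling a ServletContextEvent: contextInitialized and contextDestroyed.

EurekaBootStrap implements both methods, so its code runs when the container initializes.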

@Override
    public void contextInitialized(ServletContextEvent event) {
        try {
            initEurekaEnvironment();
            initEurekaServerContext();

            ServletContext sc = event.getServletContext();
            sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
        } catch (Throwable e) {
            logger.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }
           

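contextInitialized initializes the Eureka environment (initEurekaEnvironment) and the Eureka server context (initEurekaServerContext). Stepping into initEurekaServerContext, you can see that it creates several objects, including PeerAwareInstanceRegistryImpl and PeerEurekaNodes.

Eureka Server receives REST requests from Eureka Clients to register, renew, and cancel services. This code lives in the resources package of com.netflix.eureka:eureka-core-xx.jar.

In that package, the ApplicationResource class has an addInstance method, which handles registration requests: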

com.netflix.eureka.resources.ApplicationResource#addInstance

@POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
           

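The method takes an InstanceInfo info parameter, which describes the Eureka Client instance to register. After a series of validations on the InstanceInfo, it calls registry.register(info, "true".equals(isReplication)) to perform the registration. Stepping into that method: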

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
           

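This method belongs to the PeerAwareInstanceRegistryImpl class initialized in EurekaBootStrap. It reads the lease duration from the InstanceInfo, which defaults to 90 seconds, and then calls the parent class's register method. Once registration completes, it calls replicateToPeers to pass the registration on to the other Eureka Server nodes.

First, the parent class's register method: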

/**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo r, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(r.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
                        new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(r.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(r.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = r.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is " +
                                    "greater than the one that is being registered {}",
                            existingLastDirtyTimestamp,
                            registrationLastDirtyTimestamp);
                    r.setLastDirtyTimestamp(existingLastDirtyTimestamp);
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(r, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(r.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        r.getAppName() + "(" + r.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(r.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", r.getOverriddenStatus(), r.getId());
                if (!overriddenInstanceStatusMap.containsKey(r.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", r.getId());
                    overriddenInstanceStatusMap.put(r.getId(), r.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(r.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                r.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(r, existingLease, isReplication);
            r.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(r.getStatus())) {
                lease.serviceUp();
            }
            r.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            r.setLastUpdatedTimestamp();
            invalidateCache(r.getAppName(), r.getVIPAddress(), r.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    r.getAppName(), r.getId(), r.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
           

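The method is long, and interested readers can go through it in detail, but in essence it stores the registration. The registration is kept in a two-level map, ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>: the outer key is appName (the service name) and the inner key is instanceId (the instance ID). After updating the map, it also invalidates the related cache entries (invalidateCache).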
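To make the two-level map and the renewal-threshold arithmetic from the listing concrete, here is a reduced sketch. `TwoLevelRegistrySketch` is a hypothetical class, instance data is a plain string, the 0.85 factor is an assumed value for `getRenewalPercentThreshold()`, and the `expectedNumberOfRenewsPerMin > 0` precondition from the real method is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical reduced registry: appName -> (instanceId -> instance info). */
class TwoLevelRegistrySketch {
    static final double RENEWAL_PERCENT_THRESHOLD = 0.85; // assumed value, see lead-in

    private final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();
    private int expectedRenewsPerMin;

    /** Stores the instance; returns true when it was not registered before. */
    synchronized boolean register(String appName, String instanceId, String info) {
        Map<String, String> instances =
                registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>());
        boolean isNew = instances.put(instanceId, info) == null;
        if (isNew) {
            // One heartbeat every 30 seconds means two expected renewals per minute.
            expectedRenewsPerMin += 2;
        }
        return isNew;
    }

    /** Renewals per minute below which self-preservation would kick in. */
    synchronized int renewsPerMinThreshold() {
        return (int) (expectedRenewsPerMin * RENEWAL_PERCENT_THRESHOLD);
    }

    int instanceCount(String appName) {
        return registry.getOrDefault(appName, Map.of()).size();
    }
}
```

With three registered instances the server expects 6 renewals per minute, so under the assumed factor the threshold is `(int) (6 * 0.85)`, that is 5.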

After registering, replicateToPeers forwards the registration to the other Eureka Servers so that they stay in sync. Here is that method:

/**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMe(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
           

The method loops over every PeerEurekaNode and calls replicateInstanceActionsToPeers to copy the information to each of the other Eureka Server nodes. Here is replicateInstanceActionsToPeers:

/**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }
           

The method switches on the concrete action. If the action is Register, it calls node.register(info):

/**
     * Sends the registration information of {@link InstanceInfo} receiving by
     * this node to the peer node represented by this class.
     *
     * @param info
     *            the instance information {@link InstanceInfo} of any instance
     *            that is send to this instance.
     * @throws Exception
     */
    public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }
           

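This method does not call the peer directly. It hands a task to batchingDispatcher together with an expiry time, so synchronization to the other nodes is asynchronous rather than real-time.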
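To make the "queue now, send later" idea concrete, here is a minimal sketch. It assumes a hypothetical ExpiringTaskQueue class and is not Eureka's actual dispatcher, which also handles batching limits, retries and congestion. It only shows that process() returns immediately and that a background drain skips tasks whose expiry time has passed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a batching dispatcher; not a class from Eureka. */
class ExpiringTaskQueue {
    /** A queued task plus the time after which it is no longer worth sending. */
    static final class Entry {
        final String id;
        final Runnable task;
        final long expiryTime;

        Entry(String id, Runnable task, long expiryTime) {
            this.id = id;
            this.task = task;
            this.expiryTime = expiryTime;
        }
    }

    private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();

    /** Called on the request thread: only enqueues, never runs the task. */
    void process(String id, Runnable task, long expiryTime) {
        queue.add(new Entry(id, task, expiryTime));
    }

    /** Called by a background worker: runs every queued task that has not expired yet. */
    List<String> drain(long now) {
        List<Entry> batch = new ArrayList<>();
        queue.drainTo(batch);
        List<String> executed = new ArrayList<>();
        for (Entry e : batch) {
            if (e.expiryTime > now) {
                e.task.run();
                executed.add(e.id);
            }
        }
        return executed;
    }
}
```

The expiry time plays the same role as expiryTime in register() above: a replication task that waits longer than the lease renewal interval is stale and can be dropped.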

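2.2 Eureka Client source code

Start with service registration: when a Eureka Client starts, it registers its service with the Eureka Server. Adding the `@EnableDiscoveryClient` annotation to the startup class declares the application as a Eureka Client, so look at this annotation first: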

/**
 * Annotation to enable a DiscoveryClient implementation.
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

}

           

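The Javadoc on the annotation says that it enables a DiscoveryClient implementation.

Searching for `DiscoveryClient` next turns up both a class and an interface. Here is the class diagram for the class: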

[Figure: class hierarchy diagram of DiscoveryClient]

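This class implements the EurekaClient interface, which in turn extends the `LookupService` interface. Both interfaces come from the Netflix open source packages and define the abstract methods for Eureka service discovery, so the main job of the DiscoveryClient class is discovering services.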

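Now look at the DiscoveryClient class in detail. Its class-level Javadoc explains that the class is the helper for interacting with the Eureka Server: it can register the service, renew the lease, take the service offline and fetch the service list. It needs a configured list of Eureka Server URLs.

That list is the `eureka.client.service-url.defaultZone` option we set in the configuration file. The address is the address of the Eureka Server, and registration, renewal and all the other operations send their requests to it.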

The DiscoveryClient class has many methods, including `register()`, `renew()`, `shutdown()` and `unregister()`.

Since a Eureka Client has to initialize a DiscoveryClient instance first, look at the DiscoveryClient constructor.

The constructor is fairly long and initializes a large number of objects. After creating all of them it calls `initScheduledTasks();`, so step into `initScheduledTasks()` next.

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    // ... (earlier initialization omitted)
    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    initScheduledTasks();
    // ... (rest of the constructor omitted)
}
           

Its content is as follows:

[Figure: screenshot of the initScheduledTasks() method]

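The initScheduledTasks method sets up several tasks.

It starts with an if check, `clientConfig.shouldFetchRegistry()`, that decides whether data should be fetched from the Eureka Server. If it is true, a scheduled task for fetching the registry is created.

There is also an `if (clientConfig.shouldRegisterWithEureka())` check, so the logic inside it only runs when the Eureka Client has this option set to `true`. Inside this if block, a heartbeat timer and an `InstanceInfoReplicator` are initialized. The heartbeat timer keeps sending requests to maintain the heartbeat, which is the service renewal task. The `InstanceInfoReplicator` class implements the Runnable interface, so the method to look at is its run method: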

public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
           

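The run method refreshes the instance info and, if the info is marked dirty, calls the `discoveryClient.register()` method we saw earlier to register the service. In the finally block it schedules itself again after replicationIntervalSeconds.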

/**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }
           

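The `register()` method sends the `instanceInfo` to the Eureka Server through a REST request and returns true when the server answers with status 204. `instanceInfo` is the metadata of the client-side service.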
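The isDirtyWithTime() / unsetIsDirty(timestamp) pair in run() above is worth a closer look: the timestamp guards against losing a change that happens while a registration is in flight. The following is a minimal sketch of that handshake, assuming a hypothetical DirtyFlag class. It is not the real InstanceInfo, which carries far more state.

```java
/** Hypothetical holder that mimics the dirty-flag handshake; not the real InstanceInfo. */
class DirtyFlag {
    private boolean dirty;
    private long lastDirtyTimestamp;

    /** Records that the instance info changed at the given time. */
    synchronized void setDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    /** Returns the time of the last change if it has not been sent yet, otherwise null. */
    synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Clears the flag only if nothing changed after the snapshot that was just sent. */
    synchronized void unsetIsDirty(long sentTimestamp) {
        if (lastDirtyTimestamp <= sentTimestamp) {
            dirty = false;
        }
    }
}
```

If the info changes again between reading the timestamp and finishing the registration, the flag stays set and the next run sends the newer state.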

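So initScheduledTasks does three things: it registers the service with the Eureka Server and, when the corresponding conditions are met, creates the two scheduled tasks for fetching the registry and renewing the lease.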
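The conditional structure described above can be sketched with a plain ScheduledExecutorService. This is only an illustration: the ClientTasks class, its parameters and the 30-second intervals are assumptions for the example, and the real method wraps its tasks in Eureka's own supervisor classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of conditionally started client tasks; not Eureka's code. */
class ClientTasks {
    static List<ScheduledFuture<?>> init(ScheduledExecutorService scheduler,
                                         boolean fetchRegistry,
                                         boolean registerWithEureka,
                                         Runnable cacheRefresh,
                                         Runnable heartbeat,
                                         Runnable replicator) {
        List<ScheduledFuture<?>> started = new ArrayList<>();
        if (fetchRegistry) {
            // Periodically refresh the local copy of the registry (interval is illustrative).
            started.add(scheduler.scheduleWithFixedDelay(cacheRefresh, 30, 30, TimeUnit.SECONDS));
        }
        if (registerWithEureka) {
            // Periodic heartbeat, i.e. lease renewal (interval is illustrative).
            started.add(scheduler.scheduleWithFixedDelay(heartbeat, 30, 30, TimeUnit.SECONDS));
            // The replicator is scheduled once here; in Eureka its run() reschedules itself.
            started.add(scheduler.schedule(replicator, 30, TimeUnit.SECONDS));
        }
        return started;
    }
}
```

A client that only consumes services would pass registerWithEureka = false and end up with just the registry refresh task.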

The Eureka Client configuration file also contains the `eureka.client.service-url.defaultZone` address, so search the DiscoveryClient class for the keyword serviceUrl. There is a corresponding method:

/**
 * @deprecated use {@link #getServiceUrlsFromConfig(String, boolean)} instead.
 */
@Deprecated
public static List<String> getEurekaServiceUrlsFromConfig(String instanceZone, boolean preferSameZone) {
    return EndpointUtils.getServiceUrlsFromConfig(staticClientConfig, instanceZone, preferSameZone);
}
           

The method ends up calling EndpointUtils.getServiceUrlsFromConfig, so step into that method:

/**
  * Get the list of all eureka service urls from properties file for the eureka client to talk to.
  *
  * @param clientConfig the clientConfig to use
  * @param instanceZone The zone in which the client resides
  * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
  * @return The list of all eureka service urls for the eureka client to talk to
  */
 public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
     List<String> orderedUrls = new ArrayList<String>();
     String region = getRegion(clientConfig);
     String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
     if (availZones == null || availZones.length == 0) {
         availZones = new String[1];
         availZones[0] = DEFAULT_ZONE;
     }
     logger.debug("The availability zone for the given region {} are {}", region, Arrays.toString(availZones));
     int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);

     List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);
     if (serviceUrls != null) {
         orderedUrls.addAll(serviceUrls);
     }
     int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
     while (currentOffset != myZoneOffset) {
         serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]);
         if (serviceUrls != null) {
             orderedUrls.addAll(serviceUrls);
         }
         if (currentOffset == (availZones.length - 1)) {
             currentOffset = 0;
         } else {
             currentOffset++;
         }
     }

     if (orderedUrls.size() < 1) {
         throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
     }
     return orderedUrls;
}
           

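The method first reads the `Region` configured for the project and then gets the list of available zones for that region. This shows that a project can belong to only one `region`, while one region can be configured with multiple zones.

The getZoneOffset method then picks the index of one zone out of that list, and the serviceUrls of that zone are loaded first. The URLs of the remaining zones are appended after it in order, wrapping around to the start of the list.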
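The wrap-around ordering of zones can be expressed more compactly with a modulo. The sketch below assumes a hypothetical ZoneOrder helper and only reproduces the visiting order of the zones, not the lookup of the URLs for each zone.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that reproduces the zone visiting order; not part of Eureka. */
class ZoneOrder {
    /** Returns the zones starting at myZoneOffset and wrapping around, each exactly once. */
    static List<String> rotate(String[] availZones, int myZoneOffset) {
        List<String> ordered = new ArrayList<>();
        for (int i = 0; i < availZones.length; i++) {
            ordered.add(availZones[(myZoneOffset + i) % availZones.length]);
        }
        return ordered;
    }
}
```

For three zones and an offset of 1, the zone at index 1 comes first, then index 2, then index 0, which is the same order the while loop above produces.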

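2.3 Lease renewal

The renew method of DiscoveryClient performs the service renewal. By default it runs once every 30 seconds. If the server answers the heartbeat with 404 (the instance is unknown to it), the client registers again: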

/**
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }
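The control flow of renew() can be isolated in a few lines. This sketch assumes a hypothetical Registry interface that returns bare status codes. The real client goes through eurekaTransport.registrationClient and also maintains the dirty timestamp, which is left out here.

```java
/** Hypothetical sketch of the heartbeat / re-register decision; not Eureka's code. */
class RenewSketch {
    /** Minimal assumed view of the server: each call returns an HTTP-like status code. */
    interface Registry {
        int sendHeartBeat(String instanceId);

        int register(String instanceId);
    }

    /** Returns true if the lease is valid after this call. */
    static boolean renew(Registry registry, String instanceId) {
        int status = registry.sendHeartBeat(instanceId);
        if (status == 404) {
            // The server does not know this instance (for example after eviction): register again.
            return registry.register(instanceId) == 204;
        }
        return status == 200;
    }
}
```

This is why an instance that was evicted during a network outage reappears in the registry on its next successful heartbeat attempt.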

           

References: https://blog.csdn.net/u010883443/article/details/108470758

https://segmentfault.com/a/1190000011668299