天天看點

Eureka源碼解析(2.2.0.RELEASE)

(基于eureka-2.2.0.RELEASE)

Eureka的一些概念

  • Register:服務注冊 當Eureka用戶端向Eureka Server注冊時,它提供自身的中繼資料,比如IP位址、端口,運作狀況訓示符URL,首頁等。
  • Renew:服務續約 Eureka客戶會每隔30秒發送一次心跳來續約。 通過續約來告知Eureka Server該Eureka客戶仍然存在,沒有出現問題。正常情況下,如果Eureka Server在90秒沒有收到Eureka客戶的續約,它會将執行個體從其系統資料庫中删除。 建議不要更改續約間隔.
  • Fetch Registries:擷取注冊清單資訊 Eureka用戶端從伺服器擷取系統資料庫資訊,并将其緩存在本地。用戶端會使用該資訊查找其他服務,進而進行遠端調用。該注冊清單資訊定期(每30秒鐘)更新一次。每次傳回注冊清單資訊可能與Eureka用戶端的緩存資訊不同, Eureka用戶端自動處理。如果由于某種原因導緻注冊清單資訊不能及時比對,Eureka用戶端則會重新擷取整個系統資料庫資訊。 Eureka伺服器緩存注冊清單資訊,整個系統資料庫以及每個應用程式的資訊進行了壓縮,壓縮内容和沒有壓縮的内容完全相同。Eureka用戶端和Eureka 伺服器可以使用JSON / XML格式進行通訊。在預設的情況下Eureka用戶端使用壓縮JSON格式來擷取注冊清單的資訊。
  • Cancel:服務下線 Eureka用戶端在程式關閉時向Eureka伺服器發送取消請求。 發送請求後,該用戶端執行個體資訊将從伺服器的執行個體系統資料庫中删除。該下線請求不會自動完成,它需要調用以下内容: DiscoveryManager.getInstance().shutdownComponent();
  • Eviction 服務剔除 在預設的情況下,當Eureka用戶端連續90秒沒有向Eureka伺服器發送服務續約,即心跳,Eureka伺服器會将該服務執行個體從服務注冊清單删除,即服務剔除。

Eureka的高可用架構

如圖為Eureka的進階架構圖,該圖檔來自于Eureka開源代碼的文檔,位址為https://github.com/Netflix/eureka/wiki/Eureka-at-a-glance 。

Eureka源碼解析(2.2.0.RELEASE)

從圖可以看出在這個體系中,有2個角色,即Eureka Server和Eureka Client。而Eureka Client又分為Applicaton Service和Application Client,即服務提供者何服務消費者。 每個區域有一個Eureka叢集,并且每個區域至少有一個eureka伺服器可以處理區域故障,以防伺服器癱瘓。

Eureka Client向Eureka Serve注冊,并将自己的一些用戶端資訊發送Eureka Serve。然後,Eureka Client通過向Eureka Server發送心跳(每30秒)來續約服務的。 如果用戶端持續不能續約,那麼,它将在大約90秒内從伺服器系統資料庫中删除。 注冊資訊和續訂被複制到叢集中的Eureka Serve所有節點。 來自任何區域的Eureka Client都可以查找系統資料庫資訊(每30秒發生一次)。根據這些系統資料庫資訊,Application Client可以遠端調用Applicaton Service來消費服務。

Register服務注冊

服務注冊,即Eureka Client向Eureka Server送出自己的服務資訊,包括IP位址、端口、service ID等資訊。如果Eureka Client沒有寫service ID,則預設為${spring.application.name}。

服務注冊其實很簡單,在Eureka Client啟動的時候,将自身的服務的資訊發送到Eureka Server。現在來簡單的閱讀下源碼。在Maven的依賴包下,找到eureka-client-1.6.2.jar包。在com.netflix.discovery包下有個DiscoveryClient類,該類包含了Eureka Client和Eureka Server的相關方法。其中DiscoveryClient實作了EurekaClient接口,并且它是一個單例模式,而EurekaClient繼承了LookupService接口。

入口EurekaClientAutoConfiguration,自動配置DiscoveryClient執行個體

@Bean
	public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
		return new EurekaDiscoveryClient(config, client);
	}

  @Configuration
	@ConditionalOnMissingRefreshScope
	protected static class EurekaClientConfiguration {

    //CloudEurekaClient繼承了DiscoveryClient
		@Bean(destroyMethod = "shutdown")
		@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
		public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
			return new CloudEurekaClient(manager, config, this.optionalArgs,
					this.context);
		}
}
           

入口DiscoveryClient構造方法

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        // ......
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        // ......
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            // 心跳的線程池
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

						// 重新整理本地緩存的線程池
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

						// 
            eurekaTransport = new EurekaTransport();
            **scheduleServerEndpointTask**(eurekaTransport, args);

            // ......
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        // ......

        // 初始化定時排程任務
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();

        // .......
    }
           

初始化定時排程任務

private void initScheduledTasks() {
				// 是否應該向eureka server擷取注冊資訊
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // 定時重新整理本地注冊服務緩存的排程任務
						scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

				// 是否向eureka server注冊
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

						// 定時排程心跳任務,續約租期
            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

						// 執行個體資訊複制器,用于執行個體之間資訊複制
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
						// 執行個體資訊狀态監聽器
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
										// 
                    instanceInfoReplicator.**onDemandUpdate**();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
						// instanceInfoReplicator繼承了Runnable
						// start方法會使用scheduler延遲排程instanceInfoReplicator
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
           

InstanceInfoReplicator類

class InstanceInfoReplicator implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(InstanceInfoReplicator.class);

    private final DiscoveryClient discoveryClient;
    private final InstanceInfo instanceInfo;

    private final int replicationIntervalSeconds;
    private final ScheduledExecutorService scheduler;
    private final AtomicReference<Future> scheduledPeriodicRef;

    private final AtomicBoolean started;
    private final RateLimiter rateLimiter;
    private final int burstSize;
    private final int allowedRatePerMinute;

    InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
        this.discoveryClient = discoveryClient;
        this.instanceInfo = instanceInfo;
        this.scheduler = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                        .setDaemon(true)
                        .build());

        this.scheduledPeriodicRef = new AtomicReference<Future>();

        this.started = new AtomicBoolean(false);
        this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
        this.replicationIntervalSeconds = replicationIntervalSeconds;
        this.burstSize = burstSize;

        this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
        logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
    }

		// 
    public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

    public void stop() {
        shutdownAndAwaitTermination(scheduler);
        started.set(false);
    }

    // ......

		// 執行個體狀态變更時背景進行更新
    public boolean **onDemandUpdate**() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            if (!scheduler.isShutdown()) {
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        logger.debug("Executing on-demand update of local InstanceInfo");
    
                        Future latestPeriodic = scheduledPeriodicRef.get();
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
                            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                            latestPeriodic.cancel(false);
                        }
    
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            } else {
                logger.warn("Ignoring onDemand update due to stopped scheduler");
                return false;
            }
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

		// 線程的主方法,調用discoveryClient的register()發起注冊
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

						// 當本地中繼資料有變動時,會設定dirtyTimestamp,此時需要重新注冊
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
						// 重複延遲排程
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

}
           

DiscoveryClient類的服務注冊方法register(),其代碼如下:

@Singleton
public class DiscoveryClient implements EurekaClient {

		// ......

		// DiscoveryClient類的服務注冊方法
		boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
						// 通過Http請求向Eureka Server注冊
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

		// ......
}
           

registrationClient在DiscoveryClient的scheduleServerEndpointTask()方法中執行個體化

private void **scheduleServerEndpointTask**(EurekaTransport eurekaTransport,
                                            AbstractDiscoveryClientOptionalArgs args) {
		// ......
		if (clientConfig.shouldRegisterWithEureka()) {
            EurekaHttpClientFactory newRegistrationClientFactory = null;
            EurekaHttpClient newRegistrationClient = null;
            try {
                newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                        eurekaTransport.bootstrapResolver,
                        eurekaTransport.transportClientFactory,
                        transportConfig
                );
                newRegistrationClient = newRegistrationClientFactory.newClient();
            } catch (Exception e) {
                logger.warn("Transport initialization failure", e);
            }
            eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
            eurekaTransport.**registrationClient** = newRegistrationClient;
        }

		// ......
}
           

跟蹤代碼,發現registrationClient 為SessionedEurekaHttpClient的執行個體對象,SessionedEurekaHttpClient繼承了EurekaHttpClientDecorator,最終調用該類的register()發起注冊

public abstract class **EurekaHttpClientDecorator** implements EurekaHttpClient {

		@Override
    public EurekaHttpResponse<Void> **register**(final InstanceInfo info) {
        return execute(new RequestExecutor<Void>() {
            @Override
            public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
                return delegate.register(info);
            }

            @Override
            public RequestType getRequestType() {
                return RequestType.Register;
            }
        });
    }

}
           

繼續跟蹤,delegate為AbstractJerseyEurekaHttpClient

public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {

    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

}
           

Eureka server

然後在來看Eureka server端的代碼,在Maven的eureka-core:1.6.2的jar包下。打開com.netflix.eureka包,很輕松的就發現了又一個EurekaBootStrap的類,BootStrapContext具有最先初始化的權限,是以先看這個類。

public class EurekaBootStrap implements ServletContextListener {

		/**
     * Initializes Eureka, including syncing up with other Eureka peers and publishing the registry.
     *
     * @see
     * javax.servlet.ServletContextListener#contextInitialized(javax.servlet.ServletContextEvent)
     */
    @Override
    public void contextInitialized(ServletContextEvent event) {
        try {
            initEurekaEnvironment();
            initEurekaServerContext();

            ServletContext sc = event.getServletContext();
            sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
        } catch (Throwable e) {
            logger.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

		protected void initEurekaServerContext() throws Exception {
		 
		 ...//省略代碼
		   PeerAwareInstanceRegistry registry;
		        if (isAws(applicationInfoManager.getInfo())) {
		           ...//省略代碼,如果是AWS的代碼
		        } else {
		            registry = new PeerAwareInstanceRegistryImpl(
		                    eurekaServerConfig,
		                    eurekaClient.getEurekaClientConfig(),
		                    serverCodecs,
		                    eurekaClient
		            );
		        }
		
		        PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
		                registry,
		                eurekaServerConfig,
		                eurekaClient.getEurekaClientConfig(),
		                serverCodecs,
		                applicationInfoManager
		        );

					// Copy registry from neighboring eureka node
					// 從其他注冊節點同步注冊資訊
          int registryCount = registry.syncUp();
          registry.openForTraffic(applicationInfoManager, registryCount);
		 }

}
           

可見EurekaBootStrap 為一個ServletContextListener,web應用初始化時會調用其contextInitialized()方法。

其中PeerAwareInstanceRegistryImpl和PeerEurekaNodes兩個類看其命名,應該和服務注冊以及Eureka Server高可用有關。

PeerAwareInstanceRegistryImpl:繼承了AbstractInstanceRegistry

**PeerAwareInstanceRegistryImpl:**處理Server執行個體節點間的資訊同步,包括注冊、續期、下線、過期、狀态變更。

AbstractInstanceRegistry:處理來自client節點的所有注冊請求,包括注冊、續期、下線、過期、狀态變更

先追蹤PeerAwareInstanceRegistryImpl類,在該類有個register()方法,該方法提供了注冊,并且将注冊後資訊同步到其他的Eureka Server服務。代碼如下:

/**
 * Handles replication of all operations to AbstractInstanceRegistry to peer Eureka
 * nodes to keep them all in sync.
 * Primary operations that are replicated are the **Registers**,**Renewals**,**Cancels**
 * ,**Expirations** and **Status** Changes
 */
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

			public void register(final InstanceInfo info, final boolean isReplication) {
	        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
	        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
	            leaseDuration = info.getLeaseInfo().getDurationInSecs();
	        }
					// 父類處理注冊請求
	        super.register(info, leaseDuration, isReplication);
          // 注冊資訊複制到其他節點
	        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
	    }

			private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

						// 周遊循環向所有的Peers節點
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
								// Replicates all instance changes to peer eureka nodes except for replication traffic to this node.
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
}
           

其中 super.register(info, leaseDuration, isReplication)方法,點選進去到父類AbstractInstanceRegistry可以發現更多細節,其中注冊清單的資訊被儲存在一個Map中。

replicateToPeers()方法,即同步到其他Eureka Server的其他Peers節點,追蹤代碼,發現它會周遊循環向所有的Peers節點注冊,最終執行類PeerEurekaNodes的register()方法,該方法通過執行一個任務向其他節點同步該注冊資訊,代碼如下:

public class PeerEurekaNode {
			private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
			
			// 将此節點接收的InstanceInfo的注冊資訊發送到由該類表示的對等節點。
			public void register(final InstanceInfo info) throws Exception {
	        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
	        batchingDispatcher.process(
	                taskId("register", info),
	                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
	                    public EurekaHttpResponse<Void> execute() {
	                        return replicationClient.register(info);
	                    }
	                },
	                expiryTime
	        );
	    }

}
           

經過一系列的源碼追蹤,可以發現PeerAwareInstanceRegistryImpl的register()方法實作了服務的注冊,并且向其他Eureka Server的Peer節點同步了該注冊資訊,那麼register()方法被誰調用了呢?

之前在Eureka Client的分析可以知道,Eureka Client是通過 http來向Eureka Server注冊的,那麼Eureka Server肯定會提供一個注冊的接口給Eureka Client調用,那麼PeerAwareInstanceRegistryImpl的register()方法肯定最終會被暴露的Http接口所調用。在Idea開發工具,按住alt+滑鼠左鍵,可以很快定位到ApplicationResource類的addInstance ()方法,即服務注冊的接口,其代碼如下:

@POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
       
    ...//省略代碼                 
               registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
           

Renew服務續約

服務續約和服務注冊非常類似,通過之前的分析可以知道,服務注冊在Eureka Client程式啟動之後開啟,并同時開啟服務續約的定時任務。在eureka-client-1.6.2.jar的DiscoveryClient的類下有renew()方法,其代碼如下:

@Singleton
public class DiscoveryClient implements EurekaClient {
	  
		/**
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
						// registrationClient為SessionedEurekaHttpClient,
						// 向Server發送心跳資訊
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

}
           

另外服務端的續約接口在eureka-core:1.6.2.jar的 com.netflix.eureka包下的InstanceResource類下,接口方法為renewLease(),它是REST接口。為了減少類篇幅,省略了大部分代碼的展示。其中有個registry.renew()方法,即服務續約,代碼如下:

@PUT
public Response renewLease(...參數省略){
    // ...  代碼省略
    boolean isSuccess=registry.renew(app.getName(),id, isFromReplicaNode);
    //   ...  代碼省略
 }
           

讀者可以跟蹤registry.renew的代碼一直深入研究。在這裡就不再多講述。另外服務續約有2個參數是可以配置,即Eureka Client發送續約心跳的時間參數和Eureka Server在多長時間内沒有收到心跳将執行個體剔除的時間參數,在預設的情況下這兩個參數分别為30秒和90秒,官方給的建議是不要修改,如果有特殊要求還是可以調整的,隻需要分别在Eureka Client和Eureka Server修改以下參數:

eureka.instance.leaseRenewalIntervalInSeconds
eureka.instance.leaseExpirationDurationInSeconds
           

最後,服務注冊清單的擷取、服務下線和服務剔除就不在這裡進行源碼跟蹤解讀,因為和服務注冊和續約類似,有興趣的朋友可以自己看下源碼,深入了解。總的來說,通過讀源碼,可以發現,整體架構與前面小節的eureka 的高可用架構圖完全一緻。

Eureka Client注冊一個執行個體為什麼這麼慢

  • Eureka Client一啟動(不是啟動完成),不是立即向Eureka Server注冊,它有一個延遲向服務端注冊的時間,通過跟蹤源碼,可以發現預設的延遲時間為40秒,源碼在eureka-client-1.6.2.jar的DefaultEurekaClientConfig類下,代碼如下:
public int getInitialInstanceInfoReplicationIntervalSeconds() {
    return configInstance.getIntProperty(
        namespace + INITIAL_REGISTRATION_REPLICATION_DELAY_KEY, 40).get();
 }
           
  • Eureka Server的響應緩存 Eureka Server維護每30秒更新的響應緩存,可通過更改配置eureka.server.responseCacheUpdateIntervalMs來修改。 是以即使執行個體剛剛注冊,它也不會出現在調用/ eureka / apps REST端點的結果中。
  • Eureka Server重新整理緩存 Eureka用戶端保留系統資料庫資訊的緩存。 該緩存每30秒更新一次(如前所述)。 因 此,用戶端決定重新整理其本地緩存并發現其他新注冊的執行個體可能需要30秒。
  • LoadBalancer Refresh Ribbon的負載平衡器從本地的Eureka Client擷取服務注冊清單資訊。Ribbon本身還維護本地緩存,以避免為每個請求調用本地用戶端。 此緩存每30秒重新整理一次(可由ribbon.ServerListRefreshInterval配置)。 是以,可能需要30多秒才能使用新注冊的執行個體。

綜上幾個因素,一個新注冊的執行個體,特别是啟動較快的執行個體(預設延遲40秒注冊),不能馬上被Eureka Server發現。另外,剛注冊的Eureka Client也不能立即被其他服務調用,因為調用方因為各種緩存沒有及時的擷取到新的注冊清單。

Eureka 的自我保護模式

當一個新的Eureka Server出現時,它嘗試從相鄰節點擷取所有執行個體系統資料庫資訊。如果從Peer節點擷取資訊時出現問題,Eureka Serve會嘗試其他的Peer節點。如果伺服器能夠成功擷取所有執行個體,則根據該資訊設定應該接收的更新門檻值。如果有任何時間,Eureka Serve接收到的續約低于為該值配置的百分比(預設為15分鐘内低于85%),則伺服器開啟自我保護模式,即不再剔除注冊清單的資訊。

這樣做的好處就是,如果是Eureka Server自身的網絡問題,導緻Eureka Client的續約不上,Eureka Client的注冊清單資訊不再被删除,也就是Eureka Client還可以被其他服務消費。

繼續閱讀