天天看點

Eureka 源碼分析之 Eureka Client

文章首發于微信公衆号《程式員果果》

位址:

https://mp.weixin.qq.com/s/47TUd96NMz67_PCDyvyInQ

簡介

Eureka是一種基于REST(Representational State Transfer)的服務,主要用于AWS雲,用于定位服務,以實作中間層伺服器的負載平衡和故障轉移。我們将此服務稱為Eureka Server。Eureka還附帶了一個基于Java的用戶端元件Eureka Client,它使與服務的互動變得更加容易。用戶端還有一個内置的負載均衡器,可以進行基本的循環負載均衡。在Netflix,一個更複雜的負載均衡器包含Eureka基于流量,資源使用,錯誤條件等多種因素提供權重負載平衡,以提供卓越的彈性。

先看一張 github 上 Netflix Eureka 的一架構圖,如下:

Eureka 源碼分析之 Eureka Client

從圖可以看出在這個體系中,有2個角色,即Eureka Server和Eureka Client。而Eureka Client又分為Applicaton Service和Application Client,即服務提供者何服務消費者。 每個區域有一個Eureka叢集,并且每個區域至少有一個eureka伺服器可以處理區域故障,以防伺服器癱瘓。

Eureka Client 在 Eureka Server 注冊,然後Eureka Client 每30秒向 Eureka Server 發送一次心跳來更新一次租約。如果 Eureka Client 無法續訂租約幾次,則會在大約90秒内 Eureka Server 将其從伺服器系統資料庫中删除。注冊資訊和續訂将複制到群集中的所有 Eureka Server 節點。來自任何區域的用戶端都可以查找系統資料庫資訊(每30秒發生一次)根據這些系統資料庫資訊,Application Client 可以遠端調用 Applicaton Service 來消費服務。

源碼分析

基于Spring Cloud的 eureka 的 client 端在啟動類上加上 @EnableDiscoveryClient 注解,就可以 用 NetFlix 提供的 Eureka client。下面就以 @EnableDiscoveryClient 為入口,進行Eureka Client的源碼分析。

@EnableDiscoveryClient,通過源碼可以發現這是一個标記注解:

/**
 * Annotation to enable a DiscoveryClient implementation.
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    boolean autoRegister() default true;
}           

通過注釋可以知道 @EnableDiscoveryClient 注解是用來 啟用 DiscoveryClient 的實作,DiscoveryClient接口代碼如下:

public interface DiscoveryClient {
    
    String description();

    List<ServiceInstance> getInstances(String serviceId);

    List<String> getServices();

}           

接口說明:

  • description():實作描述。
  • getInstances(String serviceId):擷取與特定serviceId關聯的所有ServiceInstance
  • getServices():傳回所有已知的服務ID

DiscoveryClient 接口的實作結構圖:

Eureka 源碼分析之 Eureka Client
  • EurekaDiscoveryClient:Eureka 的 DiscoveryClient 實作類。
  • CompositeDiscoveryClient:用于排序可用用戶端的發現用戶端的順序。
  • NoopDiscoveryClient:什麼都不做的服務發現實作類,已經被廢棄。
  • SimpleDiscoveryClient:簡單的服務發現實作類 SimpleDiscoveryClient,具體的服務執行個體從 SimpleDiscoveryProperties 配置中擷取。

EurekaDiscoveryClient 是 Eureka 對 DiscoveryClient接口的實作,代碼如下:

public class EurekaDiscoveryClient implements DiscoveryClient {

    public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";

    private final EurekaInstanceConfig config;

    private final EurekaClient eurekaClient;

    public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
        this.config = config;
        this.eurekaClient = eurekaClient;
    }

    @Override
    public String description() {
        return DESCRIPTION;
    }

    @Override
    public List<ServiceInstance> getInstances(String serviceId) {
        List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
                false);
        List<ServiceInstance> instances = new ArrayList<>();
        for (InstanceInfo info : infos) {
            instances.add(new EurekaServiceInstance(info));
        }
        return instances;
    }

    @Override
    public List<String> getServices() {
        Applications applications = this.eurekaClient.getApplications();
        if (applications == null) {
            return Collections.emptyList();
        }
        List<Application> registered = applications.getRegisteredApplications();
        List<String> names = new ArrayList<>();
        for (Application app : registered) {
            if (app.getInstances().isEmpty()) {
                continue;
            }
            names.add(app.getName().toLowerCase());

        }
        return names;
    }

}           

從代碼可以看出 EurekaDiscoveryClient 實作了 DiscoveryClient 定義的規範接口,真正實作發現服務的是 EurekaClient,下面是 EurekaClient 依賴結構圖:

Eureka 源碼分析之 Eureka Client

EurekaClient 唯一實作類 DiscoveryClient,DiscoveryClient 的構造方法如下:

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    //省略...

    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff
         //省略...
       initScheduledTasks();
    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }
    //省略...
}           

可以看到這個構造方法裡面,主要做了下面幾件事:

  • 建立了scheduler定時任務的線程池,heartbeatExecutor心跳檢查線程池(服務續約),cacheRefreshExecutor(服務擷取)
  • 然後initScheduledTasks()開啟上面三個線程池,往上面3個線程池分别添加相應任務。然後建立了一個instanceInfoReplicator(Runnable任務),然後調用InstanceInfoReplicator.start方法,把這個任務放進上面scheduler定時任務線程池(服務注冊并更新)。

服務注冊(Registry)

上面說了,initScheduledTasks()方法中調用了InstanceInfoReplicator.start()方法,InstanceInfoReplicator 的 run()方法代碼如下:

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}           

發現 InstanceInfoReplicator的run方法,run方法中會調用DiscoveryClient的register方法。DiscoveryClient 的 register方法 代碼如下:

/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}           

最終又經過一系列調用,最終會調用到AbstractJerseyEurekaHttpClient的register方法,代碼如下:

public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}           

可以看到最終通過http rest請求eureka server端,把應用自身的InstanceInfo執行個體注冊給server端,我們再來完整梳理一下服務注冊流程:

Eureka 源碼分析之 Eureka Client

Renew服務續約

服務續約和服務注冊非常類似,HeartbeatThread 代碼如下:

private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            //更新最後一次心跳的時間
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
// 續約的主方法
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}           

發送心跳 ,請求eureka server 端 ,如果接口傳回值為404,就是說服務不存在,那麼重新走注冊流程。

如果接口傳回值為404,就是說不存在,從來沒有注冊過,那麼重新走注冊流程。

服務續約流程如下圖:

Eureka 源碼分析之 Eureka Client

服務下線cancel

在服務shutdown的時候,需要及時通知服務端把自己剔除,以避免用戶端調用已經下線的服務,shutdown()方法代碼如下:

public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
        
        // 關閉各種定時任務
        // 關閉重新整理執行個體資訊/注冊的定時任務
        // 關閉續約(心跳)的定時任務
        // 關閉擷取注冊資訊的定時任務
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            // 更改執行個體狀态,使執行個體不再接收流量
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            //向EurekaServer端發送下線請求
            unregister();
        }

        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }

        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        logger.info("Completed shut down of DiscoveryClient");
    }
}

private void cancelScheduledTasks() {
    if (instanceInfoReplicator != null) {
        instanceInfoReplicator.stop();
    }
    if (heartbeatExecutor != null) {
        heartbeatExecutor.shutdownNow();
    }
    if (cacheRefreshExecutor != null) {
        cacheRefreshExecutor.shutdownNow();
    }
    if (scheduler != null) {
        scheduler.shutdownNow();
    }
}
    
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}           

先關閉各種定時任務,然後向eureka server 發送服務下線通知。服務下線流程如下圖:

Eureka 源碼分析之 Eureka Client

參考

https://github.com/Netflix/eureka/wiki http://yeming.me/2016/12/01/eureka1/ http://blog.didispace.com/springcloud-sourcecode-eureka/ https://www.jianshu.com/p/71a8bdbf03f4