天天看点

concurrenthashmap实现原理_SpringCloud源码解析 -- Eureka原理探究

concurrenthashmap实现原理_SpringCloud源码解析 -- Eureka原理探究

SpringCloud源码解析 -- RestTemplate与@LoadBalanced

SpringCloud源码解析 -- Spring Cloud Config与@RefreshScope

SpringCloud源码解析 -- Zuul实现原理

SpringCloud源码解析 -- Spring Cloud Sleuth原理探究

SpringCloud源码解析 -- Eureka原理探究

本文通过阅读Eureka源码,分享Eureka的实现原理。

本文主要梳理Eureka整体设计及实现,并不一一列举Eureka源码细节。

源码分析基于Spring Cloud Hoxton,Eureka版本为1.9

Eureka分为Eureka Client,Eureka Server,多个Eureka Server节点组成一个Eureka集群,服务通过Eureka Client注册到Eureka Server。

concurrenthashmap实现原理_SpringCloud源码解析 -- Eureka原理探究
CAP理论

指出,一个分布式系统不可能同时满足C(一致性)、A(可用性)和P(分区容错性)。

由于分布式系统中必须保证分区容错性,因此我们只能在A和C之间进行权衡。

Zookeeper保证的是CP, 而Eureka则是保证AP。

为什么呢?

在注册中心这种场景中,可用性比一致性更重要。

作为注册中心,其实数据是不经常变更的,只有服务发布,机器上下线,服务扩缩容时才变更。

因此Eureka选择AP,即使出问题了,也返回旧数据,保证服务能(最大程度)正常调用, 避免出现因为注册中心的问题导致服务不可用这种得不偿失的情况。

所以,Eureka各个节点都是平等的(去中心化的架构,无master/slave区分),挂掉的节点不会影响正常节点的工作,剩余的节点依然可以提供注册和查询服务。

Eureka Client

Eureka 1.9只要引入spring-cloud-starter-netflix-eureka-client依赖,即使不使用@EnableDiscoveryClient或@EnableEurekaClient注解,服务也会注册到Eureka集群。

client主要逻辑在com.netflix.discovery.DiscoveryClient实现,EurekaClientAutoConfiguration中构建了其子类CloudEurekaClient。

定时任务

DiscoveryClient#initScheduledTasks方法设置定时任务,主要有CacheRefreshThread,HeartbeatThread,以及InstanceInfoReplicator。

同步

服务注册信息缓存在DiscoveryClient#localRegionApps变量中,CacheRefreshThread负责定时从Eureka Server读取最新的服务注册信息,更新到本地缓存。

CacheRefreshThread -> DiscoveryClient#refreshRegistry -> DiscoveryClient#fetchRegistry

当存在多个Eureka Server节点时,Client会与eureka.client.serviceUrl.defaultZone配置的第一个Server节点同步数据,当第一个Server节点同步失败,才会同步第二个节点,以此类推。

从DiscoveryClient#fetchRegistry可以看到,同步数据有两个方法

(1)全量同步

由DiscoveryClient#getAndStoreFullRegistry方法实现,通过Http Get调用Server接口

apps/

获取Server节点中所有服务注册信息替换DiscoveryClient#localRegionApps

注意:Client请求Server端的服务,都是通过EurekaHttpClient接口发起,该接口实现类EurekaHttpClientDecorator通过RequestExecutor接口将请求委托给其他EurekaHttpClient实现类,并提供execute方法给子类实现扩展处理(该扩展处理可以针对每一个EurekaHttpClient方法,类似AOP)。子类RetryableEurekaHttpClient#execute中,会获取eureka.client.service-url.defaultZone中配置的地址,通过TransportClientFactory#newClient,构造一个RestTemplateTransportClientFactory,再真正发起请求。

(2)增量同步

由DiscoveryClient#getAndUpdateDelta方法实现,通过Http Get调用Server接口

apps/delta

,获取最新ADDED、MODIFIED,DELETED操作,更新本地缓存。

如果获取最新操作失败,则会发起全量同步。

配置:

eureka.client.fetch-registry,是否定时同步信息,默认true

eureka.client.registry-fetch-interval-seconds,间隔多少秒同步一次服务注册信息,默认30

心跳

HeartbeatThread -> DiscoveryClient#renew -> EurekaHttpClient#sendHeartBeat

通过Http Put调用Server接口

apps/{appName}/{instanceId}

appName是服务的spring.application.name,instanceId是服务IP加服务端口。

注意:如果Server返回NOT_FOUND状态,则重新注册。

配置:

eureka.client.register-with-eureka,当前应用是否注册到Eureka集群,默认true

eureka.instance.lease-renewal-interval-in-seconds,间隔多少秒发送一次心跳,默认30

注册

DiscoveryClient#构造函数 -> DiscoveryClient#register

通过Http Post调用Server接口

apps/{appName}

,发送当前应用的注册信息到Server。

配置:

eureka.client.register-with-eureka,当前应用是否注册到Eureka集群,默认true

eureka.client.should-enforce-registration-at-init,是否在初始化时注册,默认false

InstanceInfoReplicator

InstanceInfoReplicator任务会去监测应用自身的IP信息以及配置信息是否发生改变,如果发生改变,则会重新发起注册。

配置:

eureka.client.initial-instance-info-replication-interval-seconds,间隔多少秒检查一次自身信息,默认40

下线

EurekaClientAutoConfiguration配置了CloudEurekaClient的销毁方法

@Bean(destroyMethod = "shutdown")
           

DiscoveryClient#shutdown方法完成下线的处理工作,包括取消定时任务,调用unregister方法(通过Http Delete调用Server接口

apps/{appName}/{id}

),取消监控任务等

Eureka Server

@EnableEurekaServer引入EurekaServerMarkerConfiguration,EurekaServerMarkerConfiguration构建EurekaServerMarkerConfiguration.Marker。

EurekaServerAutoConfiguration会在Spring上下文中存在EurekaServerMarkerConfiguration.Marker时生效,构造Server端组件类。

Eureka Server也要使用DiscoveryClient,拉取其他Server节点的服务注册信息或者将自身注册到Eureka集群中。

启动同步

Server启动时,需要从相邻Server节点获取服务注册信息,同步到自身内存。

Server的服务注册信息存放在AbstractInstanceRegistry#registry变量中,类型为ConcurrentHashMap>>。

外层Map Key为appName,外层Map Key为instanceId,Lease代表Client与Server之间维持的一个契约。InstanceInfo保存具体的服务注册信息,如instanceId,appName,ipAddr,port等。

EurekaServerBootstrap是Server端的启动引导类,EurekaServerInitializerConfiguration实现了Lifecycle接口,start方法调用eurekaServerBootstrap.contextInitialized完成Server端初始化。

eurekaServerBootstrap.contextInitialized -> EurekaServerBootstrap#initEurekaServerContext -> PeerAwareInstanceRegistryImpl#syncUp -> AbstractInstanceRegistry#register

PeerAwareInstanceRegistryImpl#syncUp调用DiscoveryClient#getApplications方法,获取相邻server节点的所有服务注册信息,再调用AbstractInstanceRegistry#register方法,注册到AbstractInstanceRegistry#registry变量中。

AbstractInstanceRegistry#register

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        ...
        // #1
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());   
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            ...
            // #2
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {  
                registrant = existingLease.getHolder();
            }
        } else {
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    // #3
                    updateRenewsPerMinThreshold();  
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // #4
        gMap.put(registrant.getId(), lease);    
        ...
        registrant.setActionType(ActionType.ADDED);
        // #5
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));   
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); 
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
           

#1

通过appName,instanceId查询已有的Lease

#2

如果该服务已存在Lease,并且LastDirtyTimestamp的值更大,使用已存在的Lease。

#3

更新numberOfRenewsPerMinThreshold,该值用于自我保护模式。

#4

构建一个新的Lease,添加到AbstractInstanceRegistry#registry缓存中。

#5

添加recentlyChangedQueue,

apps/delta

接口从中获取最新变更操作。

提供服务

Server通过ApplicationsResource/ApplicationResource/InstanceResource对外提供Http服务。

AbstractInstanceRegistry负责实现cancle,register,renew,statusUpdate,deleteStatusOverride等操作的业务逻辑。

PeerAwareInstanceRegistryImpl通过replicateToPeers方法将操作同步到其他节点,以保证集群节点数据同步。

PeerAwareInstanceRegistryImpl#replicateToPeers方法最后一个参数isReplication,决定是否需要进行同步。

如果Server节点接收到其他Server节点发送的同步操作,是不需要再继续向其他Server同步的,否则会引起循环更新。

该参数通过Http Requst的Header参数x-netflix-discovery-replication决定(只有Client发送的请求该参数才为true)。

数据一致

PeerAwareInstanceRegistryImpl#replicateToPeers方法通过PeerEurekaNodes#getPeerEurekaNodes获取其他server节点地址,

PeerEurekaNodes#peerEurekaNodes变量维护了所有的Server节点信息。

PeerEurekaNodes通过peersUpdateTask任务定时从DNS或配置文件获取最新的Server节点地址列表,并更新PeerEurekaNodes#peerEurekaNodes。

配置:

eureka.server.peer-eureka-nodes-update-interval-ms,间隔多少分钟拉取一次Server节点地址列表,默认10

PeerEurekaNode管理具体一个Server节点,并负责向该Server节点同步register,cancel,heartbeat等操作。

PeerEurekaNode通过定时任务的方式同步这些操作。它维护了两个TaskDispatcher,批处理调度器batchingDispatcher和非批处理调度器nonBatchingDispatcher。

PeerEurekaNode#构造方法调用TaskDispatchers#createBatchingTaskDispatcher构造TaskDispatcher

public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                         int maxBufferSize,
                                                                         int workloadSize,
                                                                         int workerCount,
                                                                         long maxBatchingDelay,
                                                                         long congestionRetryDelayMs,
                                                                         long networkFailureRetryMs,
                                                                         TaskProcessor<T> taskProcessor) {
    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
    );
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    return new TaskDispatcher<ID, T>() {
        public void process(ID id, T task, long expiryTime) {
            acceptorExecutor.process(id, task, expiryTime);
        }

        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}
           

TaskDispatcher负责任务分发,过期任务会被抛弃,如果两个任务有相同id,则前一个任务则会被删除。

AcceptorExecutor负责整合任务,将任务放入批次中。

TaskExecutors将整合好的任务(批次)分给TaskProcessor处理,实际处理任务的是ReplicationTaskProcessor。

ReplicationTaskProcessor可以重复执行失败的任务,

ReplicationTaskProcessor#process(List<ReplicationTask> tasks)

处理批次任务,将tasks合并到一个请求,发送到下游Server接口

peerreplication/batch/

任务类为ReplicationTask,它提供了handleFailure方法,当下游Server接口返回statusCode不在[200,300)区间,则调用该方法。

从TaskExecutors#BatchWorkerRunnable的run方法可以看到,

调用下游Server接口时,如果下游返回503状态或发生IO异常,会通过taskDispatcher.reprocess重新执行任务,以保证最终一致性。

如果发生其他异常,只打印日志,不重复执行任务。

配置:

eureka.server.max-elements-in-peer-replication-pool,等待执行任务最大数量,默认为10000

需要注意一下PeerEurekaNode#heartbeat方法,心跳任务实现了handleFailure方法

public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
    super.handleFailure(statusCode, responseEntity);
    if (statusCode == 404) {
        logger.warn("{}: missing entry.", getTaskName());
        if (info != null) {
            logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                    getTaskName(), info.getId(), info.getStatus());
            register(info);
        }
    } 
    ...
}
           

如果下游server节点没有找到服务注册信息,就返回404状态,这时需要重新注册该服务。这点很重要,它可以保证不同Server节点保持数据一致。

假设有一个client,注册到Eureka集群server1,server2,server3。下面来分析两个场景

场景1. client启动时,server1接收带client的注册信息,但同步给server2前宕机了,怎么办?

这时,client定时发起心跳,但它与server1心跳操作失败,只能向server2发起心跳,server2返回404(NOT_FOUND状态),client重新注册。

场景2. server3与其他机器server1,server2之间出现了网络分区,这时client注册到eureka集群。然后网络恢复了,server3怎么同步数据呢?

当server1向server3同步心跳时,server3返回404,于是server1重新向server3注册client信息,数据最终保持一致。

主动失效

AbstractInstanceRegistry#deltaRetentionTimer任务会定时移除recentlyChangedQueue中过期的增量操作信息

配置:

eureka.server.delta-retention-timer-interval-in-ms,间隔多少秒清理一次过期的增量操作信息,默认30

eureka.server.retention-time-in-m-s-in-delta-queue,增量操作保留多少分钟,默认3

AbstractInstanceRegistry#evictionTimer任务会定时剔除AbstractInstanceRegistry#registry中已经过期的(太久没收到心跳)服务注册信息。

计算服务失效时间时还要加上补偿时间,即计算本次任务执行的时间和上次任务执行的时间差,若超过eviction-interval-timer-in-ms配置值则加上超出时间差作为补偿时间。

每次剔除服务的数量都有一个上限,为注册服务数量*renewal-percent-threshold,Eureka会随机剔除过期的服务。

配置:

eureka.server.eviction-interval-timer-in-ms,间隔多少秒清理一次过期的服务,默认60

eureka.instance.lease-expiration-duration-in-seconds,间隔多少秒没收到心跳则判定服务过期,默认90

eureka.server.renewal-percent-threshold,自我保护阀值因子,默认0.85

自我保护机制

PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask,定时更新numberOfRenewsPerMinThreshold,该值用于判定是否进入自我保护模式,在自我保护模式下,AbstractInstanceRegistry#evictionTimer任务直接返回,不剔除过期服务。

numberOfRenewsPerMinThreshold计算在PeerAwareInstanceRegistryImpl#updateRenewsPerMinThreshold

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}
           

expectedNumberOfClientsSendingRenews

-> 已注册服务总数

60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()

-> expected-client-renewal-interval-seconds配置了Client间隔多少秒发一次心跳,这里计算一个Client每分钟发送心跳数量。

RenewalPercentThreshold 自我保护阀值因子。

可以看到,numberOfRenewsPerMinThreshold表示一分钟内Server接收心跳最低次数,实际数量少于该值则进入自我保护模式。

此时Eureka认为客户端与注册中心出现了网络故障(比如网络故障或频繁的启动关闭客户端),不再剔除任何服务,它要等待网络故障恢复后,再退出自我保护模式。这样可以最大程度保证服务间正常调用。

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled方法判定当前是否处于自我保护模式。该方法比较renewsLastMin中的值是否大于numberOfRenewsPerMinThreshold,AbstractInstanceRegistry#renewsLastMin统计一分钟内心跳次数。

配置:

eureka.server.enable-self-preservation,是否启用自我保护机制,默认为true

eureka.server.expected-client-renewal-interval-seconds,Client间隔多少秒发送一次心跳

eureka.server.renewal-percent-threshold,自我保护阀值因子,默认0.85

状态更新

InstanceInfo维护了状态变量status和覆盖状态变量overriddenStatus。

status是Eureka Client本身发布的状态。

overriddenstatus是手动或通过工具强制执行的状态。

Server端提供服务

apps/{appName}/{instanceId}/status

,可以变更服务实例status以及overriddenStatus,从而主动变更服务状态。

注意,并不会修改Client端的服务状态,而是修改Server段服务注册信息中保存的服务状态。

而Server处理Client注册或心跳时,会使用overriddenstatus覆盖status。

Eureka Client在获取到注册信息时,会调用DiscoveryClient#shuffleInstances方法,过滤掉非InstanceStatus.UP状态的服务实例,从而避免调动该实例,以达到服务实例的暂停服务,而无需关闭服务实例。

InstanceInfo还维护了lastDirtyTimestamp变量,代表服务注册信息最后更新时间。

从InstanceResource可以看到,更新状态statusUpdate或者删除状态deleteStatusUpdate时都可以提供lastDirtyTimestamp,

而处理心跳的renewLease方法,必须有lastDirtyTimestamp参数,validateDirtyTimestamp方法负责检验lastDirtyTimestamp参数

  1. 当lastDirtyTimestamp参数等于当前注册信息中的lastDirtyTimestamp,返回处理成功。
  2. 当lastDirtyTimestamp参数大于当前注册信息中的lastDirtyTimestamp,返回NOT_FOUND状态,表示Client的信息已经过期,需要重新注册。
  3. 当lastDirtyTimestamp参数小于当前注册信息中的lastDirtyTimestamp,返回CONFLICT(409)状态,表示数据冲突,并返回当前节点中该服务的注册信息。

    这时如果心跳是Client发起的,Client会忽略409的返回状态(DiscoveryClient#renew),但如果是其他Server节点同步过来的,发送心跳的Server节点会使用返回的服务注册信息更新本节点的注册信息(PeerEurekaNode#heartbeat)。

配置:

eureka.client.filter-only-up-instances,获取实例时是否只保留UP状态的实例,默认为true

eureka.server.sync-when-timestamp-differs,当时间戳不一致时,是否进行同步数据,默认为true

文本关于Eureka的分享就到这里,我们可以Eureka设计和实现都比较简单,但是非常实用。

我在深入阅读Eureka源码前犹豫了一段时间(毕竟Eureka 2.0 开源流产),不过经过一段时间深入学习,收获不少,希望这篇文章也可以给对Eureka感兴趣的同学提供一个深入学习思路。

如果您觉得本文不错,欢迎关注我的微信公众号,您的关注是我坚持的动力!

concurrenthashmap实现原理_SpringCloud源码解析 -- Eureka原理探究

继续阅读