01 | 说明
接着上一篇文章,上一篇主要讲解了Eureka的搭建过程,以及自动配置类的一些相关配置,本篇重点说明以下内容
1.1 服务端是如何接收客户端请求的?
02 | 服务端如何接收客户端请求
2.1 构建Jersey过滤器
1. 该过滤器的作用类似于Spring-MVC,用于拦截rest请求
2. 与Spring-MVC不同的是,Spring-MVC使用拦截器进行请求拦截,而Jersey使用过滤器拦截。
2.2 构建过程
@Bean
//jersey过滤器
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
@Bean
// 构造一个jersey应用,相当于Spring-mvc,构建rest请求
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
false, environment);
// Filter to include only classes that have a particular annotation.
// 过滤Path.class以及Provider.class注解修饰的类
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// Find classes in Eureka packages (or subpackages)
// 通过指定的包名查找类
Set<Class<?>> classes = new HashSet<>();
for (String basePackage : EUREKA_PACKAGES) {
Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
for (BeanDefinition bd : beans) {
Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
resourceLoader.getClassLoader());
classes.add(cls);
}
}
// Construct the Jersey ResourceConfig
Map<String, Object> propsAndFeatures = new HashMap<>();
propsAndFeatures.put(
// Skip static content used by the webapp
ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
DefaultResourceConfig rc = new DefaultResourceConfig(classes);
rc.setPropertiesAndFeatures(propsAndFeatures);
return rc;
}
2.3 处理实例的注册过程
1. 通过上面构建Jersey应用可知,在构建过程中,会对指定的包名进行扫描,通过断点可知,最终扫描的类主要有九个,其中我们最关心的是ApplicationResource这个类。
客户端的注册请求,主要就是在这个ApplicationResource进行拦截及处理。处理的过程如下:
2. 通过请求进入该类的addInstance方法,该方法主要用于判断请求实例的一些基本信息,包括实例ID是否为空,hostName是否为空等
2.4. 启动实例节点注册器
1. 构建实例节点注册器,用于注册节点
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications();
// force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
2.5 服务端context初始化
2.5.1 代码
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
2.5.1 EurekaContext 初始化的过程
1. 通过DefaultEurekaServerContext的initialize()方法进行初始化
2. 通过peerEurekaNodes.start()方法,进行定时的刷新节点信息(关于定时刷新节点信息,后续补充源码分析,这里只讲述有这么个过程)
3. 通过注册器的实现类,进行注册器的初始化 registry.init(peerEurekaNodes) 实现类为:PeerAwareInstanceRegistryImpl
2.6 构建Eureka节点信息
2.6.1 代码
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}
2.7 启动EurekaServerBootstrap
2.7.1 代码
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
2.7.2 初始化
1. 初始化Eureka的环境信息 initEurekaEnvironment();
2. 初始化Eureka的Context信息 initEurekaServerContext();
2.8 同步节点信息的过程分析
2.8.1 同步所有节点信息
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 获取所有的节点信息
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
// 判断是否可以注册
if (isRegisterable(instance)) {
// 进行注册
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
2.8.2 注册过程说明
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
// 上只读锁
read.lock();
// 从本地MAP里面获取当前实例的信息。
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
// 增加注册次数到监控信息里面去。
REGISTER.increment(isReplication);
if (gMap == null) {
// 如果第一次进来,那么gMap为空,则创建一个ConcurrentHashMap放入到registry里面去
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
// putIfAbsent方法主要是在向ConcurrentHashMap中添加键—值对的时候,它会先判断该键值对是否已经存在。
// 如果不存在(新的entry),那么会向map中添加该键值对,并返回null。
// 如果已经存在,那么不会覆盖已有的值,直接返回已经存在的值。
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
// 表明map中确实不存在,则设置gMap为最新创建的那个
gMap = gNewMap;
}
}
// 从MAP中查询已经存在的Lease信息 (比如第二次来)
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// 当Lease的对象不为空时。
if (existingLease != null && (existingLease.getHolder() != null)) {
// 当instance已经存在是,和客户端的instance的信息做比较,时间最新的那个,为有效instance信息
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // server
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // client
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// 这里只有当existinglease不存在时,才会进来。 像那种恢复心跳,信息过期的,都不会进入这里。
// Eureka-Server的自我保护机制做的操作,为每分钟最大续约数+2 ,同时重新计算每分钟最小续约数
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 构建一个最新的Lease信息
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// 当原来存在Lease的信息时,设置他的serviceUpTimestamp, 保证服务开启的时间一直是第一次的那个
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 放入本地Map中
gMap.put(registrant.getId(), lease);
// 添加到最近的注册队列里面去,以时间戳作为Key, 名称作为value,主要是为了运维界面的统计数据。
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
// 分析instanceStatus
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
// 得到instanceStatus,判断是否是UP状态,
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
// 设置注册类型为添加
registrant.setActionType(ActionType.ADDED);
// 租约变更记录队列,记录了实例的每次变化, 用于注册信息的增量获取、
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 清理缓存 ,传入的参数为key
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
(1) 加锁,避免多实例同时注册并发问题
(2)从本地MAP里面获取当前实例信息,如果是第一次进入,则Map为空,则创建一个ConcurrentMap,用于存储对应的实例信息
(3)构建实例相关信息,加入Map中
(4)分析实例的状态
(5)设置注册类型为添加
(6)清理缓存