天天看点

《Loy解说Eureka服务端源码(二)》

01 | 说明

    接着上一篇文章,上一篇主要讲解了Eureka的搭建过程,以及自动配置类的一些相关配置,本篇重点说明以下内容

    1.1  服务端是如何接收客户端请求的?

02 | 服务端如何接收客户端请求

    2.1 构建Jersey过滤器

           1. 该过滤器的作用类似于Spring-MVC,用于拦截rest请求

           2. 与Spring-MVC不同的是,Spring-MVC使用拦截器进行请求拦截,而Jersey使用过滤器拦截。

      2.2 构建过程

@Bean
 //jersey过滤器
  public FilterRegistrationBean jerseyFilterRegistration(
           javax.ws.rs.core.Application eurekaJerseyApp) {
       FilterRegistrationBean bean = new FilterRegistrationBean();
       bean.setFilter(new ServletContainer(eurekaJerseyApp));
       bean.setOrder(Ordered.LOWEST_PRECEDENCE);
       bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

        return bean;
    }


@Bean
// 构造一个jersey应用,相当于Spring-mvc,构建rest请求
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
			ResourceLoader resourceLoader) {

		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
				false, environment);

		// Filter to include only classes that have a particular annotation.
		// 过滤Path.class以及Provider.class注解修饰的类
		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

		// Find classes in Eureka packages (or subpackages)
		// 通过指定的包名查找类
		Set<Class<?>> classes = new HashSet<>();
		for (String basePackage : EUREKA_PACKAGES) {
			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
			for (BeanDefinition bd : beans) {
				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
						resourceLoader.getClassLoader());
				classes.add(cls);
			}
		}

		// Construct the Jersey ResourceConfig
		Map<String, Object> propsAndFeatures = new HashMap<>();
		propsAndFeatures.put(
				// Skip static content used by the webapp
				ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

		DefaultResourceConfig rc = new DefaultResourceConfig(classes);
		rc.setPropertiesAndFeatures(propsAndFeatures);

		return rc;
	}
           

    2.3  处理实例的注册过程

        1.   通过上面构建Jersey应用可知,在构建过程中,会对指定的包名进行扫描,通过断点可知,最终扫描的类主要有九个,其中我们最关心的是ApplicationResource这个类。

《Loy解说Eureka服务端源码(二)》

        客户端的注册请求,主要就是在这个ApplicationResource进行拦截及处理。处理的过程如下:

        2. 通过请求进入该类的addInstance方法,该方法主要用于判断请求实例的一些基本信息,包括实例ID是否为空,hostName是否为空等  

《Loy解说Eureka服务端源码(二)》

    2.4. 启动实例节点注册器

         1. 构建实例节点注册器,用于注册节点

@Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
			ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); 
         // force initialization
		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
				serverCodecs, this.eurekaClient,
				this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}
           

  2.5  服务端context初始化

      2.5.1  代码

@Bean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}
           

      2.5.1 EurekaContext 初始化的过程

            1. 通过DefaultEurekaServerContext的initialize()方法进行初始化

            2. 通过peerEurekaNodes.start()方法,进行定时的刷新节点信息(关于定时刷新节点信息,后续补充源码分析,这里只讲述有这么个过程)

            3. 通过注册器的实现类,进行注册器的初始化 registry.init(peerEurekaNodes) 实现类为:PeerAwareInstanceRegistryImpl

2.6 构建Eureka节点信息

     2.6.1 代码

@Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
			ServerCodecs serverCodecs) {
		return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
	}
           

2.7 启动EurekaServerBootstrap

          2.7.1 代码

@Bean
	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
			EurekaServerContext serverContext) {
		return new EurekaServerBootstrap(this.applicationInfoManager,
				this.eurekaClientConfig, this.eurekaServerConfig, registry,
				serverContext);
	}
           

          2.7.2 初始化

                 1.  初始化Eureka的环境信息 initEurekaEnvironment();

                 2.  初始化Eureka的Context信息 initEurekaServerContext();

2.8 同步节点信息的过程分析

        2.8.1 同步所有节点信息

@Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            
            // 获取所有的节点信息
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                         // 判断是否可以注册
                        if (isRegisterable(instance)) {
                            // 进行注册
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
           

          2.8.2  注册过程说明

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // 上只读锁
        read.lock();
        // 从本地MAP里面获取当前实例的信息。
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        // 增加注册次数到监控信息里面去。
        REGISTER.increment(isReplication);
        if (gMap == null) {
            // 如果第一次进来,那么gMap为空,则创建一个ConcurrentHashMap放入到registry里面去
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            // putIfAbsent方法主要是在向ConcurrentHashMap中添加键—值对的时候,它会先判断该键值对是否已经存在。
            // 如果不存在(新的entry),那么会向map中添加该键值对,并返回null。
            // 如果已经存在,那么不会覆盖已有的值,直接返回已经存在的值。
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                // 表明map中确实不存在,则设置gMap为最新创建的那个
                gMap = gNewMap;
            }
        }
        // 从MAP中查询已经存在的Lease信息 (比如第二次来)
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // 当Lease的对象不为空时。
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // 当instance已经存在是,和客户端的instance的信息做比较,时间最新的那个,为有效instance信息
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // server
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();   // client
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // 这里只有当existinglease不存在时,才会进来。 像那种恢复心跳,信息过期的,都不会进入这里。
            //  Eureka-Server的自我保护机制做的操作,为每分钟最大续约数+2 ,同时重新计算每分钟最小续约数
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // 构建一个最新的Lease信息
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // 当原来存在Lease的信息时,设置他的serviceUpTimestamp, 保证服务开启的时间一直是第一次的那个
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // 放入本地Map中
        gMap.put(registrant.getId(), lease);
        // 添加到最近的注册队列里面去,以时间戳作为Key, 名称作为value,主要是为了运维界面的统计数据。
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        // 分析instanceStatus
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        // 得到instanceStatus,判断是否是UP状态,
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // 设置注册类型为添加
        registrant.setActionType(ActionType.ADDED);
        // 租约变更记录队列,记录了实例的每次变化, 用于注册信息的增量获取、
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // 清理缓存 ,传入的参数为key
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
           

      (1) 加锁,避免多实例同时注册并发问题

      (2)从本地MAP里面获取当前实例信息,如果是第一次进入,则Map为空,则创建一个ConcurrentMap,用于存储对应的实例信息

      (3)构建实例相关信息,加入Map中

      (4)分析实例的状态

      (5)设置注册类型为添加

      (6)清理缓存

继续阅读