天天看點

Spring Cloud Gateway源碼解析-09-結合注冊中心實作動态路由示例原理

示例

SCG配置

spring:
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true # 預設關閉
          url-expression: "'lb://'+serviceId" #路由目标位址表達式,預設值就是"'lb://'+serviceId"
    #配置nacos注冊中心
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848 #位址
           

假如我們已經在nacos中注冊了一個user-service,user-service有一個接口為/api/hello,當我們請求SCG /user-service/api/hello時就會請求到user-service。

Spring Cloud Gateway源碼解析-09-結合注冊中心實作動态路由示例原理

原理

核心類之RouteDefinition的裝配

Spring Cloud Gateway源碼解析-09-結合注冊中心實作動态路由示例原理

補充了下之前的圖

Spring Cloud Gateway源碼解析-09-結合注冊中心實作動态路由示例原理

右上角紅色的部分為結合注冊中心涉及的類。從圖中可以看到它們之間的關系,

DiscoveryLocatorProperties

GatewayProperties

類似用于讀取discovery相關的配置,通過

DiscoveryLocatorPropertis

裝配

DiscoveryClientRouteDefinitionLocator

DiscoveryClientRouteDefinitionLocator

RouteDefinitionLocator

的子類,也是用來存放

RouteDefinition

的,最終會同

PropertiesRouteDefinitionLocator

一樣被組合到

CompositeRouteDefinitionLocator

中。

DiscoveryLocatorProperties

DiscoveryClientRouteDefinitionLocator

是在

GatewayDiscoveryClientAutoConfiguration

裝配的。

DiscoveryLocatorProperties

DiscoveryRouteDefinition會使用

PathRoutePredicateFactory

RewritePathGatewayFilterFactory

,進行Path比對和請求Path重寫。

@ConfigurationProperties("spring.cloud.gateway.discovery.locator")
public class DiscoveryLocatorProperties {

	//開啟辨別,預設關閉
	private boolean enabled = false;

	/**
	 * 路由ID字首,預設為DiscoveryClient的類名稱 {@link org.springframework.cloud.client.discovery.DiscoveryClient}
	 */
	private String routeIdPrefix;

	//是否使用SpEL表達式
	private String includeExpression = "true";

	//用來建立路由Route的uri表達式,最終會被解析為類似uri=lb://user-service,可覆寫
	private String urlExpression = "'lb://'+serviceId";

	/**
	 * Option to lower case serviceId in predicates and filters, defaults to false. Useful
	 * with eureka when it automatically uppercases serviceId. so MYSERIVCE, would match
	 * /myservice/**
	 */
	private boolean lowerCaseServiceId = false;
	
	private List<PredicateDefinition> predicates = new ArrayList<>();

	private List<FilterDefinition> filters = new ArrayList<>();
           

GatewayDiscoveryClientAutoConfiguration

public static List<PredicateDefinition> initPredicates() {
		ArrayList<PredicateDefinition> definitions = new ArrayList<>();
		// add a predicate that matches the url at /serviceId/**
		PredicateDefinition predicate = new PredicateDefinition();
		//設定Predicate名稱,Path,DiscoveryRouteDefinition會使用PathRoutePredicateFactory
		predicate.setName(normalizeRoutePredicateName(PathRoutePredicateFactory.class));
		//設定Path參數,serviceId會在DiscoveryClientRouteDefinitionLocator#getRouteDefinition中替換為注冊中心上的服務名,例如user-service
		predicate.addArg(PATTERN_KEY, "'/'+serviceId+'/**'");
		definitions.add(predicate);
		return definitions;
	}

	public static List<FilterDefinition> initFilters() {
		ArrayList<FilterDefinition> definitions = new ArrayList<>();

		// add a filter that removes /serviceId by default
		FilterDefinition filter = new FilterDefinition();
		//設定使用的過濾器,此處使用RewritePathGatewayFilterFactory,因為後邊會重寫請求Path
		filter.setName(normalizeFilterFactoryName(RewritePathGatewayFilterFactory.class));
		//同Predicate,會在DiscoveryClientRouteDefinitionLocator#getRouteDefinition中将'service-id'替換為注冊中心上的服務名,例如 /user-service/(?<remaining>.*)
		String regex = "'/' + serviceId + '/(?<remaining>.*)'";
		String replacement = "'/${remaining}'";
		filter.addArg(REGEXP_KEY, regex);
		filter.addArg(REPLACEMENT_KEY, replacement);
		definitions.add(filter);

		return definitions;
	}

	@Bean
	public DiscoveryLocatorProperties discoveryLocatorProperties() {
		DiscoveryLocatorProperties properties = new DiscoveryLocatorProperties();
		//設定Predicate
		properties.setPredicates(initPredicates());
		//設定GatewayFilter
		properties.setFilters(initFilters());
		return properties;
	}
           

結合注冊中心其實有兩種

DiscoveryClient

使用,一種是原始的

DiscoveryClient

,一種是

ReactiveDiscoveryClient

,不同的注冊中心都有相應的實作,如nacos的

NacosReactiveDiscoveryClient

。可以通過配置

spring.cloud.discovery.reactive.enabled=true

來開啟使用Reactive模式的。

@Configuration(proxyBeanMethods = false)
	@ConditionalOnProperty(value = "spring.cloud.discovery.reactive.enabled",
			matchIfMissing = true)
	public static class ReactiveDiscoveryClientRouteDefinitionLocatorConfiguration {

		/**
		 *
		 * @param discoveryClient Reactive的實作,如果使用nacos,這裡注入的為 {@link com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClient}
		 */
		@Bean
		@ConditionalOnProperty(name = "spring.cloud.gateway.discovery.locator.enabled")
		public DiscoveryClientRouteDefinitionLocator discoveryClientRouteDefinitionLocator(
				ReactiveDiscoveryClient discoveryClient,
				DiscoveryLocatorProperties properties) {
			return new DiscoveryClientRouteDefinitionLocator(discoveryClient, properties);
		}

	}

	/**
	 * @deprecated In favor of the native reactive service discovery capability.
	 */
	@Configuration(proxyBeanMethods = false)
	@Deprecated
	@ConditionalOnProperty(value = "spring.cloud.discovery.reactive.enabled",
			havingValue = "false")
	public static class BlockingDiscoveryClientRouteDefinitionLocatorConfiguration {

		@Bean
		@ConditionalOnProperty(name = "spring.cloud.gateway.discovery.locator.enabled")
		public DiscoveryClientRouteDefinitionLocator discoveryClientRouteDefinitionLocator(
				DiscoveryClient discoveryClient, DiscoveryLocatorProperties properties) {
			return new DiscoveryClientRouteDefinitionLocator(discoveryClient, properties);
		}

	}
           

DiscoveryClientRouteDefinitionLocator

DiscoveryClientRouteDefinitionLocator

的主要工作是擷取到所有的注冊中心上的服務執行個體,根據服務資訊建立PredicateDefnition->FilterDefinition->RouteDefinition。供

CompositeRouteDefinitionLocator

擷取。

每一個服務都會生成一個RouteDefinition。

public class DiscoveryClientRouteDefinitionLocator implements RouteDefinitionLocator {

	private final DiscoveryLocatorProperties properties;

	private final String routeIdPrefix;

	private final SimpleEvaluationContext evalCtxt;

	private Flux<List<ServiceInstance>> serviceInstances;

	public DiscoveryClientRouteDefinitionLocator(ReactiveDiscoveryClient discoveryClient,
			DiscoveryLocatorProperties properties) {
		this(discoveryClient.getClass().getSimpleName(), properties);
		//通過對應注冊中心的discoveryClient擷取到所有的服務執行個體
		serviceInstances = discoveryClient.getServices()
				.flatMap(service -> discoveryClient.getInstances(service).collectList());
	}

	private DiscoveryClientRouteDefinitionLocator(String discoveryClientName,
			DiscoveryLocatorProperties properties) {
		this.properties = properties;
		//判斷是否有路由ID字首,如果沒有則
		if (StringUtils.hasText(properties.getRouteIdPrefix())) {
			routeIdPrefix = properties.getRouteIdPrefix();
		}
		else {
			routeIdPrefix = discoveryClientName + "_";
		}
		evalCtxt = SimpleEvaluationContext.forReadOnlyDataBinding().withInstanceMethods()
				.build();
	}

	@Override
	public Flux<RouteDefinition> getRouteDefinitions() {

		return serviceInstances.filter(instances -> !instances.isEmpty())
				.map(instances -> instances.get(0)).filter(includePredicate)
				.map(instance -> {
					//建立RouteDefinition
					RouteDefinition routeDefinition = buildRouteDefinition(urlExpr,
							instance);

					final ServiceInstance instanceForEval = new DelegatingServiceInstance(
							instance, properties);

					for (PredicateDefinition original : this.properties.getPredicates()) {
						//根據服務資訊重新建構PredicateDefinition
						PredicateDefinition predicate = new PredicateDefinition();
						predicate.setName(original.getName());
						for (Map.Entry<String, String> entry : original.getArgs()
								.entrySet()) {
							//将Path參數值的service-id替換為服務名稱,如/user-service/**
							String value = getValueFromExpr(evalCtxt, parser,
									instanceForEval, entry);
							predicate.addArg(entry.getKey(), value);
						}
						routeDefinition.getPredicates().add(predicate);
					}

					for (FilterDefinition original : this.properties.getFilters()) {
						FilterDefinition filter = new FilterDefinition();
						filter.setName(original.getName());
						for (Map.Entry<String, String> entry : original.getArgs()
								.entrySet()) {
							//将Filter的regex -> '/' + serviceId + '/(?<remaining>.*)' 中的serviceId替換為服務ID 如user-service
							String value = getValueFromExpr(evalCtxt, parser,
									instanceForEval, entry);
							filter.addArg(entry.getKey(), value);
						}
						routeDefinition.getFilters().add(filter);
					}

					return routeDefinition;
				});
	}

	protected RouteDefinition buildRouteDefinition(Expression urlExpr,
			ServiceInstance serviceInstance) {
		//擷取服務ID,預設小寫
		String serviceId = serviceInstance.getServiceId();
		RouteDefinition routeDefinition = new RouteDefinition();
		//設定路由ID
		routeDefinition.setId(this.routeIdPrefix + serviceId);
		//通過Spel解析器生成RouteUri
		String uri = urlExpr.getValue(this.evalCtxt, serviceInstance, String.class);
		routeDefinition.setUri(URI.create(uri));
		//設定中繼資料資訊,包括權重、健康狀态等
		routeDefinition.setMetadata(new LinkedHashMap<>(serviceInstance.getMetadata()));
		return routeDefinition;
	}
}
           

請求處理

RewritePathGatewayFilterFactory

上邊講到了當結合注冊中心時SCG會為每個路由添加

PathRoutePredicateFactory

RewritePathGatewayFilterFactory

PathRoutePredicateFactory

用來計算請求是否符合目前路由的條件,

RewritePathGatewayFilterFactory

用來重寫請求Path,參數

regexp=/user-service/(?<remaining>.*),replacement=$(remaining)

,例如請求的Path為/user-service/api/hello,會被重寫為

/api/hello

public class RewritePathGatewayFilterFactory
		extends AbstractGatewayFilterFactory<RewritePathGatewayFilterFactory.Config> {

	/**
	 * Regexp key.
	 */
	public static final String REGEXP_KEY = "regexp";

	/**
	 * Replacement key.
	 */
	public static final String REPLACEMENT_KEY = "replacement";

	public RewritePathGatewayFilterFactory() {
		super(Config.class);
	}

	@Override
	public List<String> shortcutFieldOrder() {
		return Arrays.asList(REGEXP_KEY, REPLACEMENT_KEY);
	}

	@Override
	public GatewayFilter apply(Config config) {
		String replacement = config.replacement.replace("$\\", "$");
		return new GatewayFilter() {
			@Override
			public Mono<Void> filter(ServerWebExchange exchange,
					GatewayFilterChain chain) {
				ServerHttpRequest req = exchange.getRequest();
				//每次進行重寫時,都在上下文中保留一次原址的請求URI
				addOriginalRequestUrl(exchange, req.getURI());
				String path = req.getURI().getRawPath();
				//根據配置的正則進行替換
				// regexp=/user-service/(?<remaining>.*),replacement=$(remaining),例如請求的Path為/user-service/api/hello,會被重寫為/api/hello。
				String newPath = path.replaceAll(config.regexp, replacement);
				//基于重寫後的Path建構新的請求
				ServerHttpRequest request = req.mutate().path(newPath).build();
				//将新的請求URI放入上下文中,供後邊的Filter使用
				exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, request.getURI());

				return chain.filter(exchange.mutate().request(request).build());
			}

			@Override
			public String toString() {
				return filterToStringCreator(RewritePathGatewayFilterFactory.this)
						.append(config.getRegexp(), replacement).toString();
			}
		};
	}
           

RouteToRequestUrlFilter

RewritePathGatewayFilterFactory

重寫完請求Path後會執行GlobalFilter

RouteToRequestUrlFilter

,該Filter在結合注冊中心的情況下,主要是用來将

RewritePathGatewayFilterFactory

生成的新的request的scheme修改為路由的lb,例如

RewritePathGatewayFilterFactory

生成的請求URI為

http://locahost:8080/api/hello

RouteToRequestUrlFilter

會将其修改為

lb://user-service/api/hello

,供

lbClicentFilter

使用。

@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
		//判斷上下中是否有GATEWAY_ROUTE_ATTR,在RoutePredicateHandlerMapping中放入的
		//如果沒有則不執行
		if (route == null) {
			return chain.filter(exchange);
		}
		log.trace("RouteToRequestUrlFilter start");
		//擷取請求的URI
		URI uri = exchange.getRequest().getURI();
		//判斷是否包含編碼的部分,如%
		boolean encoded = containsEncodedParts(uri);
		//擷取Route的uri
		URI routeUri = route.getUri();

		//判斷是否為其他類型的協定
		if (hasAnotherScheme(routeUri)) {
			// this is a special url, save scheme to special attribute
			// replace routeUri with schemeSpecificPart
			//将目前請求的schema放入上下文中
			exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,
					routeUri.getScheme());
			routeUri = URI.create(routeUri.getSchemeSpecificPart());
		}
		//如果RouteUri以lb開頭,必須請求中帶有host
		if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
			// Load balanced URIs should always have a host. If the host is null it is
			// most
			// likely because the host name was invalid (for example included an
			// underscore)
			throw new IllegalStateException("Invalid host: " + routeUri.toString());
		}
		//生成RequestURL,并放入上下文中
		//使用RouteUri的scheme,如果使用lb的話,那麼此處生成的mergedUrl則是lb://xxxxxx
		URI mergedUrl = UriComponentsBuilder.fromUri(uri)
				// .uri(routeUri)
				.scheme(routeUri.getScheme()).host(routeUri.getHost())
				.port(routeUri.getPort()).build(encoded).toUri();
		//将新的URL放入請求上下文
		exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
		return chain.filter(exchange);
	}

}
           

LoadBalancerClientFilter

到目前為止,還沒有擷取到真正要調用的服務資訊,

LoadBalancerClientFilter

就是做這件事的。

@Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
		String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
		//如果不是lb的請求,則不執行
		if (url == null
				|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
			return chain.filter(exchange);
		}
		// preserve the original url
		//保留原始的請求位址
		addOriginalRequestUrl(exchange, url);

		if (log.isTraceEnabled()) {
			log.trace("LoadBalancerClientFilter url before: " + url);
		}
		//負載均衡擷取真實的服務資訊
		final ServiceInstance instance = choose(exchange);

		if (instance == null) {
			throw NotFoundException.create(properties.isUse404(),
					"Unable to find instance for " + url.getHost());
		}

		URI uri = exchange.getRequest().getURI();

		// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
		// if the loadbalancer doesn't provide one.
		String overrideScheme = instance.isSecure() ? "https" : "http";
		if (schemePrefix != null) {
			overrideScheme = url.getScheme();
		}
		//使用最終調用服務資訊建構URI
		URI requestUrl = loadBalancer.reconstructURI(
				new DelegatingServiceInstance(instance, overrideScheme), uri);

		if (log.isTraceEnabled()) {
			log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
		}
		//将請求URI放入上下文,供NettyRoutingFilter使用
		exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
		return chain.filter(exchange);
	}

	protected ServiceInstance choose(ServerWebExchange exchange) {
		//此處調用RibbonLoadBalancer負載均衡擷取真實服務資訊
		return loadBalancer.choose(
				((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost());
	}
           

動态路由重新整理

事件機制。

在SCG中有

RouteRefreshListener

用來監聽重新整理的事件,比如Nacos使用

NacosWatch

來發送

HeartbeatEvent。

public void nacosServicesWatch() {

	// nacos doesn't support watch now , publish an event every 30 seconds.
	this.publisher.publishEvent(
			new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));

}
           

RouteRefreshListener

中監聽到

HeartbeatEvent

後會發送

RefreshRoutesEvent

CachingRouteLocator

中監聽了該事件,而後觸發

DiscoveryClientRouteDefinitionLocator#getRouteDefinition

從注冊中心重新擷取一次服務資訊,生成RouteDefinition。

繼續閱讀