天天看点

Spring Cloud Gateway实现灰度发布实现原理

作者:Spring全家桶实战案例

环境:SpringBoot2.6.14 + Spring Cloud Gateway3.1.4

什么是灰度发布

灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/B testing,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现、调整问题,以保证其影响度。

灰度发布类型

  • 金丝雀发布

将少量的请求引流到新版本上,因此部署新版本服务只需极小数的机器。验证新版本符合预期后,逐步调整流量权重比例,使得流量慢慢从老版本迁移至新版本,期间可以根据设置的流量比例,对新版本服务进行扩容,同时对老版本服务进行缩容,使得底层资源得到最大化利用。

Spring Cloud Gateway实现灰度发布实现原理

金丝雀发布的优点:

  1. 按比例将流量无差别地导向新版本,新版本故障影响范围小;
  2. 发布期间逐步对新版本扩容,同时对老版本缩容,资源利用率高。

金丝雀发布的缺点:

  1. 流量无差别地导向新版本,可能会影响重要用户的体验;
  2. 发布周期长。
  • A/B测试

A/B 测试基于用户请求的元信息将流量路由到新版本,这是一种基于请求内容匹配的灰度发布策略。只有匹配特定规则的请求才会被引流到新版本,常见的做法包括基于 Header 和 Cookie。基于 Header 方式例子,例如 User-Agent 的值为 Android 的请求 (来自安卓系统的请求)可以访问新版本,其他系统仍然访问旧版本。基于 Cookie 方式的例子,Cookie 中通常包含具有业务语义的用户信息,例如普通用户可以访问新版本,VIP 用户仍然访问旧版本。

Spring Cloud Gateway实现灰度发布实现原理
  • 蓝绿发布

蓝绿发布需要对服务的新版本进行冗余部署,一般新版本的机器规格和数量与旧版本保持一致,相当于该服务有两套完全相同的部署环境,只不过此时只有旧版本在对外提供服务,新版本作为热备。当服务进行版本升级时,我们只需将流量全部切换到新版本即可,旧版本作为热备。由于冗余部署的缘故,所以不必担心新版本的资源不够。如果新版本上线后出现严重的程序 BUG,那么我们只需将流量全部切回至旧版本,大大缩短故障恢复的时间。

Gateway实现灰度发布

本篇将文章将通过A/B测试方式实现灰度发布。接下来将展示在Spring Cloud Gateway中实现A/B测试核心组件。

  • 引入依赖
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-gateway</artifactId>
  <version>3.1.4</version>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-loadbalancer</artifactId>
  <version>3.1.4</version>
</dependency>           
  • 自定义负载均衡器

自定义负载均衡器作用是根据请求的header中的v进行服务实例的筛选。

public class GrayRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

  private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class);

  final AtomicInteger position;

  final String serviceId;

  ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

  public GrayRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
      String serviceId) {
    this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
  }

  public GrayRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
      String serviceId, int seedPosition) {
    this.serviceId = serviceId;
    this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    this.position = new AtomicInteger(seedPosition);
  }

  @SuppressWarnings("rawtypes")
  @Override
  public Mono<Response<ServiceInstance>> choose(Request request) {
    ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
        .getIfAvailable(NoopServiceInstanceListSupplier::new);
    return supplier.get(request).next()
        .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, request));
  }

  @SuppressWarnings("rawtypes")
  private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
      List<ServiceInstance> serviceInstances, Request request) {
    Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances, request);
    if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
      ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
    }
    return serviceInstanceResponse;
  }

  @SuppressWarnings("rawtypes")
  private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, Request request) {
    if (instances.isEmpty()) {
      if (log.isWarnEnabled()) {
        log.warn("No servers available for service: " + serviceId);
      }
      return new EmptyResponse();
    }
    List<ServiceInstance> result = instances.stream().filter(instance -> {
      Map<String, String> metadata = instance.getMetadata();
      Object orgId = metadata.get("v");
      RequestDataContext context = (RequestDataContext) request.getContext() ;
      RequestData requestData = context.getClientRequest() ;
      String v = null ;
      if (requestData instanceof GrayRequestData) {
        GrayRequestData grayRequestData = (GrayRequestData) requestData ;
        queryV = grayRequestData.getQueryParams().getFirst("v") ;
      }
      String value = requestData.getHeaders().getFirst("v") ;
      return v != null && (v.equals(value) || v.equals(queryV)) ;
    }).collect(Collectors.toList());
    if (result.isEmpty()) {
      result = instances;
    }
    int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;

    ServiceInstance instance = result.get(pos % result.size());

    return new DefaultResponse(instance);
  }

}           

以上负载均衡器将从header或者请求参数中获取v参数,然后根据v参数的值从服务实例列表中获取metadata信息进行比对。

全局过滤器

该过滤器的作用是通过上面的负载均衡器从其中选择一个服务实例进行服务的调用

@SuppressWarnings({ "rawtypes", "unchecked" })
@Component
public class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

  private static final Log log = LogFactory.getLog(GrayReactiveLoadBalancerClientFilter.class);
  
  /**
   * Order of filter.
   */
  public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;

  private final LoadBalancerClientFactory clientFactory;

  private final GatewayLoadBalancerProperties properties;


  public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,
      GatewayLoadBalancerProperties properties) {
    this.clientFactory = clientFactory;
    this.properties = properties;
  }

  @Override
  public int getOrder() {
    return LOAD_BALANCER_CLIENT_FILTER_ORDER;
  }

  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
    if (url == null || (!"packlb".equals(url.getScheme()) && !"packlb".equals(schemePrefix))) {
      return chain.filter(exchange);
    }
    // preserve the original url
    addOriginalRequestUrl(exchange, url);

    URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String serviceId = requestUri.getHost();
    Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
        .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
            RequestDataContext.class, ResponseData.class, ServiceInstance.class);
    DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
        new RequestDataContext(new GrayRequestData(exchange.getRequest()), getHint(serviceId)));
    LoadBalancerProperties loadBalancerProperties = clientFactory.getProperties(serviceId);
    return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {

      if (!response.hasServer()) {
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
            .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
        throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
      }

      ServiceInstance retrievedInstance = response.getServer();

      URI uri = exchange.getRequest().getURI();

      // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
      // if the loadbalancer doesn't provide one.
      String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
      if (schemePrefix != null) {
        overrideScheme = url.getScheme();
      }

      DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance,
          overrideScheme);

      URI requestUrl = reconstructURI(serviceInstance, uri);

      if (log.isTraceEnabled()) {
        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
      }
      exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
      exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
      supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
    }).then(chain.filter(exchange))
        .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
            .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                CompletionContext.Status.FAILED, throwable, lbRequest,
                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
        .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
            .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                CompletionContext.Status.SUCCESS, lbRequest,
                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR), buildResponseData(exchange,
                    loadBalancerProperties.isUseRawStatusCodeInResponseData())))));
  }

  @SuppressWarnings("deprecation")
  private ResponseData buildResponseData(ServerWebExchange exchange, boolean useRawStatusCodes) {
    if (useRawStatusCodes) {
      return new ResponseData(new GrayRequestData(exchange.getRequest()), exchange.getResponse());
    }
    return new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()));
  }

  protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
    return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
  }

  private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
      Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
    ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId,
        ReactorServiceInstanceLoadBalancer.class);
    if (loadBalancer == null) {
      throw new NotFoundException("No loadbalancer available for " + serviceId);
    }
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
    return loadBalancer.choose(lbRequest);
  }

  private String getHint(String serviceId) {
    LoadBalancerProperties loadBalancerProperties = clientFactory.getProperties(serviceId);
    Map<String, String> hints = loadBalancerProperties.getHint();
    String defaultHint = hints.getOrDefault("default", "default");
    String hintPropertyValue = hints.get(serviceId);
    return hintPropertyValue != null ? hintPropertyValue : defaultHint;
  }

}           

配置

@Configuration
public class GrayDefaultConfiguration {

  @Bean
  public GrayRoundRobinLoadBalancer grayRandomLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
    String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
    return new GrayRoundRobinLoadBalancer(
        loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
  }
  
  // 由于没有使用服务注册及发现,这里通过编码的方式定义服务实例
  @Bean
  public ServiceInstanceListSupplier sscServiceInstanceListSupplier() {
    return new ServiceInstanceListSupplier() {
      @Override
      public Flux<List<ServiceInstance>> get() {
        List<ServiceInstance> instances = new ArrayList<>() ;
        Map<String, String> metadata1 = new HashMap<>() ;
        metadata1.put("v", "1") ;
        ServiceInstance s1 = new DefaultServiceInstance("s1", "ssc", "localhost", 8088 , false, metadata1) ;
        instances.add(s1) ;
        
        Map<String, String> metadata2 = new HashMap<>() ;
        metadata2.put("v", "2") ;
        ServiceInstance s2 = new DefaultServiceInstance("s2", "ssc", "localhost", 8099 , false, metadata2) ;
        instances.add(s2) ;
        
        return Flux.just(instances) ;
      }
      @Override
      public String getServiceId() {
        return "ssc" ;
      }
    };
  }
  
}           

负载均衡客户端设置默认的配置

@LoadBalancerClients(defaultConfiguration = GrayDefaultConfiguration.class)
public class SpringCloudGatewayApplication {
}           

以上就是实现灰度发布的核心组件。

Spring Cloud Gateway实现灰度发布实现原理

测试,设置不同的v返回不同服务的结果数据

完毕!!!

求关注

Spring Cloud Gateway应用详解1之谓词

Spring Cloud Gateway应用详解2内置过滤器

Spring Cloud Gateway核心全局过滤器

SpringCloud Gateway 应用Hystrix 限流功能 自定义Filter详解

Spring Cloud Gateway核心过滤器之服务熔断降级

SpringCloud Gateway 路由配置定位原理分析

Spring Cloud Gateway核心过滤器之请求限流详解

Spring Cloud Gateway实现灰度发布实现原理
Spring Cloud Gateway实现灰度发布实现原理