天天看點

Spring Cloud Gateway實作灰階釋出實作原理

作者:Spring全家桶實戰案例

環境:SpringBoot2.6.14 + Spring Cloud Gateway3.1.4

什麼是灰階釋出

灰階釋出(又名金絲雀釋出)是指在黑與白之間,能夠平滑過渡的一種釋出方式。在其上可以進行A/B testing,即讓一部分使用者繼續用産品特性A,一部分使用者開始用産品特性B,如果使用者對B沒有什麼反對意見,那麼逐漸擴大範圍,把所有使用者都遷移到B上面來。灰階釋出可以保證整體系統的穩定,在初始灰階的時候就可以發現、調整問題,以保證其影響度。

灰階釋出類型

  • 金絲雀釋出

将少量的請求引流到新版本上,是以部署新版本服務隻需極小數的機器。驗證新版本符合預期後,逐漸調整流量權重比例,使得流量慢慢從老版本遷移至新版本,期間可以根據設定的流量比例,對新版本服務進行擴容,同時對老版本服務進行縮容,使得底層資源得到最大化利用。

Spring Cloud Gateway實作灰階釋出實作原理

金絲雀釋出的優點:

  1. 按比例将流量無差别地導向新版本,新版本故障影響範圍小;
  2. 釋出期間逐漸對新版本擴容,同時對老版本縮容,資源使用率高。

金絲雀釋出的缺點:

  1. 流量無差别地導向新版本,可能會影響重要使用者的體驗;
  2. 釋出周期長。
  • A/B測試

A/B 測試基于使用者請求的元資訊将流量路由到新版本,這是一種基于請求内容比對的灰階釋出政策。隻有比對特定規則的請求才會被引流到新版本,常見的做法包括基于 Header 和 Cookie。基于 Header 方式例子,例如 User-Agent 的值為 Android 的請求 (來自安卓系統的請求)可以通路新版本,其他系統仍然通路舊版本。基于 Cookie 方式的例子,Cookie 中通常包含具有業務語義的使用者資訊,例如普通使用者可以通路新版本,VIP 使用者仍然通路舊版本。

Spring Cloud Gateway實作灰階釋出實作原理
  • 藍綠釋出

藍綠釋出需要對服務的新版本進行備援部署,一般新版本的機器規格和數量與舊版本保持一緻,相當于該服務有兩套完全相同的部署環境,隻不過此時隻有舊版本在對外提供服務,新版本作為熱備。當服務進行版本更新時,我們隻需将流量全部切換到新版本即可,舊版本作為熱備。由于備援部署的緣故,是以不必擔心新版本的資源不夠。如果新版本上線後出現嚴重的程式 BUG,那麼我們隻需将流量全部切回至舊版本,大大縮短故障恢複的時間。

Gateway實作灰階釋出

本篇将文章将通過A/B測試方式實作灰階釋出。接下來将展示在Spring Cloud Gateway中實作A/B測試核心元件。

  • 引入依賴
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-gateway</artifactId>
  <version>3.1.4</version>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-loadbalancer</artifactId>
  <version>3.1.4</version>
</dependency>           
  • 自定義負載均衡器

自定義負載均衡器作用是根據請求的header中的v進行服務執行個體的篩選。

public class GrayRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

  private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class);

  final AtomicInteger position;

  final String serviceId;

  ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

  public GrayRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
      String serviceId) {
    this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
  }

  public GrayRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
      String serviceId, int seedPosition) {
    this.serviceId = serviceId;
    this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    this.position = new AtomicInteger(seedPosition);
  }

  @SuppressWarnings("rawtypes")
  @Override
  public Mono<Response<ServiceInstance>> choose(Request request) {
    ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
        .getIfAvailable(NoopServiceInstanceListSupplier::new);
    return supplier.get(request).next()
        .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, request));
  }

  @SuppressWarnings("rawtypes")
  private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
      List<ServiceInstance> serviceInstances, Request request) {
    Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances, request);
    if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
      ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
    }
    return serviceInstanceResponse;
  }

  @SuppressWarnings("rawtypes")
  private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, Request request) {
    if (instances.isEmpty()) {
      if (log.isWarnEnabled()) {
        log.warn("No servers available for service: " + serviceId);
      }
      return new EmptyResponse();
    }
    List<ServiceInstance> result = instances.stream().filter(instance -> {
      Map<String, String> metadata = instance.getMetadata();
      Object orgId = metadata.get("v");
      RequestDataContext context = (RequestDataContext) request.getContext() ;
      RequestData requestData = context.getClientRequest() ;
      String v = null ;
      if (requestData instanceof GrayRequestData) {
        GrayRequestData grayRequestData = (GrayRequestData) requestData ;
        queryV = grayRequestData.getQueryParams().getFirst("v") ;
      }
      String value = requestData.getHeaders().getFirst("v") ;
      return v != null && (v.equals(value) || v.equals(queryV)) ;
    }).collect(Collectors.toList());
    if (result.isEmpty()) {
      result = instances;
    }
    int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;

    ServiceInstance instance = result.get(pos % result.size());

    return new DefaultResponse(instance);
  }

}           

以上負載均衡器将從header或者請求參數中擷取v參數,然後根據v參數的值從服務執行個體清單中擷取metadata資訊進行比對。

全局過濾器

該過濾器的作用是通過上面的負載均衡器從其中選擇一個服務執行個體進行服務的調用

@SuppressWarnings({ "rawtypes", "unchecked" })
@Component
public class GrayReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

  private static final Log log = LogFactory.getLog(GrayReactiveLoadBalancerClientFilter.class);
  
  /**
   * Order of filter.
   */
  public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;

  private final LoadBalancerClientFactory clientFactory;

  private final GatewayLoadBalancerProperties properties;


  public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,
      GatewayLoadBalancerProperties properties) {
    this.clientFactory = clientFactory;
    this.properties = properties;
  }

  @Override
  public int getOrder() {
    return LOAD_BALANCER_CLIENT_FILTER_ORDER;
  }

  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
    if (url == null || (!"packlb".equals(url.getScheme()) && !"packlb".equals(schemePrefix))) {
      return chain.filter(exchange);
    }
    // preserve the original url
    addOriginalRequestUrl(exchange, url);

    URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String serviceId = requestUri.getHost();
    Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
        .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
            RequestDataContext.class, ResponseData.class, ServiceInstance.class);
    DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
        new RequestDataContext(new GrayRequestData(exchange.getRequest()), getHint(serviceId)));
    LoadBalancerProperties loadBalancerProperties = clientFactory.getProperties(serviceId);
    return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {

      if (!response.hasServer()) {
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
            .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
        throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
      }

      ServiceInstance retrievedInstance = response.getServer();

      URI uri = exchange.getRequest().getURI();

      // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
      // if the loadbalancer doesn't provide one.
      String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
      if (schemePrefix != null) {
        overrideScheme = url.getScheme();
      }

      DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance,
          overrideScheme);

      URI requestUrl = reconstructURI(serviceInstance, uri);

      if (log.isTraceEnabled()) {
        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
      }
      exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
      exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
      supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
    }).then(chain.filter(exchange))
        .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
            .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                CompletionContext.Status.FAILED, throwable, lbRequest,
                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
        .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
            .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                CompletionContext.Status.SUCCESS, lbRequest,
                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR), buildResponseData(exchange,
                    loadBalancerProperties.isUseRawStatusCodeInResponseData())))));
  }

  @SuppressWarnings("deprecation")
  private ResponseData buildResponseData(ServerWebExchange exchange, boolean useRawStatusCodes) {
    if (useRawStatusCodes) {
      return new ResponseData(new GrayRequestData(exchange.getRequest()), exchange.getResponse());
    }
    return new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()));
  }

  protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
    return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
  }

  private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
      Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
    ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId,
        ReactorServiceInstanceLoadBalancer.class);
    if (loadBalancer == null) {
      throw new NotFoundException("No loadbalancer available for " + serviceId);
    }
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
    return loadBalancer.choose(lbRequest);
  }

  private String getHint(String serviceId) {
    LoadBalancerProperties loadBalancerProperties = clientFactory.getProperties(serviceId);
    Map<String, String> hints = loadBalancerProperties.getHint();
    String defaultHint = hints.getOrDefault("default", "default");
    String hintPropertyValue = hints.get(serviceId);
    return hintPropertyValue != null ? hintPropertyValue : defaultHint;
  }

}           

配置

@Configuration
public class GrayDefaultConfiguration {

  @Bean
  public GrayRoundRobinLoadBalancer grayRandomLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
    String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
    return new GrayRoundRobinLoadBalancer(
        loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
  }
  
  // 由于沒有使用服務注冊及發現,這裡通過編碼的方式定義服務執行個體
  @Bean
  public ServiceInstanceListSupplier sscServiceInstanceListSupplier() {
    return new ServiceInstanceListSupplier() {
      @Override
      public Flux<List<ServiceInstance>> get() {
        List<ServiceInstance> instances = new ArrayList<>() ;
        Map<String, String> metadata1 = new HashMap<>() ;
        metadata1.put("v", "1") ;
        ServiceInstance s1 = new DefaultServiceInstance("s1", "ssc", "localhost", 8088 , false, metadata1) ;
        instances.add(s1) ;
        
        Map<String, String> metadata2 = new HashMap<>() ;
        metadata2.put("v", "2") ;
        ServiceInstance s2 = new DefaultServiceInstance("s2", "ssc", "localhost", 8099 , false, metadata2) ;
        instances.add(s2) ;
        
        return Flux.just(instances) ;
      }
      @Override
      public String getServiceId() {
        return "ssc" ;
      }
    };
  }
  
}           

負載均衡用戶端設定預設的配置

@LoadBalancerClients(defaultConfiguration = GrayDefaultConfiguration.class)
public class SpringCloudGatewayApplication {
}           

以上就是實作灰階釋出的核心元件。

Spring Cloud Gateway實作灰階釋出實作原理

測試,設定不同的v傳回不同服務的結果資料

完畢!!!

求關注

Spring Cloud Gateway應用詳解1之謂詞

Spring Cloud Gateway應用詳解2内置過濾器

Spring Cloud Gateway核心全局過濾器

SpringCloud Gateway 應用Hystrix 限流功能 自定義Filter詳解

Spring Cloud Gateway核心過濾器之服務熔斷降級

SpringCloud Gateway 路由配置定位原理分析

Spring Cloud Gateway核心過濾器之請求限流詳解

Spring Cloud Gateway實作灰階釋出實作原理
Spring Cloud Gateway實作灰階釋出實作原理