
Integrating A/B testing into Spring Cloud Gateway (gray release, multi-version release)

     I. Preface

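In a large distributed e-commerce platform, the gateway is critical. It dispatches requests to every service and handles core concerns such as traffic control. Gray release, in turn, gives the business data on which to base its decisions. So how can gray release be integrated through the gateway? The steps below walk through it, with practical detail throughout.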

     II. Approaches:

There are several possible approaches:

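1. Add weights directly at the gateway layer so that user traffic is split randomly between the new and old services. This is Spring Cloud Gateway's weighted routing, for example: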

spring:
  application:
    name: testgateway
  cloud:
    gateway:
      routes:
        - id: search-service1
          uri: lb://search-service1
          predicates:
            - Path=/search/**
            # Weight is a route predicate; both routes must use the same group name (service1)
            - Weight=service1, 95
          filters:
            - StripPrefix=1
        - id: search-service2
          uri: lb://search-service2
          predicates:
            - Path=/search/**
            - Weight=service1, 5
          filters:
            - StripPrefix=1
           

This is enough for simple weighted routing in the gateway. Next, let's look at the core source code behind Spring Cloud Gateway's weighted routing.

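The core methods of org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter are filter and addWeightConfig. They are shown below in readable form, and details vary slightly between versions. If you are interested, set a breakpoint and step through them.

In outline, addWeightConfig turns each group's weights into cumulative ranges. For every request, filter then draws a random number, picks the route whose range contains it, and stores the chosen route id in an exchange attribute. The Weight predicate of each route checks that attribute. In the example above the ranges are [0, 0.95) and [0.95, 1.0). A random number below 0.95 therefore routes to search-service1, and anything else goes to search-service2.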

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    // Per-request map of group -> chosen route id, kept in an exchange attribute
    Map<String, String> weights = getWeights(exchange);

    // groupWeights holds the precomputed weight configuration of every weight group
    for (String group : groupWeights.keySet()) {
        GroupWeightConfig config = groupWeights.get(group);
        if (config == null) {
            if (log.isDebugEnabled()) {
                log.debug("No GroupWeightConfig found for group: " + group);
            }
            continue;
        }

        // Draw a random number in [0, 1)
        double r = random.nextDouble();
        List<Double> ranges = config.ranges;
        if (log.isTraceEnabled()) {
            log.trace("Weight for group: " + group + ", ranges: " + ranges + ", r: " + r);
        }

        // Pick the route whose range [ranges[i], ranges[i+1]) contains r;
        // the ranges are built in addWeightConfig
        for (int i = 0; i < ranges.size() - 1; i++) {
            if (r >= ranges.get(i) && r < ranges.get(i + 1)) {
                String routeId = config.rangeIndexes.get(i);
                weights.put(group, routeId);
                break;
            }
        }
    }

    if (log.isTraceEnabled()) {
        log.trace("Weights attr: " + weights);
    }
    return chain.filter(exchange);
}
           
void addWeightConfig(WeightConfig weightConfig) {
    String group = weightConfig.getGroup();
    // Build a new config object (copying the old one if present) instead of mutating it in place
    GroupWeightConfig config;
    if (groupWeights.containsKey(group)) {
        config = new GroupWeightConfig(groupWeights.get(group));
    } else {
        config = new GroupWeightConfig(group);
    }

    config.weights.put(weightConfig.getRouteId(), weightConfig.getWeight());

    // Sum all weights in the group
    int weightsSum = 0;
    for (Integer weight : config.weights.values()) {
        weightsSum += weight;
    }

    // Normalize each weight to a fraction of the sum and remember the route order
    AtomicInteger index = new AtomicInteger(0);
    for (Map.Entry<String, Integer> entry : config.weights.entrySet()) {
        String routeId = entry.getKey();
        Integer weight = entry.getValue();
        double normalizedWeight = weight / (double) weightsSum;
        config.normalizedWeights.put(routeId, normalizedWeight);
        config.rangeIndexes.put(index.getAndIncrement(), routeId);
    }

    // Cumulative ranges: [0.0, w1, w1 + w2, ..., 1.0]
    config.ranges.clear();
    config.ranges.add(0.0);
    List<Double> values = new ArrayList<>(config.normalizedWeights.values());
    for (int i = 0; i < values.size(); i++) {
        Double currentWeight = values.get(i);
        Double previousRange = config.ranges.get(i);
        config.ranges.add(previousRange + currentWeight);
    }

    if (log.isTraceEnabled()) {
        log.trace("Recalculated group weight config " + config);
    }

    groupWeights.put(group, config);
}

Map<String, GroupWeightConfig> getGroupWeights() {
    return groupWeights;
}
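The range lookup can be tried outside Spring with the stand-alone sketch below. It normalizes the weights, builds cumulative ranges starting at 0.0, and picks the route whose range contains a number r in [0, 1). It only illustrates the technique. `WeightRangeSketch` and its methods are hypothetical names, not part of Spring Cloud Gateway.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-alone sketch of the cumulative-range technique used by
 * WeightCalculatorWebFilter (not the Spring class itself).
 */
final class WeightRangeSketch {
    private final List<String> routeIds = new ArrayList<>();
    private final List<Double> ranges = new ArrayList<>();

    WeightRangeSketch(LinkedHashMap<String, Integer> weights) {
        int sum = 0;
        for (int w : weights.values()) {
            sum += w;
        }
        // ranges = [0.0, w1/sum, (w1+w2)/sum, ...]
        ranges.add(0.0);
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            routeIds.add(e.getKey());
            double normalized = e.getValue() / (double) sum;
            ranges.add(ranges.get(ranges.size() - 1) + normalized);
        }
    }

    List<Double> ranges() {
        return ranges;
    }

    /** Returns the route whose range [ranges[i], ranges[i+1]) contains r, or null if none does. */
    String pick(double r) {
        for (int i = 0; i < ranges.size() - 1; i++) {
            if (r >= ranges.get(i) && r < ranges.get(i + 1)) {
                return routeIds.get(i);
            }
        }
        return null;
    }
}
```

With the 95/5 weights from the YAML example, the ranges come out as [0.0, 0.95, 1.0]. Feeding `pick` a value from `new java.util.Random().nextDouble()` on every request sends roughly 95% of requests to the first route. Each request is drawn independently, though, so nothing ties a given user to either route.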
           

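However, this approach cannot guarantee user stickiness. Every request is routed by an independent random draw, so the same user can hit the old service on one request and the new one on the next. In real projects, gateway gray release is rarely implemented this simply. A more complete gray release can be built by adding a custom route predicate, a custom load balancer, and a custom global filter: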

1. Add a route predicate factory:

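import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.handler.predicate.AbstractRoutePredicateFactory;
import org.springframework.cloud.gateway.handler.predicate.GatewayPredicate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;

/**
 * Custom route predicate factory, used in route config as HeaderGroup=A (or HeaderGroup=A,B).
 * A request matches when its "Group" header is one of the configured groups.
 */
@Component
@Slf4j
public class HeaderGroupRoutePredicateFactory extends AbstractRoutePredicateFactory<HeaderGroupRoutePredicateFactory.Config> {

    public static final String GROUP = "Group";

    public HeaderGroupRoutePredicateFactory() {
        super(Config.class);
    }

    @Override
    public ShortcutType shortcutType() {
        // Gather all shortcut arguments (A,B,...) into the single list field "group"
        return ShortcutType.GATHER_LIST;
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Collections.singletonList("group");
    }

    /**
     * Builds the predicate from the configured groups; it is rebuilt whenever
     * the gateway refreshes its route definitions.
     */
    @Override
    public Predicate<ServerWebExchange> apply(Config config) {
        List<String> groups = config.getGroup();
        return new GatewayPredicate() {
            @Override
            public boolean test(ServerWebExchange serverWebExchange) {
                // Match only if the request's Group header is one of the configured groups
                String group = serverWebExchange.getRequest().getHeaders().getFirst(GROUP);
                return StringUtils.hasText(group) && groups.contains(group);
            }

            @Override
            public String toString() {
                return String.format("Header: Group=%s", config.getGroup());
            }
        };
    }

    @NoArgsConstructor
    @Getter
    @Setter
    @ToString
    public static class Config {
        private List<String> group;
    }
}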
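The predicate's behavior can be reproduced in plain Java, without Spring. The sketch below roughly mimics how a shortcut such as `HeaderGroup=A, B` is gathered into a list (split on commas, tokens trimmed) and how a request's Group header is matched against it. `HeaderGroupSketch` is a hypothetical helper for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical plain-Java sketch of the HeaderGroup predicate's semantics. */
final class HeaderGroupSketch {
    /** Roughly what GATHER_LIST does with "A, B": split on commas and trim each token. */
    static List<String> gather(String shortcutArgs) {
        return Arrays.stream(shortcutArgs.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    /** A missing or blank Group header never matches; otherwise it must be in the list. */
    static boolean matches(List<String> groups, String groupHeader) {
        return groupHeader != null && !groupHeader.trim().isEmpty() && groups.contains(groupHeader);
    }
}
```

This also shows a consequence of the route config used later: a request without a Group header matches no HeaderGroup route.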
           

2. Write a custom load balancer:

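// Imports assume Spring Cloud Hoxton (Commons/LoadBalancer 2.2.x); some classes moved packages in later releases.
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Mono;

/**
 * Custom load balancer.
 * Only used for requests that the gray filter intercepts (grayLb:// routes).
 */
@Slf4j
public class CustomizeLoadBalance implements ReactorServiceInstanceLoadBalancer {

    private static final AtomicInteger position = new AtomicInteger(new Random().nextInt(1000));
    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    private final String serviceId;

    public CustomizeLoadBalance(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        // The gray filter passes the request headers as the request context
        HttpHeaders headers = (HttpHeaders) request.getContext();
        if (serviceInstanceListSupplierProvider == null) {
            // Return an empty response rather than a null Mono
            return Mono.just(new EmptyResponse());
        }
        ServiceInstanceListSupplier supplier =
                serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get().next().map(instances -> getInstanceResponse(instances, headers));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, HttpHeaders headers) {
        // Version header added by the route's AddRequestHeader filter
        String version = headers.getFirst("Version");
        log.info("Version: {}", version);
        // Only requests carrying a Version header take part in gray selection; without it no instance is eligible.
        // To match on Nacos metadata instead, compare version with instance.getMetadata().get("Version").
        List<ServiceInstance> serviceInstances = instances.stream()
                .filter(instance -> version != null)
                .collect(Collectors.toList());
        if (serviceInstances.isEmpty()) {
            log.warn("No available instance for service: {}", serviceId);
            return new EmptyResponse();
        }
        // Simple round-robin over a shared counter (masked so it never goes negative on overflow)
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        ServiceInstance instance = serviceInstances.get(pos % serviceInstances.size());
        return new DefaultResponse(instance);
    }
}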
           

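A gray release usually involves just one new version and one old version, so client-side load balancing adds little here. It only matters when choosing among several instances of the same version. At that point, consider replacing the simple counter-modulo selection (the "hard hash" above) with a distributed routing algorithm such as consistent hashing with virtual nodes.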
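Here is a minimal sketch of consistent hashing with virtual nodes in plain Java, not tied to Spring Cloud. The class name `ConsistentHashRing`, the choice of FNV-1a as the hash, and the idea of keying on a user id are all assumptions made for illustration. Each instance is placed on a ring many times, and a key goes to the first virtual node at or after its hash. The same key therefore keeps hitting the same instance, and removing an instance only remaps the keys that instance owned.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of consistent hashing with virtual nodes. */
final class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRing(List<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        for (String node : nodes) {
            add(node);
        }
    }

    void add(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    void remove(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            // Remove the slot only if it still belongs to this node (guards against collisions)
            ring.remove(hash(node + "#" + i), node);
        }
    }

    /** Returns the node for a key (e.g. a user id): first virtual node clockwise from its hash. */
    String select(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        // Wrap around to the start of the ring when the key hashes past the last slot
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** 32-bit FNV-1a hash, kept as an unsigned value in a long. */
    static long hash(String s) {
        long h = 0x811C9DC5L;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xFF);
            h = (h * 0x01000193L) & 0xFFFFFFFFL;
        }
        return h;
    }
}
```

Inside `getInstanceResponse`, such a ring could be built from the filtered instances (for example keyed by `host:port`) and queried with a per-user identifier. That would replace the counter-based pick.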

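If you would rather distinguish versions by Nacos metadata, call instance.getMetadata().get("Version") to read each instance's Nacos metadata, and use it to decide whether the instance joins the candidate list. However, Nacos as an open-source project seemed somewhat unstable to me and may have bugs, so I don't recommend this.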
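The metadata variant can be sketched in plain Java. It keeps only the instances whose `Version` metadata equals the request's Version header, then round-robins among them. `Inst` is a hypothetical stand-in for `ServiceInstance`, whose `getMetadata()` returns a `Map<String, String>`. The sketch also assumes instances register that metadata key, for example via `spring.cloud.nacos.discovery.metadata.Version=v2`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/** Hypothetical sketch of metadata-based gray selection. */
final class VersionFilterSketch {
    /** Stand-in for ServiceInstance: an id plus its registry metadata. */
    static final class Inst {
        final String id;
        final Map<String, String> metadata;

        Inst(String id, Map<String, String> metadata) {
            this.id = id;
            this.metadata = metadata;
        }
    }

    private final AtomicInteger position = new AtomicInteger();

    /** Instances whose "Version" metadata equals the header; none if the header is missing. */
    List<Inst> eligible(List<Inst> instances, String versionHeader) {
        if (versionHeader == null) {
            return List.of();
        }
        return instances.stream()
                .filter(i -> versionHeader.equals(i.metadata.get("Version")))
                .collect(Collectors.toList());
    }

    /** Round-robin among eligible instances; null stands in for the real EmptyResponse. */
    Inst choose(List<Inst> instances, String versionHeader) {
        List<Inst> candidates = eligible(instances, versionHeader);
        if (candidates.isEmpty()) {
            return null;
        }
        int pos = position.getAndIncrement() & Integer.MAX_VALUE;
        return candidates.get(pos % candidates.size());
    }
}
```

With this filter, the v1 and v2 instances could share a single service id. The route config below instead uses separate service ids per version.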

3. Write a custom global filter:

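// Imports assume Spring Cloud Hoxton (Gateway 2.2.x); several of these classes moved packages in later releases.
import java.net.URI;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultRequest;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
@Slf4j
public class AuthGlobalFilter implements GlobalFilter, Ordered {

    // Same order as the built-in ReactiveLoadBalancerClientFilter
    private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;
    private static final String GRAY_SCHEME = "grayLb";

    private final LoadBalancerClientFactory clientFactory;
    private final LoadBalancerProperties properties;

    public AuthGlobalFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        this.clientFactory = clientFactory;
        this.properties = properties;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        // Only route URIs with the grayLb:// scheme are handled here; anything else
        // (e.g. lb://) falls through to the default load-balancer filter.
        if (url == null || !GRAY_SCHEME.equals(url.getScheme())) {
            return chain.filter(exchange);
        }
        ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
        log.info("URL before rewrite: {}", url);
        return choose(exchange).doOnNext(response -> {
            if (!response.hasServer()) {
                throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
            }
            URI uri = exchange.getRequest().getURI();
            // Rebuild the request URL against the chosen instance (no scheme override),
            // which replaces the grayLb:// address
            DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(response.getServer(), null);
            URI requestUrl = reconstructURI(serviceInstance, uri);
            log.info("URL after rewrite: {}", requestUrl);
            exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
        }).then(chain.filter(exchange));
    }

    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
        URI uri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String serviceId = uri.getHost();
        CustomizeLoadBalance loadBalancer = new CustomizeLoadBalance(
                clientFactory.getLazyProvider(serviceId, ServiceInstanceListSupplier.class), serviceId);
        return loadBalancer.choose(createRequest(exchange));
    }

    private Request<HttpHeaders> createRequest(ServerWebExchange exchange) {
        // Pass the request headers to the load balancer as the request context
        return new DefaultRequest<>(exchange.getRequest().getHeaders());
    }

    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    @Override
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }
}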
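The URL handling in the filter can be illustrated in simplified plain Java. The sketch detects a `grayLb://` route URL, then rebuilds the incoming request URI against a chosen instance's scheme, host and port, keeping the raw path and query. `GrayUriSketch` is hypothetical and much simpler than `LoadBalancerUriTools.reconstructURI`, which the real filter uses and which handles more cases (such as default ports).

```java
import java.net.URI;

/** Hypothetical, simplified sketch of the gray filter's URL check and rewrite. */
final class GrayUriSketch {
    static final String GRAY_SCHEME = "grayLb";

    /** True only for route URIs such as grayLb://search-service1. */
    static boolean isGrayRoute(URI routeUrl) {
        return routeUrl != null && GRAY_SCHEME.equals(routeUrl.getScheme());
    }

    /** Point the original request URI at the chosen instance, keeping the raw path and query. */
    static URI rewrite(URI original, String scheme, String host, int port) {
        StringBuilder sb = new StringBuilder()
                .append(scheme).append("://").append(host).append(':').append(port);
        if (original.getRawPath() != null) {
            sb.append(original.getRawPath());
        }
        if (original.getRawQuery() != null) {
            sb.append('?').append(original.getRawQuery());
        }
        return URI.create(sb.toString());
    }
}
```

For example, a request to `http://gateway.local:9000/search/items?q=a%20b` that is balanced to `10.0.0.5:8080` becomes `http://10.0.0.5:8080/search/items?q=a%20b`.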
           

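4. Register the filter as a Spring bean in a configuration class (GrayLoadBalancerConfig is an illustrative name). AuthGlobalFilter above is already annotated with @Component, so keep only one of the two registrations to avoid defining the filter twice:

@Configuration
public class GrayLoadBalancerConfig {

    @Bean
    @ConditionalOnMissingBean(AuthGlobalFilter.class)
    public AuthGlobalFilter grayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,
                                                                 LoadBalancerProperties properties) {
        return new AuthGlobalFilter(clientFactory, properties);
    }
}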
           

Finally, configure the dynamic routes in the Nacos config center:

- id: search1
  uri: grayLb://search-service1
  predicates:
  - Path=/search/**
  - HeaderGroup=A
  filters:
  - StripPrefix=1
  - AddRequestHeader=Version,v1
  - name: RequestRateLimiter
    args:
      keyResolver: '#{@ipKeyResolver}' # resolved from the Spring context by bean name, syntax '#{@beanName}'
      redis-rate-limiter.replenishRate: 1000
      redis-rate-limiter.burstCapacity: 3000
- id: search2
  uri: grayLb://search-service2
  predicates:
  - Path=/search/**
  - HeaderGroup=B
  filters:
  - StripPrefix=1
  - AddRequestHeader=Version,v2
  - name: RequestRateLimiter
    args:
      keyResolver: '#{@ipKeyResolver}' # resolved from the Spring context by bean name, syntax '#{@beanName}'
      redis-rate-limiter.replenishRate: 1000
      redis-rate-limiter.burstCapacity: 3000
           

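Before calling a service through the gateway, the caller first asks the A/B-test service which group the user is bound to. It then sends the request with a header such as Group: A or Group: B to reach the corresponding service. Requests without a Group header match neither of these routes.

This approach only works if the A/B-test service provides the user grouping strategy and guarantees stickiness. Each user must always get the same group, and the group proportions must be correct. The gateway itself does no weighted routing to split or control traffic, so it relies entirely on the A/B-test service to determine which group a user belongs to.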
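For illustration, here is one way an A/B-test service might produce sticky groups. This is a hypothetical sketch, not the author's A/B-test system. It hashes the experiment key together with the user id into 100 buckets and sends buckets below `percentA` to group A. The experiment key, the 100-bucket split, and the MurmurHash3 finalizer used to spread `String.hashCode` values are all assumptions.

```java
/** Hypothetical sketch of sticky A/B assignment on the A/B-test side (not part of the gateway). */
final class StickyGroupSketch {
    /** Deterministic bucket in [0, 100) for a user within one experiment. */
    static int bucket(String experiment, String userId) {
        // String.hashCode is specified by the JLS, so it is stable across JVMs and restarts
        int h = (experiment + ":" + userId).hashCode();
        // MurmurHash3 fmix32 finalizer to spread nearby hash codes
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return Math.floorMod(h, 100);
    }

    /** Group to send as the Group header: "A" for the first percentA buckets, else "B". */
    static String group(String experiment, String userId, int percentA) {
        return bucket(experiment, userId) < percentA ? "A" : "B";
    }
}
```

The same user always lands in the same bucket, so the Group header stays stable across requests. Changing `percentA` only moves the users whose buckets cross the threshold.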