天天看點

springcloud gateway網關內建abtest(灰階釋出、多版本釋出)

     一、前言

在大型分布式電商平台中,網關是重中之重,負責所有服務的分發和流量的控制等核心操作,而灰階釋出又為企業的商業決策提供資料依據。那麼如何通過網關來內建灰階釋出呢?下面分步驟介紹,全程幹貨。

     二、思路:

首先有以下幾種思路:

1、網關層直接加入weight來進行一個權重的設定,随機的将使用者流量打到新舊服務,也就是gateway的權重路由,例如:

spring:
application:
name: testgateway
cloud:
gateway:
routes:
- id: search-service1
  uri: lb://search-service1
  predicates:
  - Path=/search/**
  filters:
  - StripPrefix=1
  - Weight=service1, 95
- id: search-service2
  uri: lb://search-service2
  predicates:
  - Path=/search/**
  filters:
  - StripPrefix=1
  - Weight=service1, 5
           

此方式可以實作簡單的實作一個gateway的權重路由,接下來看下springcloud gateway的權重路由核心源碼:

org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter類的過濾方法如下,感興趣可以自行debug斷點跟蹤代碼,大緻實作原理是通過随機數落點的方式實作過濾操作,核心的filter方法和addweight方法如下所示,例如上述例子,如果随機數在0.95範圍内則落到search-service1,否則落到search-service2

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        Map<String, String> weights = getWeights(exchange);
    //groupWeights裡儲存着執行個體的各種資訊
        Iterator var4 = this.groupWeights.keySet().iterator();

        while(true) {
            while(var4.hasNext()) {
                String group = (String)var4.next();
                WeightCalculatorWebFilter.GroupWeightConfig config = (WeightCalculatorWebFilter.GroupWeightConfig)this.groupWeights.get(group);
                if (config == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("No GroupWeightConfig found for group: " + group);
                    }
                } else {
//取一個随機數
                    double r = this.random.nextDouble();
                    List<Double> ranges = config.ranges;
                    if (log.isTraceEnabled()) {
                        log.trace("Weight for group: " + group + ", ranges: " + ranges + ", r: " + r);
                    }
//判斷随機數如果大于等于ranges的範圍落在相應執行個體,ranges的生成方法在addWeightConfig中
                    for(int i = 0; i < ranges.size() - 1; ++i) {
                        if (r >= (Double)ranges.get(i) && r < (Double)ranges.get(i + 1)) {
                            String routeId = (String)config.rangeIndexes.get(i);
                            weights.put(group, routeId);
                            break;
                        }
                    }
                }
            }

            if (log.isTraceEnabled()) {
                log.trace("Weights attr: " + weights);
            }

            return chain.filter(exchange);
        }
    }
           
void addWeightConfig(WeightConfig weightConfig) {
        String group = weightConfig.getGroup();
        WeightCalculatorWebFilter.GroupWeightConfig config;
        if (this.groupWeights.containsKey(group)) {
            config = new WeightCalculatorWebFilter.GroupWeightConfig((WeightCalculatorWebFilter.GroupWeightConfig)this.groupWeights.get(group));
        } else {
            config = new WeightCalculatorWebFilter.GroupWeightConfig(group);
        }

        config.weights.put(weightConfig.getRouteId(), weightConfig.getWeight());
        int weightsSum = 0;

        Integer weight;
        for(Iterator var5 = config.weights.values().iterator(); var5.hasNext(); weightsSum += weight) {
            weight = (Integer)var5.next();
        }

        AtomicInteger index = new AtomicInteger(0);
        Iterator var12 = config.weights.entrySet().iterator();

        Double range;
        while(var12.hasNext()) {
            Entry<String, Integer> entry = (Entry)var12.next();
            String routeId = (String)entry.getKey();
            Integer weight = (Integer)entry.getValue();
            range = (double)weight / (double)weightsSum;
            config.normalizedWeights.put(routeId, range);
            config.rangeIndexes.put(index.getAndIncrement(), routeId);
        }

        config.ranges.clear();
        config.ranges.add(0.0D);
        List<Double> values = new ArrayList(config.normalizedWeights.values());

        for(int i = 0; i < values.size(); ++i) {
            Double currentWeight = (Double)values.get(i);
            Double previousRange = (Double)config.ranges.get(i);
            range = previousRange + currentWeight;
            config.ranges.add(range);
        }

        if (log.isTraceEnabled()) {
            log.trace("Recalculated group weight config " + config);
        }

        this.groupWeights.put(group, config);
    }

    Map<String, WeightCalculatorWebFilter.GroupWeightConfig> getGroupWeights() {
        return this.groupWeights;
    }
           

那麼這種方式無法保證使用者黏性,通常在企業開發中也不會使用如此簡單的方式來實作一個網關的灰階釋出,我們可以通過重寫網關的全局過濾器和負載均衡以及加入新的斷言的方式來進行一個比較完整的灰階釋出:

1、新增一個斷言工廠:

/**
 * 自定義斷言工廠類
 * */
@Component
@Slf4j
public class HeaderGroupRoutePredicateFactory extends AbstractRoutePredicateFactory<HeaderGroupRoutePredicateFactory.Config> {

    public static final String GROUP = "Group";

    public HeaderGroupRoutePredicateFactory() {
        super(Config.class);
    }

    @Override
    public ShortcutType shortcutType() {
        return ShortcutType.GATHER_LIST;
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Collections.singletonList("group");
    }

    /**
     * 重寫predicate(定時重新整理gateway内的配置資訊)
     * */
    @Override
    public Predicate<ServerWebExchange> apply(Config config) {
        List<String> groups = config.getGroup();
        return new GatewayPredicate() {
            @Override
            public boolean test(ServerWebExchange serverWebExchange) {
                //判斷請求頭内的Group資訊是否存在于配置中

                String group = serverWebExchange.getRequest().getHeaders().getFirst(GROUP);
                if (!StringUtils.isEmpty(group)) {
                    return groups.contains(group);
                }
                return false;
            }

            @Override
            public String toString() {
                return String.format("Header: Group=%s", config.getGroup());
            }
        };
    }

    @NoArgsConstructor
    @Getter
    @Setter
    @ToString
    public static class Config {
        private List<String> group;
    }
}
           

2.重寫負載均衡:

/**
 * 自定義負載均衡
 * 隻有filter攔截到灰階請求之後才會走
 * */
@Slf4j
public class CustomizeLoadBalance implements ReactorServiceInstanceLoadBalancer{

    private static AtomicInteger position = new AtomicInteger(new Random().nextInt(1000));
    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    private String serviceId;

    public CustomizeLoadBalance(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    }



    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        HttpHeaders headers = (HttpHeaders) request.getContext();
        if (this.serviceInstanceListSupplierProvider != null) {
            ServiceInstanceListSupplier supplier = (ServiceInstanceListSupplier)this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
            return ((Flux)supplier.get()).next().map(list->getInstanceResponse((List<ServiceInstance>)list,headers));
        }
        return null;
    }



    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, HttpHeaders headers) {
        //對服務執行個體進行一個過濾操作
        List<ServiceInstance> serviceInstances = instances.stream()
                .filter(instance -> {
                    //擷取請求頭version,如果帶有version則參與灰階的執行個體選舉,沒有剔除
                    String version = headers.getFirst("Version");
//instance.getMetadata().get("Version")即可取得nacos的中繼資料,然後在進行判斷是否加入選舉清單即可。
                    log.info("版本号:{}",version);
                    if (version != null) {
                        return true;
                    } else {
                        return false;
                    }
                }).collect(Collectors.toList());
        if (serviceInstances.isEmpty()) {
                log.warn("無可用服務: " + serviceId);
            return new EmptyResponse();
        }
        //硬hash路由(輪詢),目前沒有實質性作用
        int pos = Math.abs(position.incrementAndGet());
        ServiceInstance instance = serviceInstances.get(pos % serviceInstances.size());
        return new DefaultResponse(instance);
    }

}
           

 由于灰階釋出一般隻有一個新版本與舊版本,是以用戶端的負載均衡在此處其實就無意義了,除非在單機器的選舉上,負載均衡的作用才會展現出來,到時可以考慮換成帶虛拟節點的一緻性hash算法等分布式路由算法來替換硬hash。

此處如果想要通過nacos的中繼資料來區分,隻需要使用

instance.getMetadata().get("Version")即可取得nacos的中繼資料,然後在進行判斷是否加入選舉清單即可。但nacos作為開源項目貌似并不穩定,或許有bug,不推薦      

3.重寫過濾器:

@Component
@Slf4j
public class AuthGlobalFilter implements GlobalFilter, Ordered {

    private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;

    private final LoadBalancerClientFactory clientFactory;
    private LoadBalancerProperties properties;

    public AuthGlobalFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        this.clientFactory = clientFactory;
        this.properties = properties;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = url.getScheme();
        //通過比對配置中的url字首是否含有grayLb執行以下過濾器,如果字首中不含grayLb則走預設的負載均衡
        if (url != null && "grayLb".equals(schemePrefix)) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
                log.info("重寫前url: " + url);
            return this.choose(exchange).doOnNext((response) -> {
                if (!response.hasServer()) {
                    throw NotFoundException.create(this.properties.isUse404(), "無法找到執行個體 " + url.getHost());
                } else {
                    URI uri = exchange.getRequest().getURI();
                    String overrideScheme = null;
//重寫url,去除grayLb頭
                    DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance((ServiceInstance)response.getServer(), overrideScheme);
                    URI requestUrl = this.reconstructURI(serviceInstance, uri);

                        log.info("重寫到url: " + requestUrl);

                    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                }
            }).then(chain.filter(exchange));
        } else {
            return chain.filter(exchange);
        }
    }

    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
        URI uri = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        CustomizeLoadBalance loadBalancer = new CustomizeLoadBalance(clientFactory.getLazyProvider(uri.getHost(), ServiceInstanceListSupplier.class), uri.getHost());
        return loadBalancer.choose(createRequest(exchange));
    }

    private Request createRequest(ServerWebExchange exchange) {
        HttpHeaders headers = exchange.getRequest().getHeaders();
        return new DefaultRequest<>(headers);
    }


    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    @Override
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }

}
           

4.configuration類中注冊springbean:

@Bean
    @ConditionalOnMissingBean({AuthGlobalFilter.class})
    public AuthGlobalFilter grayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        return new AuthGlobalFilter(clientFactory,properties);
    }
           

最後在nacos配置中心裡配置動态路由:

- id: search1
  uri: grayLb://search-service1
  predicates:
  - Path=/search/**
  - HeaderGroup=A
  filters:
  - StripPrefix=1
  - AddRequestHeader=Version,v1
  - name: RequestRateLimiter
    args:
      keyResolver: '#{@ipKeyResolver}' #從spring容器中找并指派 '#{@beanName}'
      redis-rate-limiter.replenishRate: 1000
      redis-rate-limiter.burstCapacity: 3000
- id: search2
  uri: grayLb://search-service2
  predicates:
  - Path=/search/**
  - HeaderGroup=B
  filters:
  - StripPrefix=1
  - AddRequestHeader=Version,v2
  - name: RequestRateLimiter
    args:
      keyResolver: '#{@ipKeyResolver}' #從spring容器中找并指派 '#{@beanName}'
      redis-rate-limiter.replenishRate: 1000
      redis-rate-limiter.burstCapacity: 3000
           

這樣業務方在進行網關調用具體服務前可以先從abtest拿到使用者相應綁定的分組,然後在請求時攜帶一個請求頭名為Group:A或者Group:B即可打到不同的服務中,

此方式僅限于abtest中提供了使用者的分組政策以及使用者粘性的保證,保證每次使用者拿到的自己的分組與分組的比例是正确的,因為網關并沒有做權重路由來進行流量的

分組和控制,完全依賴于abtest來取得使用者所屬的分組。