天天看點

SpringCloud更新之路2020.0.x版-32. 改進負載均衡算法

SpringCloud更新之路2020.0.x版-32. 改進負載均衡算法
本系列代碼位址:https://github.com/JoJoTec/spring-cloud-parent

在前面一節,我們梳理了實作 Feign 斷路器以及線程隔離的思路,這一節,我們先不看如何源碼實作(因為源碼中會包含負載均衡算法的改進部分),先來讨論下如何優化目前的負載均衡算法。

之前的負載均衡算法

  1. 擷取服務執行個體清單,将執行個體清單按照 ip 端口排序,如果不排序即使 position 是下一個可能也代表的是之前已經調用過的執行個體
  2. 根據請求中的 traceId,從本地緩存中以 traceId 為 key 擷取一個初始值為随機數的原子變量 position,這樣防止所有請求都從第一個執行個體開始調用,之後第二個、第三個這樣。
  3. position 原子加一,之後對執行個體個數取餘,傳回對應下标的執行個體進行調用

其中請求包含 traceId 是來自于我們使用了 spring-cloud-sleuth 鍊路追蹤,基于這種機制我們能保證請求不會重試到之前已經調用過的執行個體。源碼是:

//一定必須是實作ReactorServiceInstanceLoadBalancer
//而不是ReactorLoadBalancer<ServiceInstance>
//因為注冊的時候是ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final ServiceInstanceListSupplier serviceInstanceListSupplier;
    //每次請求算上重試不會超過1分鐘
    //對于超過1分鐘的,這種請求肯定比較重,不應該重試
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
            //随機初始值,防止每次都是從第一個開始調用
            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
    private final String serviceId;
    private final Tracer tracer;


    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
        this.serviceInstanceListSupplier = serviceInstanceListSupplier;
        this.serviceId = serviceId;
        this.tracer = tracer;
    }
    
    //每次重試,其實都會調用這個 choose 方法重新擷取一個執行個體
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        return getInstanceResponseByRoundRobin(serviceInstances);
    }

    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        //為了解決原始算法不同調用并發可能導緻一個請求重試相同的執行個體
        //從 sleuth 的 Tracer 中擷取目前請求的上下文
        Span currentSpan = tracer.currentSpan();
        //如果上下文不存在,則可能不是前端使用者請求,而是其他某些機制觸發,我們就建立一個新的上下文
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        //從請求上下文中擷取請求的 traceId,用來唯一辨別一個請求
        long l = currentSpan.context().traceId();
        AtomicInteger seed = positionCache.get(l);
        int s = seed.getAndIncrement();
        int pos = s % serviceInstances.size();
        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
        return new DefaultResponse(serviceInstances.stream()
                //執行個體傳回清單順序可能不同,為了保持一緻,先排序再取
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList()).get(pos));
    }
}
           

但是在這次請求突增很多的時候,這種負載均衡算法還是給我們帶來了問題。

首先,本次突增,我們并沒有采取擴容,導緻本次的性能壓力對于壓力的均衡分布非常敏感。舉個例子是,假設微服務 A 有 9 個執行個體,在業務高峰點來的時候,最理想的情況是保證無論何時這 9 個負載壓力都完全均衡,但是由于我們使用了初始值為随機數的原子變量 position,雖然從一天的總量上來看,負責均衡壓力肯定是均衡,但是在某一小段時間内,很可能壓力全都跑到了某幾個執行個體上,導緻這幾個執行個體被壓垮,熔斷,然後又都跑到了另外的幾個執行個體上,又被壓垮,熔斷,如此惡性循環。

然後,我們部署采用的是 k8s 部署,同一個虛拟機上面可能會跑很多微服務的 pod。在某些情況下,同一個微服務的多個 pod 可能會跑到同一個虛拟機 Node 上,這個可以從pod 的 ip 網段上看出來:例如某個微服務有如下 7 個執行個體:10.238.13.12:8181,10.238.13.24:8181,10.238.15.12:8181,10.238.17.12:8181,10.238.20.220:8181,10.238.21.31:8181,10.238.21.121:8181,那麼 10.238.13.12:8181 與 10.238.13.24:8181 很可能在同一個 Node 上,10.238.21.31:8181 和 10.238.21.121:8181 很可能在同一個 Node 上。我們重試,需要優先重試與之前重試過的執行個體盡量不在同一個 Node 上的執行個體,因為同一個 Node 上的執行個體隻要有一個有問題或者壓力過大,其他的基本上也有問題或者壓力過大。

最後,如果調用某個執行個體一直失敗,那麼這個執行個體的調用優先級需要排在其他正常的執行個體後面。這個對于減少快速重新整理釋出(一下子啟動很多執行個體之後停掉多個老執行個體,執行個體個數大于重試次數配置)對于使用者的影響,以及某個可用區突然發生異常導緻多個執行個體下線對使用者的影響,以及業務壓力已經過去,壓力變小後,需要關掉不再需要的執行個體,導緻大量執行個體發生遷移的時候對使用者的影響,有很大的作用。

針對以上問題的優化方案

我們針對上面三個問題,提出了一種優化後的解決方案:

  1. 針對每次請求,記錄:
  2. 本次請求已經調用過哪些執行個體 -> 請求調用過的執行個體緩存
  3. 調用的執行個體,目前有多少請求在進行中 -> 執行個體運作請求數
  4. 調用的執行個體,最近請求錯誤率 -> 執行個體請求錯誤率
  5. 随機将執行個體清單打亂,防止在以上三個名額都相同時,總是将請求發給同一個執行個體。
  6. 按照 目前請求沒有調用過靠前 -> 錯誤率越小越靠前 的順序排序 -> 執行個體運作請求數越小越靠前
  7. 取排好序之後的清單第一個執行個體作為本次負載均衡的執行個體

具體實作是:以下的代碼來自于:https://github.com/JoJoTec/spring-cloud-parent

我們使用了依賴:

<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
</dependency>
           

記錄執行個體資料的緩存類:

@Log4j2
public class ServiceInstanceMetrics {
	private static final String CALLING = "-Calling";
	private static final String FAILED = "-Failed";

	private MetricRegistry metricRegistry;

	ServiceInstanceMetrics() {
	}

	public ServiceInstanceMetrics(MetricRegistry metricRegistry) {
		this.metricRegistry = metricRegistry;
	}

	/**
	 * 記錄調用執行個體
	 * @param serviceInstance
	 */
	public void recordServiceInstanceCall(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		metricRegistry.counter(key + CALLING).inc();
	}
	/**
	 * 記錄調用執行個體結束
	 * @param serviceInstance
	 * @param isSuccess 是否成功
	 */
	public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		metricRegistry.counter(key + CALLING).dec();
		if (!isSuccess) {
			//不成功則記錄失敗
			metricRegistry.meter(key + FAILED).mark();
		}
	}

	/**
	 * 擷取正在運作的調用次數
	 * @param serviceInstance
	 * @return
	 */
	public long getCalling(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		long count = metricRegistry.counter(key + CALLING).getCount();
		log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count);
		return count;
	}

	/**
	 * 擷取最近一分鐘調用失敗次數分鐘速率,其實是滑動平均數
	 * @param serviceInstance
	 * @return
	 */
	public double getFailedInRecentOneMin(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate();
		log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate);
		return rate;
	}
}
           

負載均衡核心代碼:

private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder()
        .expireAfterAccess(3, TimeUnit.MINUTES)
        .build(k -> Sets.newConcurrentHashSet());
private final String serviceId;
private final Tracer tracer;
private final ServiceInstanceMetrics serviceInstanceMetrics;

//每次重試,其實都會調用這個 choose 方法重新擷取一個執行個體
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
    Span span = tracer.currentSpan();
    return serviceInstanceListSupplier.get().next()
            .map(serviceInstances -> {
                //保持 span 和調用 choose 的 span 一樣
                try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                    return getInstanceResponse(serviceInstances);
                }
            });
}


private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    //讀取 spring-cloud-sleuth 的對于目前請求的鍊路追蹤上下文,擷取對應的 traceId
    Span currentSpan = tracer.currentSpan();
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    return getInstanceResponseByRoundRobin(l, serviceInstances);
}

@VisibleForTesting
public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {
    //首先随機打亂清單中執行個體的順序
    Collections.shuffle(serviceInstances);
    //需要先将所有參數緩存起來,否則 comparator 會調用多次,并且可能在排序過程中參數發生改變(針對執行個體的請求統計資料一直在并發改變)
    Map<ServiceInstance, Integer> used = Maps.newHashMap();
    Map<ServiceInstance, Long> callings = Maps.newHashMap();
    Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap();
    serviceInstances = serviceInstances.stream().sorted(
            Comparator
                    //之前已經調用過的網段,這裡排後面
                    .<ServiceInstance>comparingInt(serviceInstance -> {
                        return used.computeIfAbsent(serviceInstance, k -> {
                            return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> {
                                return serviceInstance.getHost().contains(prefix);
                            }) ? 1 : 0;
                        });
                    })
                    //目前錯誤率最少的
                    .thenComparingDouble(serviceInstance -> {
                        return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> {
                            double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance);
                            //由于使用的是移動平均值(EMA),需要忽略過小的差異(保留兩位小數,不是四舍五入,而是直接舍棄)
                            return ((int) (value * 100)) / 100.0;
                        });
                    })
                    //目前負載請求最少的
                    .thenComparingLong(serviceInstance -> {
                        return callings.computeIfAbsent(serviceInstance, k ->
                                serviceInstanceMetrics.getCalling(serviceInstance)
                        );
                    })
    ).collect(Collectors.toList());
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    ServiceInstance serviceInstance = serviceInstances.get(0);
    //記錄本次傳回的網段
    calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf(".")));
    //目前記錄這個隻為了相容之前的單元測試(調用次數測試)
    positionCache.get(traceId).getAndIncrement();
    return new DefaultResponse(serviceInstance);
}
           

對于記錄執行個體資料的緩存何時更新,是在 FeignClient 粘合重試,斷路以及線程隔離的代碼中的,這個我們下一節就會看到。

一些組内關于方案設計的取舍 Q&A

1. 為何沒有使用所有微服務共享的緩存來儲存調用資料,來讓這些資料更加準确?

共享緩存的可選方案包括将這些資料記錄放入 Redis,或者是 Apache Ignite 這樣的記憶體網格中。但是有兩個問題:

  1. 如果資料記錄放入 Redis 這樣的額外存儲,如果 Redis 不可用會導緻所有的負載均衡都無法執行。如果放入 Apache Ignite,如果對應的節點下線,那麼對應的負載均衡也無法執行。這些都是不能接受的。
  2. 假設微服務 A 需要調用微服務 B,可能 A 的某個執行個體調用 B 的某個執行個體有問題,但是 A 的其他執行個體調用 B 的這個執行個體卻沒有問題,例如當某個可用區與另一個可用區網絡擁塞的時候。如果用同一個緩存 Key 記錄 A 所有的執行個體調用 B 這個執行個體的資料,顯然是不準确的。

每個微服務使用本地緩存,記錄自己調用其他執行個體的資料,在我們這裡看來,不僅是更容易實作,也是更準确的做法。

2. 采用 EMA 的方式而不是請求視窗的方式統計最近錯誤率

采用請求視窗的方式統計,肯定是最準确的,例如我們統計最近一分鐘的錯誤率,就将最近一分鐘的請求緩存起來,讀取的時候,将緩存起來的請求資料加在一起取平均數即可。但是這種方式在請求突增的時候,可能會占用很多很多記憶體來緩存這些請求。同時計算錯誤率的時候,随着緩存請求數的增多也會消耗更大量的 CPU 進行計算。這樣做很不值得。

EMA 這種滑動平均值的計算方式,常見于各種性能監控統計場景,例如 JVM 中 TLAB 大小的動态計算,G1 GC Region 大小的伸縮以及其他很多 JVM 需要動态得出合适值的地方,都用這種計算方式。他不用将請求緩存起來,而是直接用最新值乘以一個比例之後加上老值乘以 (1 - 這個比例),這個比例一般高于 0.5,表示 EMA 和目前最新值更加相關。

但是 EMA 也帶來另一個問題,我們會發現随着程式運作小數點位數會非常多,會看到類似于如下的值:0.00000000123, 0.120000001, 0.120000003, 為了忽略過于細緻差異的影響(其實這些影響也來自于很久之前的錯誤請求),我們隻保留兩位小數進行排序。

微信搜尋“我的程式設計喵”關注公衆号,每日一刷,輕松提升技術,斬獲各種offer:
SpringCloud更新之路2020.0.x版-32. 改進負載均衡算法

繼續閱讀