天天看點

DUBBO負載均衡算法及源碼分析DUBBO負載均衡算法及源碼分析

文章目錄

  • DUBBO負載均衡算法及源碼分析
    • 前言
    • 類結構
    • 随機權重-RandomLoadBalance
      • 算法思想
      • 源碼分析
        • 主要代碼-doSelect
        • getweight分析
    • 權重輪詢-RoundRobinLoadBalance
      • 算法思想
          • 步驟分析
      • 源碼分析
    • 最少活躍連接配接數-LeastActiveLoadBalance
      • 算法思想
      • 源碼分析
    • 一緻性hash-ConsistentHashLoadBalance
      • 算法思想
      • 源碼分析
      • 補充
    • 總結

DUBBO負載均衡算法及源碼分析

前言

​ 下面是dubbo官方叢集容錯的布局圖,叢集容錯有以下元件,Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。這邊分析下LoadBalance (負載均衡子產品) ,LoadBalance 仍然是在消費者端實作的功能。

​ 後文源碼 dubbo版本2.6.1
DUBBO負載均衡算法及源碼分析DUBBO負載均衡算法及源碼分析

類結構

​ 由下圖可知,Dubbo共提供了四種負載均衡政策,分别是基于權重随機算法的 RandomLoadBalance(預設實作)、基于最少活躍調用數算法的 LeastActiveLoadBalance、基于 hash 一緻性的 ConsistentHashLoadBalance,以及基于權重輪詢算法的 RoundRobinLoadBalance。

DUBBO負載均衡算法及源碼分析DUBBO負載均衡算法及源碼分析

其中所有的實作類繼承與AbstractLoadBalance,該類提供模闆方法

/**
     * select one invoker in list.
     *
     * 從 Invoker 集合中,選擇一個
     *
     * @param invokers   invokers.  Directory子產品查到的清單
     * @param url        refer url
     * @param invocation invocation.
     * @return selected invoker.
     */
@Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.isEmpty()) {
            return null;
        }
        //如果長度為1 直接傳回
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        //留給具體子類實作
        return doSelect(invokers, url, invocation);
    }
           

随機權重-RandomLoadBalance

算法思想

​ 下面講解的時權重不相等的情況,如果權重都相等,采用随機算法從0到n-1中随機擷取一個整數,取對應下标的invoker。

DUBBO負載均衡算法及源碼分析DUBBO負載均衡算法及源碼分析

第一步:對所有的invoker清單權重求和,和為totalWeight。

第二步:在0-totalWeight中取一個随機整數offset。

第三步:offset依次減去每一個的invoker的權重,offset小于0時終止條件。

offset每次減去一個權重,其實主要為了看offset落在上圖的對應哪一個區間,每個區間的大小是該機器的權重,是以達到了随機權重的作用。

源碼分析

主要代碼-doSelect

@Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // Number of invokers
        int totalWeight = 0; // The sum of weights
        boolean sameWeight = true; // Every invoker has the same weight?
        // 計算總權限
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation); // 獲得權重
            totalWeight += weight; // Sum
            if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false;
            }
        }
        // 權重不相等,随機後,判斷在哪個 Invoker 的權重區間中
        if (totalWeight > 0 && !sameWeight) {
            // 随機
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offset = random.nextInt(totalWeight);
            // Return a invoker based on the random value.
            // 區間判斷
            for (Invoker<T> invoker : invokers) {
                offset -= getWeight(invoker, invocation);
                if (offset < 0) {
                    return invoker;
                }
            }
        }
        // 權重相等,平均随機
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(random.nextInt(length));
    }
           

getweight分析

​ 如果遠端URL配置了remote.timestamp參數 ,權重采用動态擷取。為了防止在做服務擴容的時候,擴容的機器,如果是固定的權重,可能一下子接收了大量的請求,導緻服務波動嚴重。這邊通過預熱時長,和啟動時長動态的調整了擴容服務的權重,規避的大量請求打入剛加入叢集的新機器。

protected int getWeight(Invoker<?> invoker, Invocation invocation) {
        // 獲得 weight 配置,即服務權重。預設為 100
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
        if (weight > 0) {
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                // 獲得啟動總時長
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                // 獲得預熱需要總時長。預設為 10 * 60 * 1000 = 10 分鐘
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                // 處于預熱中,計算目前的權重
                if (uptime > 0 && uptime < warmup) {
                    weight = calculateWarmupWeight(uptime, warmup, weight);
                }
            }
        }
        return weight;
    }

   static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        // 計算權重
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        // 權重範圍為 [0, weight] 之間
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }
           

權重輪詢-RoundRobinLoadBalance

算法思想

​ 在講權重輪詢時,我們先看一下輪詢如何實作,現有服務提供者A,B,C。我們将第一個請求配置設定給伺服器 A,第二個請求配置設定給伺服器 B,第三個請求配置設定給伺服器 C,第四個請求再次配置設定給伺服器 A。這個過程就叫做輪詢。

權重輪詢:由于每一台機器的性能是有差異的,或者我們生産有多機房,機房之間請求也不是平均的,是以對不同機器配置不同權重是很常見的。現在A,B,C的權重為5,2,1 我們要達到在8次請求中,伺服器 A 将收到其中的5次請求,伺服器 B 會收到其中的2次請求,伺服器 C 則收到其中的1次請求。

步驟分析

第一步:采用一個全局map儲存每個服務和服務的請求次數

第二步:計算總的清單權重weightSum,并提取每個invoker和他的權重放進局部invokerToWeightMap中

第三步:用目前輪詢序号與服務提供者總權重取模,餘數為mod-從0取到weightSum

第四步:外循環,從0-maxweight,由于maxweight*length >= weightSum 保證了對應mod可以遞減到0

第五步:内循環,對invokerToWeightMap進行周遊,當取到mod為0,且該invoker的權重還大于0的時候,傳回,否則mod–,invoker.weight–;

下面是官方文檔的一個案例

假設我們有三台伺服器 servers = [A, B, C],對應的權重為 weights = [2, 5, 1]。接下來對上面的邏輯進行簡單的模拟。

mod = 0:滿足條件,此時直接傳回伺服器 A

mod = 1:需要進行一次遞減操作才能滿足條件,此時傳回伺服器 B

mod = 2:需要進行兩次遞減操作才能滿足條件,此時傳回伺服器 C

mod = 3:需要進行三次遞減操作才能滿足條件,經過遞減後,伺服器權重為 [1, 4, 0],此時傳回伺服器 A

mod = 4:需要進行四次遞減操作才能滿足條件,經過遞減後,伺服器權重為 [0, 4, 0],此時傳回伺服器 B

mod = 5:需要進行五次遞減操作才能滿足條件,經過遞減後,伺服器權重為 [0, 3, 0],此時傳回伺服器 B

mod = 6:需要進行六次遞減操作才能滿足條件,經過遞減後,伺服器權重為 [0, 2, 0],此時傳回伺服器 B

mod = 7:需要進行七次遞減操作才能滿足條件,經過遞減後,伺服器權重為 [0, 1, 0],此時傳回伺服器 B

源碼分析

/**
     * 服務方法與計數器的映射
     *
     * KEY:serviceKey + "." + methodName
     */
    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();


    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // Number of invokers
        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
        int weightSum = 0;
        // 計算最小、最大權重,總的權重和。
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                weightSum += weight;
            }
        }
        // 獲得 AtomicPositiveInteger 對象
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            // 不存在的話 初始目前服務請求計數器
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        // 傳回目前順序号,并遞增 + 1
        int currentSequence = sequence.getAndIncrement();
        // 權重不相等,順序根據權重配置設定
        if (maxWeight > 0 && minWeight < maxWeight) {
            int mod = currentSequence % weightSum; // 剩餘權重
            for (int i = 0; i < maxWeight; i++) { // 循環最大權重
                for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { // 循環 Invoker 集合
                    final Invoker<T> k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    // 剩餘權重歸 0 ,目前 Invoker 還有剩餘權重,傳回該 Invoker 對象
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    // 若 Invoker 還有權重值,扣除它( value )和剩餘權重( mod )。
                    if (v.getValue() > 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        // 權重相等,平均順序獲得
        // Round robin
        return invokers.get(currentSequence % length);
    }
           

​ 通過源碼分析該算法采用了兩重循環,如果機器權重配置差異過大,實際上性能會有點慢,是以高版本做了後續的幾次優化。

最少活躍連接配接數-LeastActiveLoadBalance

算法思想

​ 在分析源碼前,我們先來了解一個概念—活躍連接配接數(active),active表示該服務提供者的目前服務活躍程度,初始情況下,活躍度為0,每收到(發出)一個請求,活躍數加1,完成請求時減1。在服務運作中,服務提供者處理的請求越快,則其活躍度下降的越快,是以最少活躍連接配接數的服務提供者,相對應的其性能最好,是以此負載均衡算法是基于性能為參考依據,進行負載。

​ 活躍連接配接數也作用于并發限流,ActiveLimitFilter(服務消費者端),ExecuteLimitFilter(服務提供者端)

​ 補充:如果存在多個最小活躍連接配接數的服務提供者,則對該集合采用随機權重的擷取方法得到對應的invoker。

源碼分析

/**
* 最少活躍調用數,相同活躍數的随機,活躍數指調用前後計數差。
* 使慢的提供者收到更少請求,因為越慢的提供者的調用前後計數差會越大。
*
**/
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // invokers清單總個數
        int leastActive = -1; // 最小的活躍數
        int leastCount = 0; // 相同最小活躍數的個數
        int[] leastIndexes = new int[length]; // 相同最小活躍數的下标
        int totalWeight = 0; // 總權重
        int firstWeight = 0; // 第一個權重,用于于計算是否相同
        boolean sameWeight = true; // 是否所有權重相同
        // 計算獲得相同最小活躍數的數組和個數
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            //通過RpcStatus擷取該invokers的活躍數
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); 
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 權重
            // 發現更小的活躍數,重新開始
            if (leastActive == -1 || active < leastActive) { 
                leastActive = active; // 記錄最小活躍數
                leastCount = 1; // 重新統計相同最小活躍數的個數
                leastIndexes[0] = i; // 重新記錄最小活躍數下标
                totalWeight = weight; // 重新累計總權重
                firstWeight = weight; // 記錄第一個權重
                sameWeight = true; // 還原權重相同辨別
            } else if (active == leastActive) { // 累計相同最小的活躍數
                leastIndexes[leastCount++] = i; // 累計相同最小活躍數下标
                totalWeight += weight; // 累計總權重
                // 判斷所有權重是否一樣
                if (sameWeight && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // 如果隻有一個最小則直接傳回
        if (leastCount == 1) {
            return invokers.get(leastIndexes[0]);
        }
     // 如果權重不相同且權重大于0則按總權重數随機
        if (!sameWeight && totalWeight > 0) {
            int offsetWeight = random.nextInt(totalWeight);
            // 并确定随機值落在哪個片斷上
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        // 如果權重相同或權重為0則均等随機
        return invokers.get(leastIndexes[random.nextInt(leastCount)]);
    }
           

一緻性hash-ConsistentHashLoadBalance

一緻性雜湊演算法保證相同的請求參數,每次請求到相同的伺服器上面,

算法思想

​ 首先存在一個hash環,原算法包含(232次方 - 1)個節點,dubbo采用了一個treeMap在虛拟構造該環。

​ 然後,所有節點分布在環上面,如果一次采用一個節點 可能有多種情況的分布,如下圖。

DUBBO負載均衡算法及源碼分析DUBBO負載均衡算法及源碼分析

​ 可以看到,如果一個服務提供者值在環上隻有一個節點的話,分布是各種各樣的,極有可能存在資料傾斜,為了避免這個問題,提出了虛拟節點的概念。dubbo預設為每一個提供者設定160個虛拟節點,每四個節點為一組,然後所有虛拟節點均勻分布在環上面。

​ 在做服務選擇時,先從請求參數中構造keyString,并采用同樣MD5摘要算法,然後做hash求具體key,在哈希換上取距離key最近的服務提供者如果不存在,選擇第一個。

DUBBO負載均衡算法及源碼分析DUBBO負載均衡算法及源碼分析

源碼分析

public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "consistenthash";

    /**
     * Hash nodes name 哈希node鍵 
     */
    public static final String HASH_NODES = "hash.nodes";

    /**
     * Hash arguments name
     */
    public static final String HASH_ARGUMENTS = "hash.arguments";

    /**
    * 服務方法和一緻性哈希選擇器的映射 key為serviceKey + "."methodName
    */
    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String methodName = RpcUtils.getMethodName(invocation);
        // 構造key
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        // 基于invokers 根據對象記憶體位址擷取系統的hashCode 
        int identityHashCode = System.identityHashCode(invokers);
        // 擷取目前服務的選擇器
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        // 如果選擇器為空 或者hash值改變了 重新建立選擇器
        if (selector == null || selector.identityHashCode != identityHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

    private static final class ConsistentHashSelector<T> {

        // 虛拟節點和invoker的對應映射
        private final TreeMap<Long, Invoker<T>> virtualInvokers;
		// 虛拟節點的個數
        private final int replicaNumber;
		// 定義的哈希值
        private final int identityHashCode;
		// 
        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // 初始化虛拟節點個數 預設160個
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
           // 初始化argumentIndex
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            // 構造virtualInvokers
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                // 每四個虛拟節點一組 保證digest的充分利用  提高效率
                for (int i = 0; i < replicaNumber / 4; i++) {
                    // 擷取一個唯一的摘要
                    byte[] digest = md5(address + i);
                    for (int h = 0; h < 4; h++) {
                        // 對四個中每一個虛拟節點取hash,作為key 并均勻分布在hash環上面
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        public Invoker<T> select(Invocation invocation) {
            //基于方法參數 作為key
            String key = toKey(invocation.getArguments());
            // 對key計算md5 同樣采用MD5 便于計算long型的key
            byte[] digest = md5(key);
            // 先計算key 
            return selectForKey(hash(digest, 0));
        }

        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }

        private Invoker<T> selectForKey(long hash) {
            //傳回與大于或等于給定鍵的最小鍵關聯的鍵值映射,如果沒有這樣的鍵,則傳回null
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            if (entry == null) {
                 // 不存在,則取 virtualInvokers 第一個
                entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();
        }

        /**
        * hash算法 digest每四位進行拼接位一個32的long資料
        */
        private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }

        /**
        * md5摘要算法  傳回一個128位的byte數組,數組長度位16
        */
        private byte[] md5(String value) {
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            md5.update(bytes);
            return md5.digest();
        }

    }

}
           

補充

  • 所謂資料傾斜是指,由于節點不夠分散,導緻大量請求落到了同一個節點上,而其他節點隻會接收到了少量請求的情況。
  • 如果服務提供者有變動(新增或者下線),則需要重新構造一緻性選擇器,此時會有額外的性能問題。
  • 一緻性雜湊演算法中每個節點都是均勻配置設定的,如果性能不同的機器擷取到了相同的請求,則其請求時間會有不同。
  • 大多數請求都是不帶狀态的,隻請求一次就結束了,是以一般不采用一緻性哈希路由算法,該算法常用于資料庫,緩存叢集的方案配置

總結

​ 這次分析了dubbo的負載均衡源碼,我們常用的是随機權重,權重輪詢,最少活躍連接配接數這三種算法,其中權重輪詢在不同版本分别做了優化,後續有多餘時間再進行研究。我們應該根據機器性能合理的配置權重,一開始如果是相同的機器的話,可以采用權重輪詢,但我們後續難免會對機器擴容,如果存在新老機器都存在的情況,就需要配置不同的權重了。

​ 另外我們學習了一緻性hash算法,一緻性hash算法主要解決了正常hash算法在分布式環境下存在的 問題,如果服務節點增加或者減少,其對hash值進行取模時,由于餘數變化,所有的路由關系或者資料映射都會混亂,進而請求不能請求到原先的機器。

即使墜落谷底,也要絕地反擊。