天天看點

Dubbo筆記 ⑯ :Dubbo叢集元件 之 LoadBalance一、前言三、 AbstractLoadBalance四、政策實作五、自定義 LoadBalance

文章目錄

  • 一、前言
    • 1. LoadBalance
    • 2. 調用時機
  • 三、 AbstractLoadBalance
  • 四、政策實作
    • 1. RandomLoadBalance
    • 2. RoundRobinLoadBalance
    • 3. LeastActiveLoadBalance
    • 4. ConsistentHashLoadBalance
      • 4.1 Hash環 的初始化
      • 4.2 節點的查找
  • 五、自定義 LoadBalance

一、前言

本系列為個人Dubbo學習筆記,内容基于《深度剖析Apache Dubbo 核心技術内幕》, 過程參考官方源碼分析文章,僅用于個人筆記記錄。本文分析基于Dubbo2.7.0版本,由于個人了解的局限性,若文中不免出現錯誤,感謝指正。

叢集元件相關文章:

  1. Dubbo筆記⑫ :Dubbo 叢集元件概述
  2. Dubbo筆記⑬ :Dubbo 叢集元件 之 Cluster & ClusterInvoker
  3. Dubbo筆記⑭ :Dubbo叢集元件 之 Directory
  4. Dubbo筆記⑮ :Dubbo叢集元件 之 Router
  5. Dubbo筆記⑯ :Dubbo叢集元件 之 LoadBalance

由于官方存在關于 LoadBalance 的文檔,并且寫的很詳細,是以本文大部分内容來源于官方,僅為留檔。如需閱讀,推薦官方文檔:https://dubbo.apache.org/zh/docs/v2.7/dev/source/loadbalance/

當服務提供方是叢集時,為了避免大量請求一直集中在一個或者幾個服務提供方機器上,進而使這些機器負載很高,甚至導緻服務不可用,需要做一定的負載均衡政策。Dubbo提供了多種均衡政策,預設為random,也就是每次随機調用一台服務提供者的服務。

1. LoadBalance

LoadBalance 接口是 負載均衡實作的基礎 SPI 接口,其實作如下:

// 标注是 SPI 接口,預設實作是 RandomLoadBalance
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
	// @Adaptive("loadbalance") 表示會為 select 生成代理方法,并且通過 url 中的 loadbalance 參數值決定選擇何種負載均衡實作政策。
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
           

而 org.apache.dubbo.rpc.cluster.LoadBalance 檔案如下:

random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
           

由此我們可知,Dubbo提供的負載均衡政策有如下幾種:

  • Random LoadBalance:随機政策。按照機率設定權重,比較均勻,并且可以動态調節提供者的權重。
  • RoundRobin LoadBalance:輪循政策。輪循,按公約後的權重設定輪循比率。會存在執行比較慢的服務提供者堆積請求的情況,比如一個機器執行得非常慢,但是機器沒有當機(如果當機了,那麼目前機器會從ZooKeeper 的服務清單中删除),當很多新的請求到達該機器後,由于之前的請求還沒處理完,會導緻新的請求被堆積,久而久之,消費者調用這台機器上的所有請求都會被阻塞。
  • LeastActive LoadBalance:最少活躍調用數。如果每個提供者的活躍數相同,則随機選擇一個。在每個服務提供者裡維護着一個活躍數計數器,用來記錄目前同時處理請求的個數,也就是并發處理任務的個數。這個值越小,說明目前服務提供者處理的速度越快或者目前機器的負載比較低,是以路由選擇時就選擇該活躍度最底的機器。如果一個服務提供者處理速度很慢,由于堆積,那麼同時處理的請求就比較多,也就是說活躍調用數目較大(活躍度較高),這時,處理速度慢的提供者将收到更少的請求。
  • ConsistentHash LoadBalance:一緻性Hash政策。一緻性Hash,可以保證相同參數的請求總是發到同一提供者,當某一台提供者機器當機時,原本發往該提供者的請求,将基于虛拟節點平攤給其他提供者,這樣就不會引起劇烈變動。

2. 調用時機

下圖是消費者發起調用後的簡化時序圖 (圖源 《深度剖析Apache Dubbo 核心技術内幕》)

Dubbo筆記 ⑯ :Dubbo叢集元件 之 LoadBalance一、前言三、 AbstractLoadBalance四、政策實作五、自定義 LoadBalance

當消費者進行服務調用時,在 Cluster Invoker 中會進行負載均衡。需要注意的是,并非所有的 ClusterInvoker 都會進行負載均衡的調用。

三、 AbstractLoadBalance

AbstractLoadBalance 類是 LoadBalance 實作類的父級抽象類,完成了一些基礎的功能的實作。上面的各種負載均衡政策實際上就是繼承了AbstractLoadBalance方法,但重寫了其doSelect()方法,是以下面我們重點看看每種負載均衡政策的doSelect()方法

/**
 * AbstractLoadBalance
 */
public abstract class AbstractLoadBalance implements LoadBalance {
 
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        // 計算權重,下面代碼邏輯上形似于 (uptime / warmup) * weight。
    	// 随着服務運作時間 uptime 增大,權重計算值 ww 會慢慢接近配置值 weight
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    	// 服務校驗,如果服務清單為空則傳回null
        if (invokers == null || invokers.isEmpty()) {
            return null;
        }
        // 如果服務清單隻有一個,則不需要負載均衡,直接傳回
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        // 進行負載均衡,供子類實作
        return doSelect(invokers, url, invocation);
    }

    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

    //  服務提供者權重計算邏輯
    protected int getWeight(Invoker<?> invoker, Invocation invocation) {
    	// 從 url 中擷取權重 weight 配置值。預設 100
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
        if (weight > 0) {
        	// 擷取服務提供者啟動時間戳
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
            	// 計算服務提供者運作時長
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                 // 擷取服務預熱時間,預設為10分鐘
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                // 如果服務運作時間小于預熱時間,則重新計算服務權重,即降級
                if (uptime > 0 && uptime < warmup) {
                	 // 重新計算服務權重
                    weight = calculateWarmupWeight(uptime, warmup, weight);
                }
            }
        }
        return weight >= 0 ? weight : 0;
    }

}

           

上面是權重的計算過程,該過程主要用于保證當服務運作時長小于服務預熱時間時,對服務進行降級,避免讓服務在啟動之初就處于高負載狀态。服務預熱是一個優化手段,與此類似的還有 JVM 預熱。主要目的是讓服務啟動後“低功率”運作一段時間,使其效率慢慢提升至最佳狀态。

四、政策實作

1. RandomLoadBalance

RandomLoadBalance 是權重随機算法的具體實作。

它的算法思想很簡單。假設我們有一組伺服器 servers = [A, B, C],他們對應的權重為 weights = [5, 3, 2],權重總和為10。現在把這些權重值平鋪在一維坐标值上,[0, 5) 區間屬于伺服器 A,[5, 8) 區間屬于伺服器 B,[8, 10) 區間屬于伺服器 C。接下來通過随機數生成器生成一個範圍在 [0, 10) 之間的随機數,然後計算這個随機數會落到哪個區間上。比如數字3會落到伺服器 A 對應的區間上,此時傳回伺服器 A 即可。權重越大的機器,在坐标軸上對應的區間範圍就越大,是以随機數生成器生成的數字就會有更大的機率落到此區間内。隻要随機數生成器産生的随機數分布性很好,在經過多次選擇後,每個伺服器被選中的次數比例接近其權重比例。比如,經過一萬次選擇後,伺服器 A 被選中的次數大約為5000次,伺服器 B 被選中的次數約為3000次,伺服器 C 被選中的次數約為2000次。

RandomLoadBalance#doSelect 的實作如下:

@Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        // 服務提供者數量
        int length = invokers.size();
        // Every invoker has the same weight?
        // 是否所有的服務提供者權重相等
        boolean sameWeight = true;
        // the weight of every invokers
        // 用來存儲每個提供者的權重
        int[] weights = new int[length];
        // the first invoker's weight
        // 擷取第一個提供者的權重
        int firstWeight = getWeight(invokers.get(0), invocation);
        weights[0] = firstWeight;
        // The sum of weights
        // 所有提供者的權重總和
        int totalWeight = firstWeight;
        // 計算出所有提供者的權重之和
        for (int i = 1; i < length; i++) {
        	// 擷取權重
            int weight = getWeight(invokers.get(i), invocation);
            // save for later use
            weights[i] = weight;
            // Sum
            totalWeight += weight;
            if (sameWeight && weight != firstWeight) {
            	// 如果有權重不相等,則表明不是所有的權重都相等
                sameWeight = false;
            }
        }
        // 總權重大于0 && 權重不全相等,則根據權重進行配置設定調用
        if (totalWeight > 0 && !sameWeight) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            // 生成一個 [0, totalWeight) 區間内的數字
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            // 按照上面說的規則判斷這個随機數落入哪個提供者的區間,則傳回該服務提供者
            for (int i = 0; i < length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        // 如果所有提供者權重相等,則随機傳回一個
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }
           

需要注意的是,這裡沒有使用Random而是使用了ThreadLocalRandom,這是出于性能上的考慮,因為Random在高并發下會導緻大量線程競争同一個原子變量,導緻大量線程原地自旋,進而浪費CPU資源

RandomLoadBalance 的算法思想比較簡單,在經過多次請求後,能夠将調用請求按照權重值進行“均勻”配置設定。當然 RandomLoadBalance 也存在一定的缺點,當調用次數比較少時,Random 産生的随機數可能會比較集中,此時多數請求會落到同一台伺服器上。這個缺點并不是很嚴重,多數情況下可以忽略。RandomLoadBalance 是一個簡單,高效的負載均衡實作,是以 Dubbo 選擇它作為預設實作。

2. RoundRobinLoadBalance

RoundRobinLoadBalance 即權重輪詢負載均衡的實作。

在詳細分析源碼前,我們先來了解一下什麼是權重輪詢。這裡從最簡單的輪詢開始講起,所謂輪詢是指将請求輪流配置設定給每台伺服器。舉個例子,我們有三台伺服器 A、B、C。我們将第一個請求配置設定給伺服器 A,第二個請求配置設定給伺服器 B,第三個請求配置設定給伺服器 C,第四個請求再次配置設定給伺服器 A。這個過程就叫做輪詢。輪詢是一種無狀态負載均衡算法,實作簡單,适用于每台伺服器性能相近的場景下。但現實情況下,我們并不能保證每台伺服器性能均相近。如果我們将等量的請求配置設定給性能較差的伺服器,這顯然是不合理的。是以,這個時候我們需要對輪詢過程進行權重,以調控每台伺服器的負載。經過權重後,每台伺服器能夠得到的請求數比例,接近或等于他們的權重比。比如伺服器 A、B、C 權重比為 5:2:1。那麼在8次請求中,伺服器 A 将收到其中的5次請求,伺服器 B 會收到其中的2次請求,伺服器 C 則收到其中的1次請求。

在上述基礎上,此處的輪詢算法還需要避免 類似 [A, A, A, A, A, B, B, C] 的情況産生,這樣會使得 A 服務在短時間内接收了大量請求,而是需要實作 類似[A, A, B, A, A ,C, A, B] 的效果以均勻的對服務進行通路。

RoundRobinLoadBalance#doSelect 實作如下:

@Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    	// 1. 擷取調用方法的key,格式 key = 全限定類名 + "." + 方法名,比如 com.xxx.DemoService.sayHello
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        // 2. 擷取該調用方法對應的每個服務提供者的 WeightedRoundRobin對象組成的map
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        // 3. 周遊所有的提供者,計算總權重和權重最大的提供者
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;
        for (Invoker<T> invoker : invokers) {
        	// 擷取提供者唯一id :格式為 protocol://ip:port/接口全限定類名。
        	//  如 dubbo://30.10.75.231:20880/com.books.dubbo.demo.api.GreetingService
            String identifyString = invoker.getUrl().toIdentityString();
            WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
            // 擷取目前提供者的 權重
            int weight = getWeight(invoker, invocation);
			// 如果 weightedRoundRobin  為空,則說明 map 中還沒有針對此提供者的權重資訊,則建立後儲存
            if (weightedRoundRobin == null) {
            	// 建立提供者的權重資訊,并儲存到map中
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(identifyString, weightedRoundRobin);
            }
            // 如果權重有變化,更新權重資訊
            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                // 3.1 權重變化
                weightedRoundRobin.setWeight(weight);
            }
            // 這裡 cur = cur + weight
            long cur = weightedRoundRobin.increaseCurrent();
            // 設定權重更新時間
            weightedRoundRobin.setLastUpdate(now);
            // 記錄提供者中的最大權重的提供者
            if (cur > maxCurrent) {
            	// 最大權重
                maxCurrent = cur;
                // 最大權重的提供者
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            // 記錄總權重
            totalWeight += weight;
        }
        // 4. 更新 Map
        // CAS 確定線程安全: updateLock為true 時表示有線程在更新 methodWeightMap
        // 如果 invokers.size() != map.size() 則說明提供者清單有變化
        if (!updateLock.get() && invokers.size() != map.size()) {
            if (updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    // 4.1 拷貝新值
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                    newMap.putAll(map);
                    // 4.2 更新map,移除過期值
                    Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, WeightedRoundRobin> item = it.next();
                        if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                            it.remove();
                        }
                    }
                    // 4.3 更新methodWeightMap
                    methodWeightMap.put(key, newMap);
                } finally {
                    updateLock.set(false);
                }
            }
        }
        // 5. 傳回選擇的 服務提供者
        if (selectedInvoker != null) {
        	// 更新selectedInvoker 權重 : selectedInvoker 權重 = selectedInvoker 權重 - totalWeight
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        // 傳回第一個服務提供者,不會走到這裡
        return invokers.get(0);
    }
           

其中 WeightedRoundRobin 為 RoundRobinLoadBalance 内部類,其實作如下:

protected static class WeightedRoundRobin {
        private int weight;
        private AtomicLong current = new AtomicLong(0);
        private long lastUpdate;
        public int getWeight() {
            return weight;
        }
        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }
        public long increaseCurrent() {
            return current.addAndGet(weight);
        }
        public void sel(int total) {
            current.addAndGet(-1 * total);
        }
        public long getLastUpdate() {
            return lastUpdate;
        }
        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }
           

我們按照代碼中的注釋的順序來整理整個過程:

  1. 擷取調用方法的 methodKey,格式 key = 全限定類名 + “.” + 方法名,比如 com.xxx.DemoService.sayHello
  2. 擷取該調用方法對應的每個服務提供者的 WeightedRoundRobin對象組成的map。

    這裡 methodWeightMap 的定義為

    ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>>

    ,其格式如下:
    |-- methodWeightMap 
     |    |-- key : 擷取調用方法的 methodKey,格式 key = 全限定類名 + "." + 方法名。即第一步中的key 
     |		   |-- key : invoker.getUrl().toIdentityString(), 格式為 protocol://ip:port/接口全限定類名。
     					 如 dubbo://30.10.75.231:20880/com.books.dubbo.demo.api.GreetingService
     |	 	   |-- value : WeightedRoundRobin對象,其中記錄了目前提供者的權重和最後一次更新的時間
               
  3. 周遊所有的提供者,計算總權重和權重最大的提供者。需要注意經過這一步,每一個WeightedRoundRobin對象中的current 都經曆了如下運算 : current = current + weight
  4. 更新 methodWeightMap :目前沒有線程更新 methodWeightMap && 提供者清單有變化。updateLock用來確定更新methodWeightMap的線程安全,這裡使用了原子變量的CAS操作,減少了因為使用鎖帶來的開銷。
    1. 建立 newMap 儲存新的 提供者權重資訊。
    2. 更新map,移除過期值。這裡 RECYCLE_PERIOD 是清理周期,如果服務提供者耗時RECYCLE_PERIOD還沒有更新自己的WeightedRoundRobin對象,則會被自動回收;
    3. 更新 methodWeightMap
  5. 傳回權重最大的提供者。這裡

    selectedWRR.sel(totalWeight);

    的作用是更新selectedInvoker 權重 :

    selectedInvoker 權重 = selectedInvoker 權重 - totalWeight

其中,關于 RoundRobinLoadBalance 算法的解析,詳參官方解釋

3. LeastActiveLoadBalance

LeastActiveLoadBalance 翻譯過來是最小活躍數負載均衡。活躍調用數越小,表明該服務提供者效率越高,機關時間内可處理更多的請求。此時應優先将請求配置設定給該服務提供者。

在具體實作中,每個服務提供者對應一個活躍數 active。初始情況下,所有服務提供者活躍數均為0。每收到一個請求,活躍數加1,完成請求後則将活躍數減1。在服務運作一段時間後,性能好的服務提供者處理請求的速度更快,是以活躍數下降的也越快,此時這樣的服務提供者能夠優先擷取到新的服務請求、這就是最小活躍數負載均衡算法的基本思想。除了最小活躍數,LeastActiveLoadBalance 在實作上還引入了權重值。是以準确的來說,LeastActiveLoadBalance 是基于權重最小活躍數算法實作的。

舉個例子說明一下,在一個服務提供者叢集中,有兩個性能優異的服務提供者。某一時刻它們的活躍數相同,此時 Dubbo 會根據它們的權重去配置設定請求,權重越大,擷取到新請求的機率就越大。如果兩個服務提供者權重相同,此時随機選擇一個即可。

LeastActiveLoadBalance#doSelect 的實作如下:

@Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    	/*********** 1. 屬性初始化 ************/
        // Number of invokers
        // 擷取服務提供者個數
        int length = invokers.size();
        // The least active value of all invokers
        // 臨時變量,用于暫存目前最小的活躍調用次數
        int leastActive = -1;
        // The number of invokers having the same least active value (leastActive)
        // 具有相同“最小活躍數”的服務者提供者(以下用 Invoker 代稱)數量
        int leastCount = 0;
        // The index of invokers having the same least active value (leastActive)
        // 記錄具有最小活躍調用的提供者在 invokers 中的下标位置
        int[] leastIndexes = new int[length];
        // the weight of every invokers
        // 記錄每個服務提供者的權重
        int[] weights = new int[length];
        // The sum of the warmup weights of all the least active invokes
        // 記錄活躍調用次數最小的服務提供者的權重和
        int totalWeight = 0;
        // The weight of the first least active invoke
        // 記錄第一個調用次數等于最小調用次數的服務提供者的權重。用于與其他具有相同最小活躍數的 Invoker 的權重進行對比,
        // 以檢測是否“所有具有相同最小活躍數的 Invoker 的權重”均相等
        int firstWeight = 0;
        // Every least active invoker has the same weight value?
        // 所有的調用次數等于最小調用次數的服務提供者的權重是否一樣
        boolean sameWeight = true;
		/*********** 2. 挑選最小的活躍調用的服務提供者 ************/
        // Filter out all the least active invokers
        // 過濾出所有調用次數等于最小調用次數的服務提供者
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // Get the active number of the invoke
            // 擷取目前服務提供者的被調用次數
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // Get the weight of the invoke configuration. The default value is 100.
            // 擷取目前服務提供者的權重
            int afterWarmup = getWeight(invoker, invocation);
            // save for later use
            // 記錄下目前服務提供者的權重
            weights[i] = afterWarmup;
            // If it is the first invoker or the active number of the invoker is less than the current least active number
            // 如果是第一個服務提供者(leastActive  == -1) 或者 目前服務提供者的活躍調用次數小于目前最小的活躍調用次數。
            // 滿足上述條件則認為最小活躍調用相關資訊需要更新,進行更性
            if (leastActive == -1 || active < leastActive) {
                // Reset the active number of the current invoker to the least active number
                // 記錄下最新的最小活躍調用次數
                leastActive = active;
                // Reset the number of least active invokers
                // 最小活躍調用的 提供者數量為1
                leastCount = 1;
                // Put the first least active invoker first in leastIndexs
                // 記錄最小活躍調用次數的提供者在  invokers 中的下标
                leastIndexes[0] = i;
                // Reset totalWeight
                // 重置最小活躍調用次數的提供者的權重和
                totalWeight = afterWarmup;
                // Record the weight the first least active invoker
                // 記錄目前最小活躍調用的權重
                firstWeight = afterWarmup;
                // Each invoke has the same weight (only one invoker here)
                // 此時處于重置最小活躍調用次數資訊,目前隻有一個提供者,是以所有的調用次數等于最小調用次數的服務提供者的權重一樣
                sameWeight = true;
                // If current invoker's active value equals with leaseActive, then accumulating.
            }
			// 如果目前提供者的活躍調用次數等于目前最小活躍調用次數
			 else if (active == leastActive) {
                // Record the index of the least active invoker in leastIndexs order
                // 記錄目前服務提供者的下标
                leastIndexes[leastCount++] = i;
                // Accumulate the total weight of the least active invoker
                // 累加最小活躍調用者的權重
                totalWeight += afterWarmup;
                // If every invoker has the same weight?
                // 是否每個最小活躍調用者的權重都相等
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
		/*********** 3. 對最小活躍調用的服務提供者的處理 ************/
        // Choose an invoker from all the least active invokers
        // 如果隻有一個最小調用者次數的 Invoker 則直接傳回
        if (leastCount == 1) {
            // If we got exactly one invoker having the least active value, return this invoker directly.
            return invokers.get(leastIndexes[0]);
        }
        //  如果最小調用次數者的 Invoker 有多個且權重不同
        if (!sameWeight && totalWeight > 0) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on 
            // totalWeight.
            // 按照 RandomLoadBalance 的算法按照權重随機一個服務提供者。
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        // 如果最小調用此數者的invoker有多個并且權重相同,則随機拿一個使用。
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
    }
           

簡述思想:

這裡的代碼雖然看起來很複雜,但是其思路很簡單:首先從所有服務提供者中擷取 活躍調用最小的 提供者。但是因為活躍調用最小的提供者可能有多個。如果隻有一個,則直接傳回。如果存在多個,則從中按照 RandomLoadBalance 的權重算法挑選。

4. ConsistentHashLoadBalance

ConsistentHashLoadBalance 實作的是 一緻性Hash負載均衡政策。關于一緻性hash,可以參考官方文檔解析,本文不再贅述。

ConsistentHashLoadBalance#doSelect 的實作如下:

// 每個 methodKey 對應一個 “hash環”
    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
    
    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String methodName = RpcUtils.getMethodName(invocation);
        // 擷取服務調用方法 的key
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        // 擷取 invokers 的hash 值
        int identityHashCode = System.identityHashCode(invokers);
        // 擷取目前方法 key 對應的 一緻性hash選擇器 ConsistentHashSelector
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
         // 如果 invokers 是一個新的 List 對象,意味着服務提供者數量發生了變化,可能新增也可能減少了。
        // 此時 selector.identityHashCode != identityHashCode 條件成立
        if (selector == null || selector.identityHashCode != identityHashCode) {
        	// 建立新的 ConsistentHashSelector
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
         // 調用 ConsistentHashSelector 的 select 方法選擇 Invoker
        return selector.select(invocation);
    }
           

如上,doSelect 方法主要做了一些前置工作,比如檢測 invokers 清單是不是變動過,以及建立 ConsistentHashSelector。

而重點則在于 ConsistentHashSelector 中。

  • ConsistentHashSelector 在建立時完成了 Hash環的建立。
  • ConsistentHashSelector#select 方法中完成了節點的查找。

下面我們來詳細看一看

4.1 Hash環 的初始化

ConsistentHashSelector 的構造函數如下:

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // 1. 擷取虛拟節點數,預設為160。
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            // 2. 擷取參與 hash 計算的參數下标值,預設對第一個參數進行 hash 運算。
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            // String 轉 Integer
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            // 3.  根據所有服務提供者的invoker清單,生成從Hash環上的節點到服務提供者機器的映射關系,并存放到virtualInvokers中
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {
                	// 對 address + i 進行 md5 運算,得到一個長度為16的位元組數組
                    byte[] digest = md5(address + i);
                    // 對 digest 部分位元組進行4次 hash 運算,得到四個不同的 long 型正整數
                    for (int h = 0; h < 4; h++) {
                     	// h = 0 時,取 digest 中下标為 0 ~ 3 的4個位元組進行位運算
	                    // h = 1 時,取 digest 中下标為 4 ~ 7 的4個位元組進行位運算
	                    // h = 2, h = 3 時過程同上
                        long m = hash(digest, h);
                        // 将 hash 到 invoker 的映射關系存儲到 virtualInvokers 中,
                    	// virtualInvokers 需要提供高效的查詢操作,是以選用 TreeMap 作為存儲結構
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }
           

我們按照上面注釋中的步驟:

  1. 擷取虛拟節點數,預設為160。虛拟節點的數量代表每個服務提供者在 Hash環上有多少個虛拟節點。
  2. 擷取參與 hash 計算的參數下标值,預設對第一個參數進行 hash 運算。即當消費者進行調用該方法時,會使用調用的第一個參數進行hash運算得到hash值并依據此hash值在hash環上找到對應的Invoker。後面調用的時候會詳解。
  3. 根據所有服務提供者的invoker清單,生成從Hash環上的節點到服務提供者機器的映射關系,并存放到virtualInvokers中。簡單來說,就是生成Hash環上的虛拟節點,并儲存到 Hash環中。需要注意的是這裡的 Hash環實作的資料結構是 TreeMap。

4.2 節點的查找

ConsistentHashSelector#select 的實作如下:

public Invoker<T> select(Invocation invocation) {
 			// 1. 擷取參與一緻性Hash 的key。預設是方法的第一個參數
            String key = toKey(invocation.getArguments());
            // 2. 根據具體算法擷取 key對應的md5值
            byte[] digest = md5(key);
            // 3. 計算  key 對應 hash 環上的哪個節點,并傳回對應的Invoker。
            return selectForKey(hash(digest, 0));
        }

		// 根據 argumentIndex 指定的下标來擷取調用方法時的入參并拼接後傳回。
        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }
        // 從 hash 環 上擷取對應的節點
        private Invoker<T> selectForKey(long hash) {
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            if (entry == null) {
                entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();
        }
		// 進行hash 算法
        private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }
		// md5 計算
        private byte[] md5(String value) {
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes;
            try {
                bytes = value.getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.update(bytes);
            return md5.digest();
        }
           

這裡我們需要注意一下 toKey(invocation.getArguments()) 的實作。在上面 Hash環的初始化中,我們知道了在初始化Hash環時會擷取

hash.arguments

屬性,并轉換為

argumentIndex

。其作用在于此,當消費者調用時,會使用前 argumentIndex+1 個入參值作為key進行 hash,依次來選擇合适的服務提供者。

// 根據 argumentIndex 指定的下标來擷取調用方法時的入參并拼接後傳回。
        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }
           

五、自定義 LoadBalance

  1. 建立自定義的負載均衡類,實作LoadBalance 接口
    public class SimpleLoadBalance implements LoadBalance {
        @Override
        public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
            return invokers.get(0);
        }
    }
               
  2. 在META-INF/dubbo 目錄下建立建立 org.apache.dubbo.rpc.cluster.LoadBalance 檔案,并添加如下内容用以指定 simple 協定指定使用 SimpleLoadBalance 作為負載均衡政策 :

    如下圖:

    Dubbo筆記 ⑯ :Dubbo叢集元件 之 LoadBalance一、前言三、 AbstractLoadBalance四、政策實作五、自定義 LoadBalance
  3. 調用時指定使用 simple 協定的負載均衡政策
    @Reference(loadbalance = "simple")
        private DemoService demoService;
               

以上:内容部分參考

《深度剖析Apache Dubbo 核心技術内幕》

https://dubbo.apache.org/zh/docs/v2.7/dev/source/

如有侵擾,聯系删除。 内容僅用于自我記錄學習使用。如有錯誤,歡迎指正