文章目錄
- DUBBO負載均衡算法及源碼分析
-
- 前言
- 類結構
- 随機權重-RandomLoadBalance
-
- 算法思想
- 源碼分析
-
- 主要代碼-doSelect
- getweight分析
- 權重輪詢-RoundRobinLoadBalance
-
- 算法思想
-
-
- 步驟分析
-
- 源碼分析
- 最少活躍連接配接數-LeastActiveLoadBalance
-
- 算法思想
- 源碼分析
- 一緻性hash-ConsistentHashLoadBalance
-
- 算法思想
- 源碼分析
- 補充
- 總結
DUBBO負載均衡算法及源碼分析
前言
下面是dubbo官方叢集容錯的布局圖,叢集容錯有以下元件,Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。這邊分析下LoadBalance (負載均衡子產品) ,LoadBalance 仍然是在消費者端實作的功能。
後文源碼 dubbo版本2.6.1
類結構
由下圖可知,Dubbo共提供了四種負載均衡政策,分别是基于權重随機算法的 RandomLoadBalance(預設實作)、基于最少活躍調用數算法的 LeastActiveLoadBalance、基于 hash 一緻性的 ConsistentHashLoadBalance,以及基于權重輪詢算法的 RoundRobinLoadBalance。
其中所有的實作類繼承與AbstractLoadBalance,該類提供模闆方法
/**
* select one invoker in list.
*
* 從 Invoker 集合中,選擇一個
*
* @param invokers invokers. Directory子產品查到的清單
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty()) {
return null;
}
//如果長度為1 直接傳回
if (invokers.size() == 1) {
return invokers.get(0);
}
//留給具體子類實作
return doSelect(invokers, url, invocation);
}
随機權重-RandomLoadBalance
算法思想
下面講解的時權重不相等的情況,如果權重都相等,采用随機算法從0到n-1中随機擷取一個整數,取對應下标的invoker。
第一步:對所有的invoker清單權重求和,和為totalWeight。
第二步:在0-totalWeight中取一個随機整數offset。
第三步:offset依次減去每一個的invoker的權重,offset小于0時終止條件。
offset每次減去一個權重,其實主要為了看offset落在上圖的對應哪一個區間,每個區間的大小是該機器的權重,是以達到了随機權重的作用。
源碼分析
主要代碼-doSelect
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
// 計算總權限
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation); // 獲得權重
totalWeight += weight; // Sum
if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
// 權重不相等,随機後,判斷在哪個 Invoker 的權重區間中
if (totalWeight > 0 && !sameWeight) {
// 随機
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
// 區間判斷
for (Invoker<T> invoker : invokers) {
offset -= getWeight(invoker, invocation);
if (offset < 0) {
return invoker;
}
}
}
// 權重相等,平均随機
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(random.nextInt(length));
}
getweight分析
如果遠端URL配置了remote.timestamp參數 ,權重采用動态擷取。為了防止在做服務擴容的時候,擴容的機器,如果是固定的權重,可能一下子接收了大量的請求,導緻服務波動嚴重。這邊通過預熱時長,和啟動時長動态的調整了擴容服務的權重,規避的大量請求打入剛加入叢集的新機器。
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 獲得 weight 配置,即服務權重。預設為 100
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 獲得啟動總時長
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 獲得預熱需要總時長。預設為 10 * 60 * 1000 = 10 分鐘
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
// 處于預熱中,計算目前的權重
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 計算權重
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
// 權重範圍為 [0, weight] 之間
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
權重輪詢-RoundRobinLoadBalance
算法思想
在講權重輪詢時,我們先看一下輪詢如何實作,現有服務提供者A,B,C。我們将第一個請求配置設定給伺服器 A,第二個請求配置設定給伺服器 B,第三個請求配置設定給伺服器 C,第四個請求再次配置設定給伺服器 A。這個過程就叫做輪詢。
權重輪詢:由于每一台機器的性能是有差異的,或者我們生産有多機房,機房之間請求也不是平均的,是以對不同機器配置不同權重是很常見的。現在A,B,C的權重為5,2,1 我們要達到在8次請求中,伺服器 A 将收到其中的5次請求,伺服器 B 會收到其中的2次請求,伺服器 C 則收到其中的1次請求。
步驟分析
第一步:采用一個全局map儲存每個服務和服務的請求次數
第二步:計算總的清單權重weightSum,并提取每個invoker和他的權重放進局部invokerToWeightMap中
第三步:用目前輪詢序号與服務提供者總權重取模,餘數為mod-從0取到weightSum
第四步:外循環,從0-maxweight,由于maxweight*length >= weightSum 保證了對應mod可以遞減到0
第五步:内循環,對invokerToWeightMap進行周遊,當取到mod為0,且該invoker的權重還大于0的時候,傳回,否則mod–,invoker.weight–;
下面是官方文檔的一個案例
假設我們有三台伺服器 servers = [A, B, C],對應的權重為 weights = [2, 5, 1]。接下來對上面的邏輯進行簡單的模拟。
mod = 0:滿足條件,此時直接傳回伺服器 A
mod = 1:需要進行一次遞減操作才能滿足條件,此時傳回伺服器 B
mod = 2:需要進行兩次遞減操作才能滿足條件,此時傳回伺服器 C
mod = 3:需要進行三次遞減操作才能滿足條件,經過遞減後,伺服器權重為 [1, 4, 0],此時傳回伺服器 A
mod = 4:需要進行四次遞減操作才能滿足條件,經過遞減後,伺服器權重為 [0, 4, 0],此時傳回伺服器 B
mod = 5:需要進行五次遞減操作才能滿足條件,經過遞減後,伺服器權重為 [0, 3, 0],此時傳回伺服器 B
mod = 6:需要進行六次遞減操作才能滿足條件,經過遞減後,伺服器權重為 [0, 2, 0],此時傳回伺服器 B
mod = 7:需要進行七次遞減操作才能滿足條件,經過遞減後,伺服器權重為 [0, 1, 0],此時傳回伺服器 B
源碼分析
/**
* 服務方法與計數器的映射
*
* KEY:serviceKey + "." + methodName
*/
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // Number of invokers
int maxWeight = 0; // The maximum weight
int minWeight = Integer.MAX_VALUE; // The minimum weight
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;
// 計算最小、最大權重,總的權重和。
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
minWeight = Math.min(minWeight, weight); // Choose the minimum weight
if (weight > 0) {
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
weightSum += weight;
}
}
// 獲得 AtomicPositiveInteger 對象
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
// 不存在的話 初始目前服務請求計數器
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
// 傳回目前順序号,并遞增 + 1
int currentSequence = sequence.getAndIncrement();
// 權重不相等,順序根據權重配置設定
if (maxWeight > 0 && minWeight < maxWeight) {
int mod = currentSequence % weightSum; // 剩餘權重
for (int i = 0; i < maxWeight; i++) { // 循環最大權重
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { // 循環 Invoker 集合
final Invoker<T> k = each.getKey();
final IntegerWrapper v = each.getValue();
// 剩餘權重歸 0 ,目前 Invoker 還有剩餘權重,傳回該 Invoker 對象
if (mod == 0 && v.getValue() > 0) {
return k;
}
// 若 Invoker 還有權重值,扣除它( value )和剩餘權重( mod )。
if (v.getValue() > 0) {
v.decrement();
mod--;
}
}
}
}
// 權重相等,平均順序獲得
// Round robin
return invokers.get(currentSequence % length);
}
通過源碼分析該算法采用了兩重循環,如果機器權重配置差異過大,實際上性能會有點慢,是以高版本做了後續的幾次優化。
最少活躍連接配接數-LeastActiveLoadBalance
算法思想
在分析源碼前,我們先來了解一個概念—活躍連接配接數(active),active表示該服務提供者的目前服務活躍程度,初始情況下,活躍度為0,每收到(發出)一個請求,活躍數加1,完成請求時減1。在服務運作中,服務提供者處理的請求越快,則其活躍度下降的越快,是以最少活躍連接配接數的服務提供者,相對應的其性能最好,是以此負載均衡算法是基于性能為參考依據,進行負載。
活躍連接配接數也作用于并發限流,ActiveLimitFilter(服務消費者端),ExecuteLimitFilter(服務提供者端)
補充:如果存在多個最小活躍連接配接數的服務提供者,則對該集合采用随機權重的擷取方法得到對應的invoker。
源碼分析
/**
* 最少活躍調用數,相同活躍數的随機,活躍數指調用前後計數差。
* 使慢的提供者收到更少請求,因為越慢的提供者的調用前後計數差會越大。
*
**/
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // invokers清單總個數
int leastActive = -1; // 最小的活躍數
int leastCount = 0; // 相同最小活躍數的個數
int[] leastIndexes = new int[length]; // 相同最小活躍數的下标
int totalWeight = 0; // 總權重
int firstWeight = 0; // 第一個權重,用于于計算是否相同
boolean sameWeight = true; // 是否所有權重相同
// 計算獲得相同最小活躍數的數組和個數
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
//通過RpcStatus擷取該invokers的活躍數
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 權重
// 發現更小的活躍數,重新開始
if (leastActive == -1 || active < leastActive) {
leastActive = active; // 記錄最小活躍數
leastCount = 1; // 重新統計相同最小活躍數的個數
leastIndexes[0] = i; // 重新記錄最小活躍數下标
totalWeight = weight; // 重新累計總權重
firstWeight = weight; // 記錄第一個權重
sameWeight = true; // 還原權重相同辨別
} else if (active == leastActive) { // 累計相同最小的活躍數
leastIndexes[leastCount++] = i; // 累計相同最小活躍數下标
totalWeight += weight; // 累計總權重
// 判斷所有權重是否一樣
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
}
// 如果隻有一個最小則直接傳回
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
// 如果權重不相同且權重大于0則按總權重數随機
if (!sameWeight && totalWeight > 0) {
int offsetWeight = random.nextInt(totalWeight);
// 并确定随機值落在哪個片斷上
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0) {
return invokers.get(leastIndex);
}
}
}
// 如果權重相同或權重為0則均等随機
return invokers.get(leastIndexes[random.nextInt(leastCount)]);
}
一緻性hash-ConsistentHashLoadBalance
一緻性雜湊演算法保證相同的請求參數,每次請求到相同的伺服器上面,
算法思想
首先存在一個hash環,原算法包含(232次方 - 1)個節點,dubbo采用了一個treeMap在虛拟構造該環。
然後,所有節點分布在環上面,如果一次采用一個節點 可能有多種情況的分布,如下圖。
可以看到,如果一個服務提供者值在環上隻有一個節點的話,分布是各種各樣的,極有可能存在資料傾斜,為了避免這個問題,提出了虛拟節點的概念。dubbo預設為每一個提供者設定160個虛拟節點,每四個節點為一組,然後所有虛拟節點均勻分布在環上面。
在做服務選擇時,先從請求參數中構造keyString,并采用同樣MD5摘要算法,然後做hash求具體key,在哈希換上取距離key最近的服務提供者如果不存在,選擇第一個。
源碼分析
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
/**
* Hash nodes name 哈希node鍵
*/
public static final String HASH_NODES = "hash.nodes";
/**
* Hash arguments name
*/
public static final String HASH_ARGUMENTS = "hash.arguments";
/**
* 服務方法和一緻性哈希選擇器的映射 key為serviceKey + "."methodName
*/
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
// 構造key
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// 基于invokers 根據對象記憶體位址擷取系統的hashCode
int identityHashCode = System.identityHashCode(invokers);
// 擷取目前服務的選擇器
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 如果選擇器為空 或者hash值改變了 重新建立選擇器
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {
// 虛拟節點和invoker的對應映射
private final TreeMap<Long, Invoker<T>> virtualInvokers;
// 虛拟節點的個數
private final int replicaNumber;
// 定義的哈希值
private final int identityHashCode;
//
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 初始化虛拟節點個數 預設160個
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
// 初始化argumentIndex
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 構造virtualInvokers
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
// 每四個虛拟節點一組 保證digest的充分利用 提高效率
for (int i = 0; i < replicaNumber / 4; i++) {
// 擷取一個唯一的摘要
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
// 對四個中每一個虛拟節點取hash,作為key 并均勻分布在hash環上面
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
//基于方法參數 作為key
String key = toKey(invocation.getArguments());
// 對key計算md5 同樣采用MD5 便于計算long型的key
byte[] digest = md5(key);
// 先計算key
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
//傳回與大于或等于給定鍵的最小鍵關聯的鍵值映射,如果沒有這樣的鍵,則傳回null
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
// 不存在,則取 virtualInvokers 第一個
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
/**
* hash算法 digest每四位進行拼接位一個32的long資料
*/
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
/**
* md5摘要算法 傳回一個128位的byte數組,數組長度位16
*/
private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}
}
}
補充
- 所謂資料傾斜是指,由于節點不夠分散,導緻大量請求落到了同一個節點上,而其他節點隻會接收到了少量請求的情況。
- 如果服務提供者有變動(新增或者下線),則需要重新構造一緻性選擇器,此時會有額外的性能問題。
- 一緻性雜湊演算法中每個節點都是均勻配置設定的,如果性能不同的機器擷取到了相同的請求,則其請求時間會有不同。
- 大多數請求都是不帶狀态的,隻請求一次就結束了,是以一般不采用一緻性哈希路由算法,該算法常用于資料庫,緩存叢集的方案配置
總結
這次分析了dubbo的負載均衡源碼,我們常用的是随機權重,權重輪詢,最少活躍連接配接數這三種算法,其中權重輪詢在不同版本分别做了優化,後續有多餘時間再進行研究。我們應該根據機器性能合理的配置權重,一開始如果是相同的機器的話,可以采用權重輪詢,但我們後續難免會對機器擴容,如果存在新老機器都存在的情況,就需要配置不同的權重了。
另外我們學習了一緻性hash算法,一緻性hash算法主要解決了正常hash算法在分布式環境下存在的 問題,如果服務節點增加或者減少,其對hash值進行取模時,由于餘數變化,所有的路由關系或者資料映射都會混亂,進而請求不能請求到原先的機器。
即使墜落谷底,也要絕地反擊。