Dubbo學習筆記
- Dubbo業務架構的源碼解析
-
- 叢集容錯
-
- 修改配置檔案
- 容錯執行個體的加載與建立
- 容錯方案的調用
- 容錯政策的解析
-
- Failover——會降級
- Failfast——會降級
- Failsafe——不會降級
- Failback——不會降級
- Forking——會降級
- Broadcast——會降級
- Available——會降級
- registryaware——會降級
- mergeable——會降級
- 負載均衡
-
- 擷取負載均衡政策
- 負載均衡前的準備過程
-
- 負載均衡政策解析
-
- random
- leastactive
- roundrobin
- consistenthash
- 負載均衡政策應用場景
-
- random
- leastactive
- roundrobin
- consistenthash
Dubbo業務架構的源碼解析
叢集容錯
修改配置檔案
容錯執行個體的加載與建立
容錯方案的調用
容錯政策的解析
Failover——會降級
- 故障轉移政策。當消費者調用提供者叢集中的某個伺服器失敗時,其會自動嘗試着調用其它伺服器。而重試的次數是通過 retries 屬性指定的。
Failfast——會降級
- 快速失敗政策。消費者端隻發起一次調用,若失敗則立即報錯。通常用于非幂等性的寫操作,比如新增記錄。
Failsafe——不會降級
- 失敗安全政策。當消費者調用提供者出現異常時,直接忽略本次消費操作。該政策通常用于執行相對不太重要的服務。
- 當遠端調用出現異常時,其不會抛出異常,而是傳回一個異步的 null 值。對于消費者來說,根本就不知道遠端調用出現了問題,即将這次遠端調用給忽略了。從這點來說,其是不公發生降級的。
Failback——不會降級
- 失敗自動恢複政策。消費者調用提供者失敗後,Dubbo 會記錄下該失敗請求,然後會定時發起重試請求,而定時任務執行的次數仍是通過配置檔案中的 retries 指定的。該政策通常用于實時性要求不太高的服務。
- 再看下 RetryTimerTask 這個内部類:
Forking——會降級
- 并行政策。消費者對于同一服務并行調用多個提供者伺服器,隻要一個成功即調用結束并傳回結果。通常用于實時性要求較高的讀操作,但其會浪費較多伺服器資源。
Broadcast——會降級
- 廣播政策。廣播調用所有提供者,逐個調用,任意一台報錯則報錯。通常用于通知所有提供者更新緩存或日志等本地資源資訊。
Available——會降級
- 首個可用政策。從所有 invoker 中查找,選擇第一個可用的 invoker。
registryaware——會降級
- 該政策會從預設注冊中心擷取首個可用的 invoker 傳回。若沒有可用的,則從所有注冊中心中查找首個可用的 invoker 傳回。
mergeable——會降級
- 合并政策。将多個 group 的 invoker 的執行結果進行合并。
負載均衡
- 再啟動3個提供者,端口分别是 20881、20882、20883,下如圖所示。
擷取負載均衡政策
- 分析入口:InvokerInvocationHandler#invoke()方法,以Debug啟動消費者端
負載均衡前的準備過程
reselect
select
負載均衡政策解析
random
- 權重随機算法,是 Dubbo 預設的負載均衡算法。權重越大,擷取到負載的機率就越大。權重相同,則會随機配置設定。
- 其适用于,提供者主機性能差别較大的場景。若所有提供者的性能差别不大,則會使該算法變為了純粹的随機算法。而純粹的随機算法則可能會出現“負載堆積”問題,即可能存在生成的某些随機數機率較高進而導緻某些提供者的負載較大。
- 算法思路如下:
leastactive
- 權重最小活躍度排程算法。活躍度越小,其優選級就越高,被排程到的機率就越高。活躍度相同,則按照權重随機算法進行負載均衡。
- 最小活躍度即目前 invoker 正在處理的請求數量。Invoker 每接收到一個請求,活躍度就會加一,處理完一個請求,活躍度就會減一。
- 其适合于提供者主機性能差别不大的場景。根據各個提供者主機目前處理實際情況進行負載均衡。即誰這會兒的活幹完了,就馬上給誰再分新活兒。
roundrobin
- 雙權重輪詢算法,是結合主機權重與輪詢權重的、方法級别的輪詢算法。
- 主機權重:配置檔案中設定的 weight,是一個整型值。
- 輪詢權重:是一個執行個體。
- 算法思路圖解:
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
// 回收期,預設60秒
// 若某個invoker的輪詢權重目前值“一直”沒有變化,
// 則說明每次輪詢時都沒有其對應的這個invoker,
// 說明這個invoker挂了,需要将其從緩存map中清除。
// 這個“一直”就是這裡的 回收期
private static final int RECYCLE_PERIOD = 60000;
// 輪詢權重
protected static class WeightedRoundRobin {
// 主機權重
private int weight;
// 輪詢權重目前值
private AtomicLong current = new AtomicLong(0);
// 記錄輪詢權重目前值的增重時間戳
private long lastUpdate;
// 擷取主機權重
public int getWeight() {
return weight;
}
// 初始化主機權重
public void setWeight(int weight) {
this.weight = weight;
current.set(0); // 将輪詢權重目前值歸0
}
// 輪詢權重目前值增重,每次增重一個主機權重
public long increaseCurrent() {
return current.addAndGet(weight);
}
// 輪詢權重目前值減重,每次增重一個所有主機權重之和
public void sel(int total) {
current.addAndGet(-1 * total);
}
// 擷取最後的增重時間戳
public long getLastUpdate() {
return lastUpdate;
}
// 更新最後的增重時間戳
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
// 這是一個雙層map,
// 外層map的key是全限定性方法名,value是内層map
// 内層map的key為一個invoker的url,其就代表了一個invoker,是以我們就可以将其直接視為invoker
// 内層map的value為輪詢權重執行個體
// 這個内層map與其對應的外層map的key間的關系是,内層map可以提供外層map的key(方法)的服務
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
// 更新鎖,其值為false,表示鎖是打開的
private AtomicBoolean updateLock = new AtomicBoolean();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 擷取目前RPC調用的方法的全限定性方法名
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 從緩存Map中擷取目前調用方法所對應的所有invoker及其對應的輪詢權重,即内層map
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
// 若這個内層map為null,說明要麼沒有這個方法,要麼真的是這個map為null
// 此時建立一個内層map放入到緩存Map中
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;
// 記錄本輪循環中輪詢權重目前值的最大值
long maxCurrent = Long.MIN_VALUE;
// 目前時間
long now = System.currentTimeMillis();
// 記錄最終被選擇出的invoker及其對應的輪詢權重執行個體
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
// 周遊所有invoker
for (Invoker<T> invoker : invokers) {
// 擷取目前周遊invoker的url,即内層map的key,格式 dubbo://ip:port/業務接口名
String identifyString = invoker.getUrl().toIdentityString();
// 從緩存中擷取目前invoker對應的輪詢權重執行個體
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
// 擷取目前invoker的主機權重
int weight = getWeight(invoker, invocation);
// 若目前invoker對應的輪詢權重執行個體為null,則建立一個新的,初始化後再寫入到緩存map
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
}
// 什麼時候主機權重會發生變化?warmup過程中的主機權重
// 會随着時間的推移,越來越接近weight設定值
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
// 更新目前輪詢權重執行個體的主機權重。注意,此時也會将輪詢權重目前值歸0
weightedRoundRobin.setWeight(weight);
}
// 為目前invoker的輪詢權重目前值增重
long cur = weightedRoundRobin.increaseCurrent();
// 記錄本次增重時間戳
weightedRoundRobin.setLastUpdate(now);
// 若目前輪詢權重目前值比目前記錄的最大current還大,則記錄下這個值及invoker
if (cur > maxCurrent) {
// 更新最大目前值
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
// 計算主機權重total
totalWeight += weight;
} // end-for
// 若updateLock為false(即更新鎖是打開狀态),
// 且目前所有可用的invoker數量與緩存中invoker的數量不相同,則進行如下處理
// 問題:這個“不相同”的情況有幾種可能?
// 1)invokers.size() > map.size()
// 2)invokers.size() < map.size()
// 分析:
// 1> 若有新增的invoker,則經過前面的for,已經會為這些新增invoker建立相應的輪詢執行個體,
// 将其其放入到了緩存map中。是以對于這種有新增invoker的情況,代碼走到這裡,其invokers的
// 數量也是與map的size()相同的。故,第1)種情況不可能出現
// 2> 第2)種情況描述的是什麼場景?有invoker出現了當機的情況
if (!updateLock.get() && invokers.size() != map.size()) {
// 上鎖
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference ==> 目的是為了“疊代穩定性”
// 建立一個map
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
// 使用老map初始化新map
newMap.putAll(map);
// 疊代新map
Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
while (it.hasNext()) {
Entry<String, WeightedRoundRobin> item = it.next();
// item.getValue().getLastUpdate() 擷取目前疊代元素的最後增重時間戳
// now - item.getValue().getLastUpdate() 目前疊代元素已經有多久沒有更新時間戳了
// if()條件用于判斷停更時長是否超過了 回收期
// 若超過了,則說明目前疊代invoker已經挂了,需要從map中删除
if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
it.remove();
}
}
// 将更新過的新map替換掉緩存Map中的老map
methodWeightMap.put(key, newMap);
} finally {
// 解鎖
updateLock.set(false);
}
}
}
// 若用于記錄輪詢權重最大的invoker的變量selectedInvoker不為null,則傳回這個invoker
// 但是在傳回之前,先讓其目前值變為最小
if (selectedInvoker != null) {
// 輪詢權重目前值減重變為最小
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
}
consistenthash
- 一緻性 hash 算法。其是一個方法參數級别的負載均衡,對于同一調用方法的、相同實參的、遠端調用請求,其會被路由到相同的 invoker。其是以調用方法的指定實參的 hash 值為 key 進行 invoker 選擇的。
- 配置:
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
public static final String HASH_NODES = "hash.nodes";
public static final String HASH_ARGUMENTS = "hash.arguments";
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 擷取到目前RPC調用的簡單方法名
String methodName = RpcUtils.getMethodName(invocation);
// 獲緻到RPC調用的全限定性方法名
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// 擷取到invokers的hash,其将來用于建構selector
int identityHashCode = System.identityHashCode(invokers);
// 從緩存中擷取目前方法的選擇器selector
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 若selector為null,或這個存在的selector的hash與invokers的hash不同,則建立一個新的selector并放入到緩存
if (selector == null || selector.identityHashCode != identityHashCode) {
// 建立一個新的selector
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 使用selector根據調用資訊進行選擇
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 建立一個map用于存放虛拟invoker,key為虛拟invoker(其本質就是一個hash值),value為實體invoker
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 擷取hash.nodes屬性值,即為每個實體invoker建立的虛拟invoker的數量,預設160
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
// 擷取hash.arguments屬性值,并使用逗号(,)進行分隔
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
// 這個數組中的元素将來是hash.arguments屬性值解析出的索引轉換為整型後的數值
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]); // 将String變為Integer寫入到數組
}
// 周遊所有實體invoker,為每一個實體invoker建立replicaNumber個虛拟invoker
for (Invoker<T> invoker : invokers) {
// 擷取實體invoker的位址 ip:port
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
// 使用md5算法生成一個128位的摘要: 生成一個hash值需要使用一個32位的二進制數,
// 是以一個digest可以生成4個hash,将這個digest分為4段:0-31, 32-63, 64-95, 96-127
byte[] digest = md5(address + i);
// 使用digest的每一段生成一個hash
for (int h = 0; h < 4; h++) {
// 使用一個32位的二進制數生成一個hash,這個hash就代表了一個虛拟invoker
long m = hash(digest, h);
// 将這個hash(虛拟invoker)作為key,實體invoker作為value,寫入到map
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
// 将指定的實參值進行字元串拼接
String key = toKey(invocation.getArguments());
// 根據這個key計算出摘要
byte[] digest = md5(key);
// 使用這個digest的前32位(0-31)計算出一個hash,然後使用這個hash做選擇
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
// 周遊argumentIndex的每一個元素,這些元素是RPC調用方法的實參的索引
for (int i : argumentIndex) {
// 若這個周遊的索引值在有效範圍内,則将這個實參拼接
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
// 選擇出一個虛拟invoker的entry
// 選擇比目前指定hash值大的最小的虛拟invoker(hash值),若不存在,則傳回null
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
// 若這個entry為null,則選擇第一個虛拟invoker的entry
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
// 傳回選擇的虛拟invoker對應的實體invoker
return entry.getValue();
}
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}
}
}
負載均衡政策應用場景
random
- 其适用于,提供者主機性能差别較大的場景。若所有提供者的性能差别不大,則會使該算法變為了純粹的随機算法。而純粹的随機算法則可能會出現“負載堆積”問題,即可能存在生成的某些随機數機率較高進而導緻某些提供者的負載較大。
leastactive
- 其适合于提供者主機性能差别不大的場景。根據各個提供者主機目前處理實際情況進行負載均衡。即誰這會兒的活幹完了,就馬上給誰再分新活兒。
roundrobin
- 前兩種算法屬于服務級别的負載均衡,而 roundrobin 則屬于方法級别的。
- 其适合于某服務中包含多個方法,其中某個方法屬于高消耗方法(消耗系統資源或耗時)。若對于該方法的請求突然再現高并發,此時若采用服務級别的負載均衡,則可能會出現問題:某時刻某個或某些提供者剛處理完畢一批請求,此時它們的負載較小。而此時恰好對這個高消耗的方法的高并發請求到達。這時可能會一次性将大量的這個高消耗請求配置設定給這些負載較小的提供者,可能會導緻這些提供者直接崩潰。
- 是以對于這種場景,使用方法級别的負載均衡較好。
consistenthash
- 其是一個方法參數級别的負載均衡。
- 其應用場景是,對于某些方法,其參數選擇範圍較小,即參數值相對較固定。使用該算法,可以使對相同方法的相同參數請求,被負載均衡到相同的提供者。這樣就可以充分地利用提供者的緩存,提高性能。