dubbo作為分布式遠端調用架構,要保證的點很多,比如:服務注冊與發現、故障轉移、高性能通信、負載均衡等等!
負載均衡的目的是為了特定場景下,能夠将請求合理地平分到各服務執行個體上,以便發揮所有機器的疊加作用。主要考慮的點如:不要配置設定請求到挂掉的機器,性能越好的機器可以配置設定更多的請求。。。
一般負載均衡是借助外部工具,硬體負載均衡或軟體負載均衡,如F5/nginx。當然了,在目前分布式環境遍地開花的情況下,用戶端的負載均衡看起來就更輕量級,顯得不可或缺。
今天我們就來看看dubbo是如何進行負載均衡的吧!
1. dubbo負載均衡的作用?
其出發點,自然也就是普通的負載均衡器的出發點了。将負載均衡功能實作在rpc用戶端側,以便能夠随時适應外部的環境變化,更好地發揮硬體作用。而且用戶端的負載均衡天然地就避免了單點問題。定制化的自有定制化的優勢和劣勢。
它可以從配置檔案中指定,也可以在管理背景進行配置修改。
事實上,它支援 服務端服務/方法級别、用戶端服務/方法級别 的負載均衡配置。
2. dubbo有哪些負載均衡方式?
即dubbo提供了哪些負載均衡政策呢?
Dubbo内置了4種負載均衡政策:
RandomLoadBalance:随機負載均衡。随機的選擇一個。是Dubbo的預設負載均衡政策。
RoundRobinLoadBalance:輪詢負載均衡。輪詢選擇一個。
LeastActiveLoadBalance:最少活躍調用數,相同活躍數的随機。活躍數指調用前後計數差。使慢的 Provider 收到更少請求,因為越慢的 Provider 的調用前後計數差會越大。
ConsistentHashLoadBalance:一緻性哈希負載均衡。相同參數的請求總是落在同一台機器上。
3. 如何配置dubbo負載均衡政策?
其實在第一點時已經提過,有多種級别的配置:服務端服務/方法級别、用戶端服務/方法級别; 具體配置如下:
<!-- 服務端服務級别 -->
<dubbo:service interface="..." loadbalance="roundrobin" />
<!-- 用戶端服務級别 -->
<dubbo:reference interface="..." loadbalance="roundrobin" />
<!-- 服務端方法級别 -->
<dubbo:service interface="...">
<dubbo:method name="hello" loadbalance="roundrobin"/>
</dubbo:service>
<!-- 用戶端方法級别 -->
<dubbo:reference interface="...">
<dubbo:method name="hello" loadbalance="roundrobin"/>
</dubbo:reference>
多個配置是有覆寫關系的, 配置的優先級是:
1. 用戶端方法級别配置;(最優先)
2. 用戶端接口級别配置;
3. 服務端方法級别配置;
4. 服務端接口級别配置;(最後使用)
注意: 雖說以上配置有全封閉服務端配置的,有針對用戶端配置的,但是,真正使負載均衡起作用的是,用戶端在發起調用的時候,使用相應負載均衡算法進行選擇調用。(服務端不可能有這能力)
負載均衡器的初始化過程如下:
// 調用提供者服務入口
// org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, Object> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation);
// 根據可用的提供者清單和要調用的方法,決定選取的負載均衡器
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
// 執行個體化一個負載均衡器,以備後續使用
// org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#initLoadBalance
/**
* Init LoadBalance.
* <p>
* if invokers is not empty, init from the first invoke's url and invocation
* if invokes is empty, init a default LoadBalance(RandomLoadBalance)
* </p>
*
* @param invokers invokers
* @param invocation invocation
* @return LoadBalance instance. if not need init, return null.
*/
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
if (CollectionUtils.isNotEmpty(invokers)) {
// 從provider 的 url 位址中取出 loadbalance=xxx 配置,如果沒有仍使用 random 政策
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
} else {
// 預設是 random 政策
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
}
}
是以,事實上,到最終用戶端決定使用哪個負載均衡政策時,隻是從請求參數中取出 loadbalance=xxx 的參數,進而決定具體執行個體。前面所有的配置,也都是為決定這個參數做出的努力。
4. dubbo負載均衡的實作?
前面我們看到,dubbo中提供了4種負載均衡政策,功能也是很明了。那麼他們都是如何實作的呢?
先來看下其繼承圖:
很明顯,多個負載均衡器都有一些共同點,是以統一使用 AbstractLoadBalance 進行抽象模闆方法,差異點由各子算法決定即可。
那麼抽象類中,到底有多少公用功能被抽取出來了呢?到底什麼是公用的呢?
// org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance#select
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 沒有可用的提供者,沒得選
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
// 隻有一個提供者,沒得均衡的,直接用
if (invokers.size() == 1) {
return invokers.get(0);
}
// 然後就是各自均衡算法的實作了
return doSelect(invokers, url, invocation);
}
好吧,看起來是我想多了。抽象方法并沒有太多的職責,僅做普通判空操作而已。不過它倒是提供幾個公用方法被調用,如 getWeight();
事實上,模闆方法更多地存在于叢集的抽象調用方法中。AbstractClusterInvoker 。
整個負載均衡的功能,都被統一放在 cluster 子產品下的 loadbalance 包下,一看即明了。
還是來看具體的實作好玩些!
4.1. 随機負載均衡的實作
/**
* This class select one provider from multiple providers randomly.
* You can define weights for each provider:
* If the weights are all the same then it will use random.nextInt(number of invokers).
* If the weights are different then it will use random.nextInt(w1 + w2 + ... + wn)
* Note that if the performance of the machine is better than others, you can set a larger weight.
* If the performance is not so good, you can set a smaller weight.
*/
public class RandomLoadBalance extends AbstractLoadBalance {
// 辨別自身
public static final String NAME = "random";
/**
* Select one invoker between a list using a random criteria
* @param invokers List of possible invokers
* @param url URL
* @param invocation Invocation
* @param <T>
* @return The selected invoker
*/
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Every invoker has the same weight?
boolean sameWeight = true;
// the weight of every invokers
int[] weights = new int[length];
// the first invoker's weight
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;
// The sum of weights
int totalWeight = firstWeight;
for (int i = 1; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// save for later use
weights[i] = weight;
// 計算出所有權重和,以便在進行随機時設定範圍
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
// 針對各提供供者權重不一的情況,則找到第一個大于随機數的提供者即可
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果大家權重都一樣,則直接以個數進行随機即可得到提供者
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
}
稍微有點小技巧的就是,針對不一樣權重的随機實作,以相減的方式找到第一個為負的提供者即可。注意,此處計算各提供者權重的算法,倒成了難點了有木有。
4.2. 輪詢負載均衡的實作
/**
* Round robin load balance.
*/
public class RoundRobinLoadBalance extends AbstractLoadBalance {
// 自身辨別
public static final String NAME = "roundrobin";
private static final int RECYCLE_PERIOD = 60000;
protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0);
private long lastUpdate;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
public long increaseCurrent() {
return current.addAndGet(weight);
}
public void sel(int total) {
current.addAndGet(-1 * total);
}
public long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
private AtomicBoolean updateLock = new AtomicBoolean();
/**
* get invoker addr list cached for specified invocation
* <p>
* <b>for unit test only</b>
*
* @param invokers
* @param invocation
* @return
*/
protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map != null) {
return map.keySet();
}
return null;
}
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
int weight = getWeight(invoker, invocation);
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
}
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
// 自增權重
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
// 擷取最大權重項,并以對應的 invoker 作為本次選擇的執行個體
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
// 當invoker數量發生變化時,需要能感覺到,以便清理 map, 避免記憶體洩露
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
// 超出計數周期,則清空原來的 WeightedRoundRobin
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
if (selectedInvoker != null) {
// 将本次選中的invoker, 權重置為最低, 以便下次不會再被選中
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
}
依次從最大權重的invoker開始選擇,然後将選中的項放到最後,輪流選中。使用一個 ConcurrentHashMap 來儲存每個url的權重資訊,且維護其活躍性。
4.3. 最少活躍調用數負載均衡的實作
/**
* LeastActiveLoadBalance
* <p>
* Filter the number of invokers with the least number of active calls and count the weights and quantities of these invokers.
* If there is only one invoker, use the invoker directly;
* if there are multiple invokers and the weights are not the same, then random according to the total weight;
* if there are multiple invokers and the same weight, then randomly called.
*/
public class LeastActiveLoadBalance extends AbstractLoadBalance {
// 自身辨別
public static final String NAME = "leastactive";
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the least active invokers
int totalWeight = 0;
// The weight of the first least active invoker
int firstWeight = 0;
// Every least active invoker has the same weight value?
boolean sameWeight = true;
// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoker
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoker's configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
// save for later use
weights[i] = afterWarmup;
// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexes
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexes order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// 如果隻有一個最小則直接傳回
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
// 如果權重不相同且權重大于0則按總權重數随機
// 并确定随機值落在哪個片斷上(第一個相減為負的值)
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
官方解釋:最少活躍調用數,相同活躍數的随機,活躍數指調用前後計數差,使慢的機器收到更少。
額,有點難以了解的樣子。
4.4. 一緻性哈希負載均衡的實作
/**
* ConsistentHashLoadBalance
*/
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
/**
* Hash nodes name
* 通過 指定 hash.nodes=0,1,2... 可以自定義參與一緻性hash的參數清單
*/
public static final String HASH_NODES = "hash.nodes";
/**
* Hash arguments name
*/
public static final String HASH_ARGUMENTS = "hash.arguments";
// 使用selector 儲存某個固定狀态時 invoker 的映射關系
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 第一次進入或者 identityHashCode 不相等時(invoker環境發生了變化)
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 存放虛拟節點
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// hash.nodes 預設是 160
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
// 取出參與一緻性hash計算的參數資訊
String key = toKey(invocation.getArguments());
byte[] digest = md5(key);
// 根據hash值選取 invoker
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
// 按照指定的參與hash的參數,調用 toString() 方法,得到參數辨別資訊
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
// ceilingEntry
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
// 如果沒有找到,取第一個值
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}
}
}
主要就是取第多少個參數,參與hashCode的計算,然後按照一緻性hash算法,定位invoker. 它使用一個 TreeMap 來儲存一緻性哈希虛拟節點,hashCode->invoker形式存儲,使用 ceilingEntry(hash) 的方式擷取最近的虛拟節點(天然的一緻性hash應用)。
值得一提的是,一緻性哈希負載均衡政策是唯一一個沒有使用到權重項的負載均衡算法。而前面幾種均衡算法,多少都與權重相關。該負載均衡的應用場景嘛,還得自己找了。
5. 更多的負載均衡政策?
dubbo實作了4種負載均衡政策,是否就隻能是這麼多呢?一個好的設計是不會的,對擴充開放。基于dubbo的SPI機制,可以自行實作任意的負載均衡政策!
1. 實作 LoadBalance 接口;
2. 添加資源檔案 添加檔案:src/main/resource/META-INF/dubbo/com.alibaba.dubbo.rpc.cluster.LoadBalance;
demo=my=com.demo.dubbo.DemoLoadBalance
3. 設定負載均衡政策為自己實作的demo;
6. 有了負載均衡就可以保證高可用了嗎?
當然不能。負載均衡隻是保證了在發生調用的時候,可以将流量按照既定規定均攤到各機器上。然而,均攤是不是最合理的方式卻是不一定的。另外,如果發生異常,此次負載均衡就失敗了,進而成功躲過了高可用。
事實上,dubbo用三種方式協同保證了高可用:
1. 負載均衡
2. 叢集容錯
3. 服務路由
以下故事描述摘自官網:
這3個概念容易混淆。他們都描述了怎麼從多個 Provider 中選擇一個來進行調用。那他們到底有什麼差別呢?下面我來舉一個簡單的例子,把這幾個概念闡述清楚吧。
有一個Dubbo的使用者服務,在北京部署了10個,在上海部署了20個。一個杭州的服務消費方發起了一次調用,然後發生了以下的事情:
根據配置的路由規則,如果杭州發起的調用,會路由到比較近的上海的20個 Provider。
根據配置的随機負載均衡政策,在20個 Provider 中随機選擇了一個來調用,假設随機到了第7個 Provider。
結果調用第7個 Provider 失敗了。
根據配置的Failover叢集容錯模式,重試其他伺服器。
重試了第13個 Provider,調用成功。
上面的第1,2,4步驟就分别對應了路由,負載均衡和叢集容錯。 Dubbo中,先通過路由,從多個 Provider 中按照路由規則,選出一個子集。再根據負載均衡從子集中選出一個 Provider 進行本次調用。如果調用失敗了,根據叢集容錯政策,進行重試或定時重發或快速失敗等。 可以看到Dubbo中的路由,負載均衡和叢集容錯發生在一次RPC調用的不同階段。最先是路由,然後是負載均衡,最後是叢集容錯。
不要害怕今日的苦,你要相信明天,更苦!