開篇
這篇文章的目的主要是為了講解下Dubbo中路由政策功能,核心問題包括路由的更新流程和生效流程,當然這些流程都是針對interface服務級别的。
路由生成流程
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
// 将路由的url資訊進行轉換并添加路由資訊當中
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}
private Optional<List<Router>> toRouters(List<URL> urls) {
if (urls == null || urls.isEmpty()) {
return Optional.empty();
}
List<Router> routers = new ArrayList<>();
for (URL url : urls) {
if (EMPTY_PROTOCOL.equals(url.getProtocol())) {
continue;
}
String routerType = url.getParameter(ROUTER_KEY);
if (routerType != null && routerType.length() > 0) {
url = url.setProtocol(routerType);
}
try {
// 根據路由政策走SPI擷取走不同政策的路由生成
Router router = ROUTER_FACTORY.getRouter(url);
if (!routers.contains(router)) {
routers.add(router);
}
} catch (Throwable t) {
logger.error("convert router url to router error, url: " + url, t);
}
}
return Optional.of(routers);
}
}
- dubbo的consumer在引用provider對應服務的時候會監聽服務對應注冊中心上的configurators、providers、routers三個目錄。
- 任意一個目錄發生變更的時候都會進入notify()階段,routers的變更就在這個時候會發生。
- toRouters(routerURLs).ifPresent(this::addRouters)将路由的URL轉換為Router對象并添加到Routers。
- toRouters()方法裡url.setProtocol(routerType)将routerType作為protocol字段注入到URL當中,routerType字段對應路由當中的router字段(如router=condition)。
file=com.alibaba.dubbo.rpc.cluster.router.file.FileRouterFactory
script=com.alibaba.dubbo.rpc.cluster.router.script.ScriptRouterFactory
condition=com.alibaba.dubbo.rpc.cluster.router.condition.ConditionRouterFactory
- RouterFactory在在com.alibaba.dubbo.rpc.cluster.RouterFactory中定義。
public class RouterFactory$Adaptive
implements RouterFactory {
public Router getRouter(URL uRL) {
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
String string = uRL2.getProtocol();
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Failed to get extension (org.apache.dubbo.rpc.cluster.RouterFactory) name from url (").append(uRL2.toString()).append(") use keys([protocol])").toString());
}
RouterFactory routerFactory = (RouterFactory)ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(string);
return routerFactory.getRouter(uRL);
}
}
route://0.0.0.0/org.apache.dubbo.demo.DemoService?
category=routers&compatible_config=true&dynamic=false
&enabled=true&force=true&name=null&priority=0&router=condition
&rule= => host != 172.22.3.91&runtime=false&version=20880
- ROUTER_FACTORY.getRouter(url)根據url中參數選擇具體的RouterFactory。
- url.setProtocol(routerType)将routerType作為protocol字段注入到URL當中,這裡routerType為router=condition。
- ROUTER_FACTORY對應的為ConditionRouterFactory。
- 通過routerFactory.getRouter(uRL)生成路由資訊。
路由添加流程
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {}
public abstract class AbstractDirectory<T> implements Directory<T> {
protected RouterChain<T> routerChain;
protected void addRouters(List<Router> routers) {
routers = routers == null ? Collections.emptyList() : routers;
// 添加路由到路由聯調
routerChain.addRouters(routers);
}
}
public class RouterChain<T> {
public void addRouters(List<Router> routers) {
List<Router> newRouters = new ArrayList<>();
// 添加内置路由
newRouters.addAll(builtinRouters);
// 添加動态配置路由
newRouters.addAll(routers);
// 按照路由優先級排序
CollectionUtils.sort(newRouters);
// 最終生成的有序路由規則
this.routers = newRouters;
}
}
public interface Router extends Comparable<Router> {
int DEFAULT_PRIORITY = Integer.MAX_VALUE;
@Override
default int compareTo(Router o) {
if (o == null) {
throw new IllegalArgumentException();
}
return Integer.compare(this.getPriority(), o.getPriority());
}
}
- RegistryDirectory的notify()方法内部通過使用下面方法生成路由并添加路由toRouters(routerURLs).ifPresent(this::addRouters)。
- addRouters的實作在RegistryDirectory的父類AbstractDirectory當中。
- routerChain.addRouters(routers)内部通過添加内置路由規則、動态配置路由規則、排序整體路由規則,最終生成最終路由規則。
- 路由規則的比較順序是按照優先級字段Priority來排序。
路由選擇過程
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
// 注入provider的invokers
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
// 省略無關代碼
// 注入provider的invokers
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
}
- provider側變更會觸發更新routerChain的invoker(服務提供者)。
- routerChain.setInvokers(this.invokers)負責更新routerChain的服務提供者。
- routerChain包含最新invokers,每次進行路由選擇的時候直接通路routerChain内部的invoker即可,起到緩存作用。
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {}
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// 擷取執行的invoker清單
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
return directory.list(invocation);
}
}
- 在cluster模式下以FailoverClusterInvoker為例,每次執行invoke()會最終執行directory.list()擷取invokers。
public abstract class AbstractDirectory<T> implements Directory<T> {
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
return doList(invocation);
}
}
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public List<Invoker<T>> doList(Invocation invocation) {
// 省略相關代碼
List<Invoker<T>> invokers = null;
try {
// Get invokers from cache, only runtime routers will be executed.
// 通過路由鍊去擷取符合路由規則的invoker對象
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
}
return invokers == null ? Collections.emptyList() : invokers;
}
}
- directory.list()最終會執行routerChain.route()進行路由選擇。
public class RouterChain<T> {
private List<Invoker<T>> invokers = Collections.emptyList();
public List<Invoker<T>> route(URL url, Invocation invocation) {
List<Invoker<T>> finalInvokers = invokers;
for (Router router : routers) {
finalInvokers = router.route(finalInvokers, url, invocation);
}
return finalInvokers;
}
}
- routerChain.route()會針對invokers周遊所有的路由政策并傳回符合要求的finalInvokers。
- routerChain本身維護了某服務下的所有invokers,通過路由過濾找到符合要求的finalInvokers傳回。