Dubbo之服務消費原理
前言
上篇文章《Dubbo之服務暴露》分析 Dubbo 服務是如何暴露的,本文接着分析 Dubbo 服務的消費流程。主要從以下幾個方面進行分析:注冊中心的暴露;通過注冊中心進行服務消費通知;直連服務進行消費。
服務消費端啟動時,将自身的資訊注冊到注冊中心的目錄,同時還訂閱服務提供方的目錄,當服務提供方的 URL 發生更改時,實時擷取新的資料。
服務消費端流程
下面是一個服務消費的流程圖:
上圖中可以看到,服務消費的流程與服務暴露的流程有點類似逆向的。同樣,Dubbo 服務也是分為兩個大步驟:第一步就是将遠端服務通過Protocol轉換成Invoker(概念在上篇文章中有解釋)。第二步通過動态代理将Invoker轉換成消費服務需要的接口。
org.apache.dubbo.config.ReferenceConfig類是ReferenceBean的父類,與生産端服務的ServiceBean一樣,存放着解析出來的 XML 和注解資訊。類關系如下:
服務初始化中轉換的入口
當我們消費端調用本地接口就能實作遠端服務的調用,這是怎麼實作的呢?根據上面的流程圖,來分析消費原理。
在消費端進行初始化時ReferenceConfig#init,會執行ReferenceConfig#createProxy來完成這一系列操作。以下為ReferenceConfig#createProxy主要的代碼部分:
private T createProxy(Map map) {
// 判斷是否為 Jvm 本地引用
if (shouldJvmRefer(map)) {
// 通過 injvm 協定,擷取本地服務
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
} else {
urls.clear();
// 判斷是否有自定義的直連位址,或注冊中心位址
if (url != null && url.length() > 0) {
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
// 如果是注冊中心Protocol類型,則向位址中添加 refer 服務消費中繼資料
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 直連服務提供端
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// 組裝注冊中心的配置
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
// 檢查配置中心
checkRegistry();
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
// 監控上報資訊
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 注冊中心位址添加 refer 服務消費中繼資料
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
}
}
// 隻有一條注冊中心資料,即單注冊中心
if (urls.size() == 1) {
// 将遠端服務轉化成 Invoker
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
// 因為多注冊中心就會存在多個 Invoker,這裡用儲存在 List 中
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 将每個注冊中心轉換成 Invoker 資料
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
// 會覆寫前周遊的注冊中心,使用最後一條注冊中心資料
registryURL = url;
}
}
if (registryURL != null) {
// 預設使用 zone-aware 政策來處理多個訂閱
URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
// 将轉換後的多個 Invoker 合并成一個
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else {
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
// 利用動态代理,将 Invoker 轉換成本地接口代理
return (T) PROXY_FACTORY.getProxy(invoker);
}
上面轉換的過程中,主要可概括為:先分為本地引用和遠端引用兩類。本地就是以 inJvm 協定的擷取本地服務,這不做過多說明;遠端引用分為直連服務和通過注冊中心。注冊中心分為單注冊中心和多注冊中心的情況,單注冊中心好解決,直接使用即可,多注冊中心時,将轉換後的 Invoker 合并成一個 Invoker。最後通過動态代理将 Invoker 轉換成本地接口代理。
擷取 Invoker 執行個體
由于本地服務時直接從緩存中擷取,這裡就注冊中心的消費進行分析,上面代碼片段中使用的是REF_PROTOCOL.refer進行轉換,該方法代碼:
public Invoker refer(Class type, URL url) throws RpcException {
// 擷取服務的注冊中心url,裡面會設定注冊中心的協定和移除 registry 的參數
url = getRegistryUrl(url);
// 擷取注冊中心執行個體
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 擷取服務消費中繼資料
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
// 從服務消費中繼資料中擷取分組資訊
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// 執行 Invoker 轉換工作
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 執行 Invoker 轉換工作
return doRefer(cluster, registry, type, url);
上面主要是擷取服務消費的注冊中心執行個體和進行服務分組,最後調用doRefer方法進行轉換工作,以下為doRefer的代碼:
private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {
// 建立 RegistryDirectory 對象
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 設定注冊中心
directory.setRegistry(registry);
// 設定協定
directory.setProtocol(protocol);
// directory.getUrl().getParameters() 是服務消費中繼資料
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 消費消息注冊到注冊中心
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 服務消費者訂閱:服務提供端,動态配置,路由的通知
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// 多個Invoker合并為一個
Invoker invoker = cluster.join(directory);
return invoker;
上面實作主要是完成建立 RegistryDirectory 對象,将消費服務中繼資料注冊到注冊中心,通過 RegistryDirectory 對象裡的資訊,實作服務提供端,動态配置及路由的訂閱相關功能。
RegistryDirectory 這個類實作了 NotifyListener 這個通知監聽接口,當訂閱的服務,配置或路由發生變化時,會接收到通知,進行相應改變:
public synchronized void notify(List urls) {
// 将服務提供方配置,路由配置,服務提供方的服務分别以不同的 key 儲存在 Map 中
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
// 更新服務提供方配置
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
// 更新路由配置
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// 加載服務提供方的服務資訊
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getUrl(),this);
}
}
// 重新加載 Invoker 執行個體
refreshOverrideAndInvoker(providerURLs);
RegistryDirectory#notify裡面最後會重新整理 Invoker 進行重新加載,下面是核心代碼的實作:
private void refreshOverrideAndInvoker(List urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
// 重新整理 invoker
refreshInvoker(urls);
private void refreshInvoker(List invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
......
} else {
// 重新整理之前的 Invoker
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
// 加載新的 Invoker Map
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
// 擷取新的 Invokers
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// 緩存新的 Invokers
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
try {
// 通過新舊 Invokers 對比,銷毀無用的 Invokers
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
擷取重新整理前後的 Invokers,将新的 Invokers 重新緩存起來,通過對比,銷毀無用的 Invoker。
上面将 URL 轉換 Invoker 是在RegistryDirectory#toInvokers中進行。
private Map> toInvokers(List urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
// 過濾消費端不比對的協定,及非法協定
......
// 合并服務提供端配置資料
URL url = mergeUrl(providerUrl);
// 過濾重複的服務提供端配置資料
String key = url.toFullString();
if (keys.contains(key)) {
continue;
}
keys.add(key);
// 緩存鍵是不與使用者端參數合并的url,無論使用者如何合并參數,如果伺服器url更改,則再次引用
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
// 緩存無對應 invoker,再次調用 protocol#refer 是否有資料
if (invoker == null) {
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
// 将新的 Invoker 緩存起來
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
// 緩存裡有資料,則進行重新覆寫
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
總結
通過《Dubbo之服務暴露》和本文兩篇文章對 Dubbo 服務暴露和服務消費原理的了解。我們可以看到,不管是暴露還是消費,Dubbo 都是以 Invoker 為資料交換主體進行,通過對 Invoker 發起調用,實作一個遠端或本地的實作。
原文位址
https://www.cnblogs.com/ytao-blog/p/12551251.html