開篇
整個Dubbo Consumer的引用過程比較複雜,這部分的文章會比較多,這篇文章的目的是描述Consumer的訂閱過程,側重于Consumer發現Provider的URL并生成對應的invoker的過程。
在這篇文章中,主要分為兩個部分講解,第一部分RegistryProtocol的refer過程側重于描述RegistryProtocol的流程,ZookeeperRegistry的subscribe過程側重于服務訂閱過程。
RegistryProtocol的refer過程
- RegistryProtocol調用refer() => doRefer()方法。
- RegistryProtocol的registry是ZookeeperRegistry對象。
public class RegistryProtocol implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
// application=dubbo-demo-api-consumer&dubbo=2.0.2&pid=58968
// &refer=application=dubbo-demo-api-consumer&dubbo=2.0.2
// &interface=org.apache.dubbo.demo.DemoService
// &lazy=false&methods=sayHello&pid=58968
// ®ister.ip=172.17.32.176&side=consumer&sticky=false
// ×tamp=1571824631224®istry=zookeeper×tamp=1571824632206
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
// registry為ZookeeperRegistry對象
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 執行doRefer動作
return doRefer(cluster, registry, type, url);
}
- RegistryProtocol的doRefer()執行一系列核心步驟,在代碼中都備有注釋。
- 建立RegistryDirectory對象。
- 生成服務消費者連結并在consumers目錄下新增節點注冊服務消費者。
- 建立路由規則鍊。
- 訂閱 providers、configurators、routers 等節點資料。
- 将多個服務提供者合并為一個invoker。
- 重點關注訂閱 providers、configurators、routers 等節點資料的流程
public class RegistryProtocol implements Protocol {
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 建立RegistryDirectory執行個體
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 設定注冊中心
directory.setRegistry(registry);
// 設定協定
directory.setProtocol(protocol);
// 所有屬性放到map中
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服務消費者連結
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注冊服務消費者,在 consumers 目錄下新節點
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
// 建立路由規則鍊
directory.buildRouterChain(subscribeUrl);
// 訂閱 providers、configurators、routers 等節點資料
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// 一個注冊中心可能有多個服務提供者,是以這裡需要将多個服務提供者合并為一個,生成一個invoker
Invoker invoker = cluster.join(directory);
// 在服務提供者處注冊消費者
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
}
- RegistryDirectory中的registry是ZookeeperRegistry對象。
- ZookeeperRegistry中subscribe(url, this)方法中将RegistryDirectory對象作為NotifyListener參數。
- 執行ZookeeperRegistry對象的subscribe()方法,至此進入了服務訂閱過程。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
// registry為ZookeeperRegistry對象。
registry.subscribe(url, this);
}
}
ZookeeperRegistry的subscribe過程
- ZookeeperRegistry的整個subscribe過程涉及到ZookeeperRegistry、FailbackRegistry 、AbstractRegistry這幾個類,整個訂閱過程如下圖所示。
- 從圖中可以發現父類FailbackRegistry作為執行subscribe()的入口,真正執行的是ZookeeperRegistry的doSubscribe()方法。

- ZookeeperRegistry、FailbackRegistry 、AbstractRegistry三者的類關系如下圖,涉及到父類子類具體實作方法的調用。
- 從ZookeeperRegistry訂閱時序圖可以看出來訂閱首先執行FailbackRegistry的subscribe()方法。
- FailbackRegistry的subscribe()方法調用子類的ZookeeperRegistry的doSubscribe()方法。
public abstract class FailbackRegistry extends AbstractRegistry {
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// 執行訂閱操作
doSubscribe(url, listener);
} catch (Exception e) {
}
}
}
- ZookeeperRegistry的doSubscribe()主要執行訂閱操作,核心和zookeeper的特性比較相關。
- 擷取待訂閱的path資訊,包括providers,configurators,routers三類路徑。
- 針對每個待訂閱的path,會針對path增加children次元的監聽。
- 針對每個待訂閱的path,會一次性擷取children的變量并進入notify()流程。
- zkListeners先根據consumer的URL次元 和 listener次元唯一确定Zookeeper子節點監聽器ChildListener對象。
- ZookeeperRegistry的path的子節點監聽器ChildListener回調中執行的ZookeeperRegistry.this.notify()方法。
- 針對處理path下的children的URL路徑會通過toUrlsWithEmpty()方法進行比對,擷取比對的URL進行處理。
- 首次擷取及後續回調都是執行ZookeeperRegistry.this.notify()方法。
- notify()的urls是指訂閱path下所有符合要求的urls,通過toUrlsWithEmpty()進行比對。
public class ZookeeperRegistry extends FailbackRegistry {
// 變量url的值
// consumer://172.17.32.176/org.apache.dubbo.demo.DemoService?
// application=dubbo-demo-api-consumer&category=providers,configurators,routers&dubbo=2.0.2
// &interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello
// &pid=58968&side=consumer&sticky=false×tamp=1571824631224
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
// 暫時不關注這部分邏輯
} else {
List<URL> urls = new ArrayList<>();
// 處理providers、configurators、routers等路徑
// /dubbo/org.apache.dubbo.demo.DemoService/providers
// /dubbo/org.apache.dubbo.demo.DemoService/configurators
// /dubbo/org.apache.dubbo.demo.DemoService/routers
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// 建立zk節點變化回調監聽器
listeners.putIfAbsent(listener,
(parentPath, currentChilds) -> ZookeeperRegistry.this.notify(
url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
// 建立path對應的節點
zkClient.create(path, false);
// 添加path下的children的監聽
List<String> children = zkClient.addChildListener(path, zkListener);
// 處理path下的children
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 通知回調notify動作
notify(url, listener, urls);
}
} catch (Throwable e) {
}
}
}
- ZookeeperRegistry.notify()方法會調用父類FailbackRegistry.notify()方法進而執行doNotify()方法。
- FailbackRegistry.doNotify()方法會調用父類AbstractRegistry.notify()。
public abstract class FailbackRegistry extends AbstractRegistry {
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
try {
doNotify(url, listener, urls);
} catch (Exception t) {
addFailedNotified(url, listener, urls);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
}
- AbstractRegistry的notify()的urls是指訂閱path下所有符合要求的urls。
- 将所有的urls按照url的category包括(providers,configurators,routers)三類。
- 将所有的urls按照分組進行回調處理。
- 回調的listener為RegistryDirectory對象,實作了NotifyListener接口。
public abstract class AbstractRegistry implements Registry {
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
// 符合要求的URL按照category作為分組key的Map。
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
// 同一類category的URL進行回調,譬如providers的URL一并進行回調。
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 調用監聽回調
listener.notify(categoryList);
// 儲存URL資訊
saveProperties(url);
}
}
}
RegistryDirectory的notify過程
- 按照configurators、routers、providers組裝成List對象。
- 針對configurators執行toConfigurators()動作。
- 針對routers執行toRouters()動作。
- 針對providers執行refreshOverrideAndInvoker()動作。
- 每個細節後面分篇幅繼續講解。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}
}