天天看點

源碼分析Dubbo服務注冊與發現機制RegistryDirectory)

   RegistryDirectory,基于注冊中心的服務發現,本文将重點探讨Dubbo是如何實作服務的自動注冊與發現。從上篇文章,得知在消息消費者在建立服務調用器(Invoker)【消費者在初始時】時需要根據不同的協定,例如dubbo、registry(從注冊中心擷取服務提供者)來建構,其調用的方法為Protocol#refer,基于注冊中心發現服務提供者的實作協定為RegistryProtocol。

   RegistryProtocol#refer ----> doRefer方法。

   RegistryProtocol#doRefer

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {    // @1
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);   // @2
        directory.setRegistry(registry);
        directory.setProtocol(protocol);   // @3
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());   // @4
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);  // @5
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }   // @6
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));     // @7

        Invoker invoker = cluster.join(directory);    // @8
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);     // @9
        return invoker;
    }           

   代碼@1:參數詳解

  • Cluster cluster:叢集政策。
  • Registry registry:注冊中心實作類。
  • Class type:引用服務名,dubbo:reference interface。
  • URL url:注冊中心URL。

   代碼@2:建構RegistryDirectory對象,基于注冊中心動态發現服務提供者(服務提供者新增或減少),本節重點會剖析該類的實作細節。

   代碼@3:為RegistryDirectory設定注冊中心、協定。

   代碼@4:擷取服務消費者的配置屬性。

   代碼@5:建構消費者URL,例如:

consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185           

   代碼@6:向注冊中心消息消費者:

consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185           

   相比第5步的URL,增加了category=consumers、check=false,其中category表示在注冊中心的命名空間,這裡代表消費端。該步驟的作用就是向注冊中心為服務增加一個消息消費者,其生成的效果如下:【以zookeeper為例】。

   代碼@7:為消息消費者添加category=providers,configurators,routers屬性後,然後向注冊中心訂閱該URL,關注該服務下的providers,configurators,routers發生變化時通知RegistryDirectory,以便及時發現服務提供者、配置、路由規則的變化。

consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185           

   其訂閱關系調用的入口為:RegistryDirectory#subscribe方法,是接下來需要重點分析的重點。

   代碼@8:根據Directory,利用叢集政策傳回叢集Invoker。

   代碼@9:緩存服務消費者、服務提供者對應關系。

   從這裡發現,服務的注冊與發現與RegistryDirectory聯系非常緊密,接下來讓我們來詳細分析RegistryDirectory的實作細節。

1、RegistryDirectory類圖

  • private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
    叢集政策,預設為failover。           
  • private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader (RouterFactory.class).getAdaptiveExtension()路由工廠,可以通過監控中心或治理中心配置。
  • private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();配置實作工廠類。
  • private final String serviceKey; 服務key,預設為服務接口名。com.alibaba.dubbo.registry.RegistryService,注冊中心在Dubbo中也是使用服務暴露。
  • private final Class< T > serviceType;服務提供者接口類,例如interface com.alibaba.dubbo.demo.DemoService
  • private final Map< String, String> queryMap:服務消費者URL中的所有屬性。
  • private final URL directoryUrl;注冊中心URL,隻保留消息消費者URL查詢屬性,也就是queryMap。
  • private final String[] serviceMethods:引用服務提供者方法數組。
  • private final boolean multiGroup:是否引用多個服務組。
  • private Protocol protocol:協定。
  • private Registry registry:注冊中心實作者。
  • private volatile List< Configurator> configurators;配置資訊。
  • private volatile Map< String, Invoker< T>> urlInvokerMap; 服務URL對應的Invoker(服務提供者調用器)。
  • private volatile Map< String, List< Invoker< T>>> methodInvokerMap; methodName : List< Invoker< T >>,
    dubbo:method 對應的Invoker緩存表。           
  • private volatile Set< URL > cachedInvokerUrls; 目前緩存的所有URL提供者URL。

2、RegistryDirectory 構造方法詳解

public RegistryDirectory(Class<T> serviceType, URL url) {    // @1
        super(url);
        if (serviceType == null)
            throw new IllegalArgumentException("service type is null.");
        if (url.getServiceKey() == null || url.getServiceKey().length() == 0)
            throw new IllegalArgumentException("registry serviceKey is null.");
        this.serviceType = serviceType;  
        this.serviceKey = url.getServiceKey();     // @2
        this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));  // @3
        this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY); //@4
        String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
        this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
        String methods = queryMap.get(Constants.METHODS_KEY);
        this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);   // @5
    }           

   代碼@1:參數描述,serviceType:消費者引用的服務< dubbo:reference interface="" .../>;URL url:注冊中心的URL,例如:

zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=5552&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5552%26qos.port%3D33333%26register.ip%3D192.168.56.1%26side%3Dconsumer%26timestamp%3D1528379076123&timestamp=1528379076179           

   代碼@2:擷取注冊中心URL的serviceKey:com.alibaba.dubbo.registry.RegistryService。

   代碼@3:擷取注冊中心URL消費提供者的所有配置參數:從url屬性的refer。

   代碼@4:初始化haulovverrideDirecotryUrl、directoryUrl:注冊中心的URL,移除監控中心以及其他屬性值,隻保留消息消費者的配置屬性。

   代碼@5:擷取服務消費者單獨配置的方法名dubbo:method。

3、RegistryDirectory#subscribe

public void subscribe(URL url) {
     setConsumerUrl(url);   // @1
     registry.subscribe(url, this); // @2
}           

   代碼@1:設定RegistryDirectory的consumerUrl為消費者URL。

   代碼@2:調用注冊中心訂閱消息消息消費者URL,首先看一下接口Registry#subscribe的接口聲明:

RegistryService:void subscribe(URL url, NotifyListener listener); 這裡傳入的NotifyListener為RegistryDirectory,其注冊中心的subscribe方法暫時不深入去跟蹤,不過根據上面URL上面的特點,應該能猜出如下實作關鍵點:

consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185           
  • 根據消息消費者URL,擷取服務名。
  • 根據category=providers、configurators、routers,分别在該服務名下的providers目錄、configurators目錄、routers目錄建立事件監聽,監聽該目錄下節點的建立、更新、删除事件,然後一旦事件觸發,将回調RegistryDirectory#void notify(List< URL> urls)。

4、RegistryDirectory#notify

   首先該方法是在注冊中心providers、configurators、routers目錄下的節點發生變化後,通知RegistryDirectory,已便更新最新資訊,實作”動态“發現機制。

   RegistryDirectory#notify

List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
     String protocol = url.getProtocol();    // @1 
     String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);   // @2
     if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {   // @3
           routerUrls.add(url);
      } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {   // @4
           configuratorUrls.add(url);
     } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {    // @5
           invokerUrls.add(url);
     } else {
          logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + 
                NetUtils.getLocalHost());
     }
}           

   Step1:根據通知的URL的字首,分别添加到:invokerUrls(提供者url)、routerUrls(路由資訊)、configuratorUrls (配置url)。

   代碼@1:從url中擷取協定字段,例如condition://、route://、script://、override://等。

   代碼@2:擷取url的category,在注冊中心的指令空間,例如:providers、configurators、routers。

   代碼@3:如果category等于routers或協定等于route,則添加到routerUrls中。

   代碼@4:如果category等于configurators或協定等于override,則添加到configuratorUrls中。

   代碼@5:如果category等于providers,則表示服務提供者url,加入到invokerUrls中。

// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
    this.configurators = toConfigurators(configuratorUrls);
}           

   Step2:将configuratorUrls轉換為配置對象List< Configurator> configurators,該方法将在《源碼分析Dubbo配置規則實作細節》一文中詳細講解。

// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
      List<Router> routers = toRouters(routerUrls);
      if (routers != null) { // null - do nothing
            setRouters(routers);
      }
}           

   Step3:将routerUrls路由URL轉換為Router對象,該部分内容将在《源碼分析Dubbo路由機制實作細節》一文中詳細分析。

// providers
refreshInvoker(invokerUrls);           

   Step4:根據回調通知重新整理服務提供者集合。

5、RegistryDirectory#refreshInvoker

   RegistryDirectory#refreshInvoker

if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
         && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
     this.forbidden = true; // Forbid to access
     this.methodInvokerMap = null; // Set the method invoker map to null
     destroyAllInvokers(); // Close all invokers
}            

   Step1:如果invokerUrls不為空并且長度為1,并且協定為empty,表示該服務的所有服務提供者都下線了。需要銷毀目前所有的服務提供者Invoker。

this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
        invokerUrls.addAll(this.cachedInvokerUrls);
} else {
      this.cachedInvokerUrls = new HashSet<URL>();
      this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
      return;
}           

   Step2: 如果invokerUrls為空,并且已緩存的invokerUrls不為空,将緩存中的invoker url複制到invokerUrls中,這裡可以說明如果providers目錄未發送變化,invokerUrls則為空,表示使用上次緩存的服務提供者URL對應的invoker;如果invokerUrls不為空,則用iinvokerUrls中的值替換原緩存的invokerUrls,這裡說明,如果providers發生變化,invokerUrls中會包含此時注冊中心所有的服務提供者。如果invokerUrls為空,則無需處理,結束本次更新服務提供者Invoker操作。

Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map           

   Step3:将invokerUrls轉換為對應的Invoke,然後根據服務級的url:invoker映射關系建立method:List< Invoker>映射關系,将在下文相信分析。

this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
        destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
        logger.warn("destroyUnusedInvokers error. ", e);
}           

   Step4:如果支援multiGroup機制,則合并methodInvoker,将在下文分析,然後根據toInvokers、toMethodInvokers重新整理目前最新的服務提供者資訊。

6、RegistryDirectory#toInvokers

   RegistryDirectory#toInvokers

String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
    // ...
}           

   Step1:擷取消息消費者URL中的協定類型,< dubbo:reference protocol="" .../>屬性值,然後周遊所有的Invoker Url(服務提供者URL)。

if (queryProtocols != null && queryProtocols.length() > 0) {
       boolean accept = false;
       String[] acceptProtocols = queryProtocols.split(",");
       for (String acceptProtocol : acceptProtocols) {
            if (providerUrl.getProtocol().equals(acceptProtocol)) {
                  accept = true;
                  break;
            }
       }
      if (!accept) {
            continue;
      }
}           

   Step2: 從這一步開始,代碼都包裹在for(URL providerUrl : urls)中,一個一個處理提供者URL。如果dubbo:referecnce标簽的protocol不為空,則需要對服務提供者URL進行過濾,比對其協定與protocol屬性相同的服務,如果不比對,則跳過後續處理邏輯,接着處理下一個服務提供者URL。

if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
      continue;
}           

   Step3:如果協定為empty,跳過,處理下一個服務提供者URL。

if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
       logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to 
                 consumer " + NetUtils.getLocalHost()
                        + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
   continue;
}           

   Step4:驗證服務提供者協定,如果不支援,則跳過。

URL url = mergeUrl(providerUrl);           

   Step5:合并URL中的屬性,其具體實作細節如下:

  1. 消費端屬性覆寫生産者端屬性(配置屬性消費者端優先生産者端屬性),其具體實作方法:ClusterUtils.mergeUrl(providerUrl, queryMap),其中queryMap為消費端屬性。
    a、首先移除隻在服務提供者端生效的屬性(線程池相關):threadname、default.threadname、threadpool、default.threadpool、corethreads、default.corethreads、threads、default.threads、queues、default.queues、alive、default.alive、transporter、default.transporter,服務提供者URL中的這些屬性來源于dubbo:protocol、dubbo:provider。
    b、用消費端配置屬性覆寫服務端屬性。
    c、如下屬性以服務端優先:dubbo(dubbo資訊)、version(版本)、group(服務組)、methods(服務方法)、timestamp(時間戳)。
    d、合并服務端,消費端Filter,其配置屬性(reference.filter),傳回結果為:provider#reference.filter,
    consumer#reference.filter。
    e、合并服務端,消費端Listener,其配置屬性(invoker.listener),傳回結果為:provider#invoker.listener,consumer#invoker.listener。           
  2. 合并configuratorUrls 中的屬性,我們現在應該知道,dubbo可以在監控中心或管理端(dubbo-admin)覆寫覆寫服務提供者的屬性,其使用協定為override,該部分的實作邏輯見:《源碼分析Dubbo配置規則機制(override協定)》
  3. 為服務提供者URL增加check=false,預設隻有在服務調用時才檢查服務提供者是否可用。
  4. 重新複制overrideDirectoryUrl,providerUrl在進過第一步參數合并後(包含override協定覆寫後的屬性)指派給overrideDirectoryUrl。
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
      continue;
}
keys.add(key);           

   Step6:擷取url所有屬性構成的key,該key也是RegistryDirectory中Map> urlInvokerMap;中的key。

Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
   try {
         boolean enabled = true;
         if (url.hasParameter(Constants.DISABLED_KEY)) {
               enabled = !url.getParameter(Constants.DISABLED_KEY, false);
          } else {
               enabled = url.getParameter(Constants.ENABLED_KEY, true);
           }
           if (enabled) {
                invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
           }
     } catch (Throwable t) {
               logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
      }
      if (invoker != null) { // Put new invoker in cache
            newUrlInvokerMap.put(key, invoker);
      }
} else {
       newUrlInvokerMap.put(key, invoker);
}           

   Step7:如果localUrlInvokerMap中未包含invoker并且該provider狀态為啟用,則建立該URL對應的Invoker,并添加到newUrlInvokerMap中。toInvokers運作結束後,回到refreshInvoker方法中繼續往下執行,根據 最新的服務提供者映射關系Map< String,Invoker>,建構Map< String,List< Invoker>>,其中鍵為methodName。然後更新RegistryDirectory的urlInvokerMap、methodInvokerMap屬性,并銷毀老的Invoker對象,完成一次路由發現過程。

   上面整個過程完成了一次動态服務提供者發現流程,下面再分析一下RegistryDirectory的另外一個重要方法,doList,再重複一遍RegistryDirectory的作用,服務提供者目錄服務,在叢集Invoker的實作中,内部持有一個Direcotry對象,在進行服務調用之前,首先先從衆多的Invoker中選擇一個來執行,那衆多的Invoker從哪來呢?其來源于叢集Invoker中會調用Direcotry的public List< Invoker< T>> list(Invocation invocation),首先将調用AbstractDirectory#list方法,然後再内部調用doList方法,doList方法有其子類實作。

7、RegistryDirectory#doList(Invocation invocation) 方法詳解

   RegistryDirectory#doList

if (forbidden) {
      // 1. No service provider 2. Service providers are disabled
      throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
            "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                  + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
}           

   Step1:如果禁止通路(如果沒有服務提供者,或服務提供者被禁用),則抛出沒有提供者異常。

Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
    String methodName = RpcUtils.getMethodName(invocation);
    Object[] args = RpcUtils.getArguments(invocation);
    if (args != null && args.length > 0 && args[0] != null
             && (args[0] instanceof String || args[0].getClass().isEnum())) {
          invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
     }
    if (invokers == null) {
          invokers = localMethodInvokerMap.get(methodName);
    }
    if (invokers == null) {
          invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
     }
     if (invokers == null) {
         Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
         if (iterator.hasNext()) {
              invokers = iterator.next();
          }
     }
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;           

   Step2:根據方法名稱,從Map< String,List< Invoker>>這個集合中找到合适的List< Invoker>,如果方法名未命中,則傳回所有的Invoker,localMethodInvokerMap中方法名,主要是dubbo:service的子标簽dubbo:method,最終傳回invokers。

   本文詳細介紹了服務消費者基于注冊中心的服務發現機制,其中對routers(路由)與configurators(override協定)并未詳細展開,下節先重點分析configurators與routers(路由)實作細節。

   總結一下服務注冊與發現機制:

   基于注冊 中心的事件通知(訂閱與釋出),一切支援事件訂閱與釋出的架構都可以作為Dubbo注冊中心的選型。

  1. 服務提供者在暴露服務時,會向注冊中心注冊自己,具體就是在${service interface}/providers目錄下添加 一個節點(臨時),服務提供者需要與注冊中心保持長連接配接,一旦連接配接斷掉(重試連接配接)會話資訊失效後,注冊中心會認為該服務提供者不可用(提供者節點會被删除)。
  2. 消費者在啟動時,首先也會向注冊中心注冊自己,具體在${interface interface}/consumers目錄下建立一個節點。
  3. 消費者訂閱${service interface}/ [ providers、configurators、routers ]三個目錄,這些目錄下的節點删除、新增事件都胡通知消費者,根據通知,重構服務調用器(Invoker)。

   以上就是Dubbo服務注冊與動态發現機制的原理與實作細節。

原文釋出時間為:2019-02-28

本文作者:丁威,《RocketMQ技術内幕》作者。

本文來自

中間件興趣圈

,了解相關資訊可以關注

繼續閱讀