天天看點

Dubbo Consumer 直連和注冊中心服務引用流程

開篇

 這篇文章的目的在于描述Dubbo Consumer在直連和注冊中心兩種場景下針對provider側invoker的封裝。整篇文章主要從單注冊中心、單直連位址、多注冊中心、多直連位址的角度進行分析。

 通過這篇文章能夠了解到Consumer側針對invoker的生成流程,通過invoker的生成可以了解invoker的調用鍊。

Consumer reference流程

public class ReferenceConfig<T> extends AbstractReferenceConfig {

    private static final Protocol REF_PROTOCOL = 
        ExtensionLoader.getExtensionLoader(Protocol.class)
       .getAdaptiveExtension();

    private T createProxy(Map<String, String> map) {
        if (shouldJvmRefer(map)) {
            // 省略無關代碼
        } else {
            urls.clear(); 
            // 處理reference配置直連情況
            if (url != null && url.length() > 0) { 
                // 處理逗号分隔的直連位址
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            // 添加直連位址到urls
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else {
                // 處理注冊中心的情況
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
                    checkRegistry();
                    // 擷取注冊中心
                    List<URL> us = loadRegistries(false);
                    // 添加所有注冊中心到urls
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                    }
                }
            }

            // 處理單注冊中心或者隻有單個直連的情況
            if (urls.size() == 1) {
                // 單注冊中心協定為"registry",直連場景下協定為"dubbo"
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else { // 處理多注冊中心或者隻有多個直連的情況
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();

                URL registryURL = null;
                for (URL url : urls) {
                    // 單注冊中心協定為"registry",直連場景下協定為"dubbo"
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }

                if (registryURL != null) {
                    // 針對多注冊中心的方式,通過RegistryAwareCluster進行封裝
                    URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
                    // RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                } else { // 處理多直連方式
                    invoker = CLUSTER.join(new StaticDirectory(invokers));
                }
            }
        }

        if (shouldCheck() && !invoker.isAvailable()) {
            // 處理invoker可用檢查的邏輯
            throw new IllegalStateException("Failed to check the status of the service " 
            + interfaceName + ". No provider available for the service " 
            + (group == null ? "" : group + "/") + interfaceName 
            + (version == null ? "" : ":" + version) + " from the url "  
            + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() 
            + " use dubbo version " + Version.getVersion());
        }

        MetadataReportService metadataReportService = null;
        if ((metadataReportService = getMetadataReportService()) != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataReportService.publishConsumer(consumerURL);
        }

        // create service proxy
        return (T) PROXY_FACTORY.getProxy(invoker);
    }
}           
  • 區分單注冊中心、單直連位址、多注冊中心、多直連位址的四種情況生成invoker。
  • "urls.size() == 1"的條件用于處理單注冊中心和單直連位址兩種情況。
  • "urls.size() != 1"的條件用于處理多注冊中心和多直連位址兩種情況。

Protocol擴充卡

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {

    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        
        org.apache.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );

        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        // 擷取擴充
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        // 執行擴充的refer方法
        return extension.refer(arg0, arg1);
    }
}           
com.alibaba.dubbo.rpc.Protocol檔案

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper           
  • Protocol$Adaptive根據url.getProtocol()的Protocol協定生成對應的Protocol對象。
  • Protocol檔案内容如上圖所示。

Cluster擴充卡

public class Cluster$Adaptive implements Cluster {

    public Invoker join(Directory directory) throws RpcException {
        
        if (directory == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument == null");
        }
        
        if (directory.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument getUrl() == null");
        }
        
        URL uRL = directory.getUrl();
        String string = uRL.getParameter("cluster", "failover");
        
        if (string == null) {
            throw new IllegalStateException(new StringBuffer()
            .append("Failed to get extension (org.apache.dubbo.rpc.cluster.Cluster) name from url (")
            .append(uRL.toString()).append(") use keys([cluster])").toString());
        }
        
        Cluster cluster = (Cluster)ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(string);

        return cluster.join(directory);
    }
}           
com.alibaba.dubbo.rpc.cluster.Cluster檔案

mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
registryaware=org.apache.dubbo.rpc.cluster.support.RegistryAwareCluster           
  • Cluster$Adaptive根據uRL.getParameter("cluster", "failover")的Cluster協定生成對應的Cluster對象。
  • Cluster檔案内容如上圖所示。

單注冊中心或直連位址場景

處理單注冊中心場景

public class RegistryProtocol implements Protocol {

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = URLBuilder.from(url)
                .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
                .removeParameter(REGISTRY_KEY)
                .build();
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }

        return doRefer(cluster, registry, type, url);
    }

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

        // invoker為MockClusterInvoker=>FailoverClusterInvoker
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);

        return invoker;
    }
}           
Dubbo Consumer 直連和注冊中心服務引用流程
  • 單注冊中心場景下,urls.get(0)的協定是”registry“,Protocol$Adaptive傳回的被封裝的Protocol對象為RegistryProtocol。
  • ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry")傳回的extension為ProtocolListenerWrapper對象,ProtocolListenerWrapper的封裝鍊為ProtocolListenerWrapper => ProtocolFilterWrapper => RegistryProtocol。
  • extension.refer()過程按照ProtocolListenerWrapper.refer() => ProtocolFilterWrapper.refer() => RegistryProtocol.refer()流程調用。
  • ProtocolListenerWrapper.refer() => ProtocolFilterWrapper.refer() => RegistryProtocol.refer()的調用鍊路針對"registry"作了特殊處理,直接走RegistryProtocol.refer()的方法。
  • RegistryProtocol.refer()執行cluster.join(directory)生成MockClusterInvoker對象,封裝關系為MockClusterInvoker=>FailoverClusterInvoker。
  • "registry"協定傳回的invoker對象封裝關系為MockClusterInvoker => FailoverClusterInvoker。
  • MockClusterInvoker的invoke()方法會執行FailoverClusterInvoker的doInvoke()方法,進入Dubbo的Cluster叢集調用政策。

處理單直連位址場景

public class DubboProtocol extends AbstractProtocol {

    public static final String NAME = "dubbo";

    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }
}

public abstract class AbstractProtocol implements Protocol {

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

    protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
}


public class AsyncToSyncInvoker<T> implements Invoker<T> {

    private Invoker<T> invoker;

    public AsyncToSyncInvoker(Invoker<T> invoker) {
        this.invoker = invoker;
    }
}           
Dubbo Consumer 直連和注冊中心服務引用流程
  • 單直連位址場景下,urls.get(0)的協定為"dubbo",Protocol$Adaptive傳回的被封裝的Protocol對象為DubboProtocol。
  • ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo")傳回的extension為ProtocolListenerWrapper對象,ProtocolListenerWrapper的封裝鍊為ProtocolListenerWrapper => ProtocolFilterWrapper => DubboProtocol。
  • extension.refer()過程按照ProtocolListenerWrapper.refer() => ProtocolFilterWrapper.refer() => DubboProtocol.refer()流程調用。
  • DubboProtocol.refer()執行AbstractProtocol.refer()方法生成AsyncToSyncInvoker對象, AsyncToSyncInvoker對象内部包含DubboInvoker對象。
  • ”dubbo“協定的invoker對象封裝關系為ListenerInvokerWrapper => ProtocolFilterWrapper => AsyncToSyncInvoker。
  • AsyncToSyncInvoker的invoke()方法會執行DubboProtocol的invoke()方法。

多注冊中心或多直連位址場景

List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();

URL registryURL = null;
for (URL url : urls) {
    // 單注冊中心協定為"registry",直連場景下協定為"dubbo"
    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
    if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        registryURL = url; // use last registry url
    }
}

if (registryURL != null) {
    // 針對注冊中心的方式,通過RegistryAwareCluster進行封裝
    URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
    // RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // 處理直連方式
    invoker = CLUSTER.join(new StaticDirectory(invokers));
}           
  • 多注冊中心場景下invokers為MockClusterInvoker對象清單,CLUSTER.join()方法的Cluster為"failover"。
  • 多直連位址場景下invokers為ListenerInvokerWrapper對象清單,CLUSTER.join()方法的Cluster為"failover"。

處理多注冊中心場景

  • 多注冊中心場景下每個注冊中心對應MockClusterInvoker對象,在外層有一層MockClusterInvoker包裝。
  • 多注冊中心場景下通過MockClusterInvoker包裝多個注冊中心中每個注冊中心對應的MockClusterInvoker。
Dubbo Consumer 直連和注冊中心服務引用流程

處理多直接位址場景

  • 多直連位址場景下每個直連位址對應ListenerInvokerWrapper對象,在外層有一層MockClusterInvoker包裝。
  • 多直連位址場景下通過MockClusterInvoker包裝直連位址對應的ListenerInvokerWrapper對象清單。
Dubbo Consumer 直連和注冊中心服務引用流程

總結

  • 在單直連位址場景下:invoker對象為ListenerInvokerWrapper。
  • 在多直連位址場景下:invoker對象為MockClusterInvoker,内部包含ListenerInvokerWrapper對象。
  • 在單注冊中心場景下:invoker對象為MockClusterInvoker。
  • 在多注冊中心場景下:invoker對象為MockClusterInvoker,MockClusterInvoker内部包含注冊中心對應的MockClusterInvoker對象,相當于在多注冊中心情況下,每個注冊中心對應一個MockClusterInvoker對象,外部通過MockClusterInvoker進行二次封裝。

繼續閱讀