天天看点

dubbo源码之服务暴露服务暴露过程

Dubbo 作为rpc 封装的典型框架,我们其实要了解一个rpc调用大致过程,dubbo可以作为一个比较好学习资料。

dubbo源码之服务暴露服务暴露过程

这个是dubbo 调用的流程图,这篇文件主要要介绍第一步中的服务暴露过程,这次使用的源码为2.7.1

服务暴露过程

这里先将具体暴露过程贴出来,配合源码进行分析

dubbo源码之服务暴露服务暴露过程

我们在看源码的时候需要一个入口,dubbo的入口可以从ServiceBean 开始

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean,
 DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, 
 BeanNameAware, ApplicationEventPublisherAware {

           

这里涉及到Spring 相关内容了,主要是实现ApplicationContextAware、InitializingBean等接口。这两个是在spring生命周期中会先后调用,主要进行是一些初始化的操作,我们可以不关心。同时它实现了 ApplicationListener,这个是关于spring的监听器内容。这会在 Spring IOC 容器刷新完成后调用 onApplicationEvent 方法,而这个方法里面做的就是服务暴露,这就是服务暴露的启动点。

ServiceBean 中onApplicationEvent方法

public void onApplicationEvent(ContextRefreshedEvent event) {
 //是否没暴露过
        if (!this.isExported() && !this.isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + this.getInterface());
            }
//如果不是延迟暴露,并且没有暴露过,直接调用暴露方法
            this.export();
        }

    }
           

直接调用,父类暴露方法

dubbo源码之服务暴露服务暴露过程

父类的serviceConfig的export方法

public synchronized void export() {
    //检查更新配置
        this.checkAndUpdateSubConfigs();
        //判断是否需要暴露,并且是延迟暴露,如果延迟暴露通过延迟线程池进行暴露
        if (this.shouldExport()) {
            if (this.shouldDelay()) {
                delayExportExecutor.schedule(this::doExport, (long)this.delay, TimeUnit.MILLISECONDS);
            } else {
            //调用方法直接暴露
                this.doExport();
            }

        }
    }
           

serviceConfig的doExport方法

dubbo源码之服务暴露服务暴露过程

父类的serviceConfig的doExportUrls方法

private void doExportUrls() {
    //1、加载注册信息,构造url
        List<URL> registryURLs = this.loadRegistries(true);
        Iterator var2 = this.protocols.iterator();

        while(var2.hasNext()) {
            ProtocolConfig protocolConfig = (ProtocolConfig)var2.next();
            String pathKey = URL.buildKey((String)this.getContextPath(protocolConfig).map((p) -> {
                return p + "/" + this.path;
            }).orElse(this.path), this.group, this.version);
            ProviderModel providerModel = new ProviderModel(pathKey, this.ref, this.interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            //根据 URL 来进行服务暴露了
            this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }

    }
           
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = "dubbo";
        }
//将参数塞进map,构建url
        Map<String, String> map = new HashMap();
        map.put("side", "provider");
        appendRuntimeParameters(map);
        appendParameters(map, this.application);
        appendParameters(map, this.module);
        appendParameters(map, this.provider, "default");
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        String scope;
        Iterator metadataReportService;
        if (CollectionUtils.isNotEmpty(this.methods)) {
            Iterator var5 = this.methods.iterator();

            label161:
            while(true) {
                MethodConfig method;
                List arguments;
                do {
                    if (!var5.hasNext()) {
                        break label161;
                    }

                    method = (MethodConfig)var5.next();
                    appendParameters(map, method, method.getName());
                    String retryKey = method.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        scope = (String)map.remove(retryKey);
                        if ("false".equals(scope)) {
                            map.put(method.getName() + ".retries", "0");
                        }
                    }

                    arguments = method.getArguments();
                } while(!CollectionUtils.isNotEmpty(arguments));

                metadataReportService = arguments.iterator();

              、//... 省略部分代码(关于一些参数的判断)
            }
        }

    //..省略部分代码
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
            url = ((ConfiguratorFactory)ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol())).getConfigurator(url).configure(url);
        }

//获取scope为null,会进入判断进行本地暴露
        scope = url.getParameter("scope");
        if (!"none".equalsIgnoreCase(scope)) {
            if (!"remote".equalsIgnoreCase(scope)) {
            //进行本地暴露,默认javassist 代理对象,由于本地暴露和远程暴露流程大致一样,不再展开
                this.exportLocal(url);
            }

            if (!"local".equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + this.interfaceClass.getName() + " to url " + url);
                }

                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    metadataReportService = registryURLs.iterator();

                    while(metadataReportService.hasNext()) {
                        URL registryURL = (URL)metadataReportService.next();
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = this.loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded("monitor", monitorUrl.toFullString());
                        }

                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + this.interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }


			
                        String proxy = url.getParameter("proxy");
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter("proxy", proxy);
                        }
						//通过proxyFactory 将接口封装为invoker 的代理
                        Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        //根据协议对象进行远程暴露
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        this.exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    this.exporters.add(exporter);
                }

                metadataReportService = null;
                MetadataReportService metadataReportService;
                if ((metadataReportService = this.getMetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }
            }
        }

        this.urls.add(url);
    }
           
dubbo源码之服务暴露服务暴露过程

远程暴露

这个是远程暴露的大致流程

dubbo源码之服务暴露服务暴露过程

由于目前协议是registery,调用registeryProtocol#export 进行暴露

public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = this.getRegistryUrl(originInvoker);
        URL providerUrl = this.getProviderUrl(originInvoker);
        URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(providerUrl);
        RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker);
        this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        providerUrl = this.overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        
        RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker, providerUrl);
        //获取注册中心的URL 
        Registry registry = this.getRegistry(originInvoker);
        // 获取注册服务提供者的URL
        URL registeredProviderUrl = this.getRegisteredProviderUrl(providerUrl, registryUrl);
        //将提供者的信息注册到服务提供和消费者注册表中
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
        boolean register = registeredProviderUrl.getParameter("register", true);
        //如果需要注册
        if (register) {
        //想注册中心注册服务,这里是向zookeeper注册
            this.register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }
	//获取订阅url
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        return new RegistryProtocol.DestroyableExporter(exporter);
    }
           

调用registeryProtocol#export 之后会再调用DubboProtocol#export进行暴露

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//获取url
        URL url = invoker.getUrl();
        //这里构建的时候serverMap的key
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
        this.exporterMap.put(key, exporter);
        Boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
        Boolean isCallbackservice = url.getParameter("is_callback_service", false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
            if (stubServiceMethods != null && stubServiceMethods.length() != 0) {
                this.stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            } else if (this.logger.isWarnEnabled()) {
                this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
            }
        }
	//如果没有server创建server方法
        this.openServer(url);
        this.optimizeSerialization(url);
        return exporter;
    }

 private void openServer(URL url) {
        String key = url.getAddress();
        boolean isServer = url.getParameter("isserver", true);
        if (isServer) {
            ExchangeServer server = (ExchangeServer)this.serverMap.get(key);
            if (server == null) {
                synchronized(this) {
                    server = (ExchangeServer)this.serverMap.get(key);
                    if (server == null) {
                    //如果构建server为空,则创建server(这个方法是创建Netty监听器,这里不再跟了)
                        this.serverMap.put(key, this.createServer(url));
                    }
                }
            } else {
                server.reset(url);
            }
        }

    }
           

可以看到将provider信息已经注册到zookeeper节点上了

dubbo源码之服务暴露服务暴露过程

继续阅读