天天看点

Dubbo源码分析四、服务导出(1)

关于服务导出,首先先琢磨一下服务导出时要做什么呢? 我这里整理两点:

  1. 服务注册(注册的啥?Invoker!)
  2. 开启服务端(本文以netty4为例)

那么在分析源码时能跟到这两个点就基本上算完成了。

dubbo-config-spring 针对Spring提供的接入包

先看@EnableDubbo注解。在我们的启动类上增加@EnableDubbo注解即打开Dubbo

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@EnableDubboConfig
@DubboComponentScan
public @interface EnableDubbo {
    @AliasFor(annotation = DubboComponentScan.class, attribute = "basePackages")
    String[] scanBasePackages() default {};

    @AliasFor(annotation = DubboComponentScan.class, attribute = "basePackageClasses")
    Class<?>[] scanBasePackageClasses() default {};

    @AliasFor(annotation = EnableDubboConfig.class, attribute = "multiple")
    boolean multipleConfig() default true;

}
           

这是个复合注解,看一下EnableDubboConfig的定义:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Import(DubboConfigConfigurationRegistrar.class)
public @interface EnableDubboConfig {
    boolean multiple() default true;

}
           

通过@Import引入了一个注册器DubboConfigConfigurationRegistrar

public class DubboConfigConfigurationRegistrar implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                importingClassMetadata.getAnnotationAttributes(EnableDubboConfig.class.getName()));

        boolean multiple = attributes.getBoolean("multiple");

        // Single Config Bindings
        registerBeans(registry, DubboConfigConfiguration.Single.class);

        if (multiple) { // Since 2.6.6 https://github.com/apache/dubbo/issues/3193
            registerBeans(registry, DubboConfigConfiguration.Multiple.class);
        }

        // Since 2.7.6
        registerCommonBeans(registry);
    }
}
           

先是根据multiple的值来注册配置类,看一下registerCommonBeans这个方法

static void registerCommonBeans(BeanDefinitionRegistry registry) {

    // Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean
    // @DubboReference的后置处理器,用于对引入该注解的属性注入
    // 服务引入部分从这里进入
    registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
            ReferenceAnnotationBeanPostProcessor.class);

    // Since 2.7.4 [Feature] https://github.com/apache/dubbo/issues/5093
    // dubbo的后置处理器,id注册为别名 alias
    registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME,
            DubboConfigAliasPostProcessor.class);

    // Since 2.7.5 Register DubboLifecycleComponentApplicationListener as an infrastructure Bean
    // Dubbo的生命周期管理类监听,当spring refresh、close时触发监听事件。
    // 这里有几个实现了LifeCycle接口的类,通过该监听器管理对应的生命周期。
    // 关注这里,这是基于spring生命周期管理自己框架中具备生命周期的对象。与spring 同生共死
    registerInfrastructureBean(registry, DubboLifecycleComponentApplicationListener.BEAN_NAME,
            DubboLifecycleComponentApplicationListener.class);

    // Since 2.7.4 Register DubboBootstrapApplicationListener as an infrastructure Bean
    // 这里实际上也是加了一个spring生命周期的监听器。 BootStrap管理dubbo的启动和销毁
    // BootStrap的start方法,dubbo启动处理。服务导出、注册等操作在这里实现。
    registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME,
            DubboBootstrapApplicationListener.class);

    // Since 2.7.6 Register DubboConfigDefaultPropertyValueBeanPostProcessor as an infrastructure Bean
    // 设置config的id、name字段的默认值为对应的beanName
    registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME,
            DubboConfigDefaultPropertyValueBeanPostProcessor.class);
}
           

这里就是注册了几个处理器和监听器。我们主要关注DubboBootstrapApplicationListener这个监听器

@Override
    public void onApplicationContextEvent(ApplicationContextEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }
           

监听ContextRefreshedEvent和ContextClosedEvent事件。

private void onContextRefreshedEvent(ContextRefreshedEvent event) {
    dubboBootstrap.start();
}

private void onContextClosedEvent(ContextClosedEvent event) {
    dubboBootstrap.stop();
}
           

通过DubboBootStrap管理启动和停止

DubboBootStrap的start方法

public DubboBootstrap start() {
    // 原子操作
    if (started.compareAndSet(false, true)) {
        ready.set(false);
        // 初始化
        initialize();
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " is starting...");
        }
        // 1. export Dubbo Services
        // services导出
        exportServices();

        // Not only provider register
        if (!isOnlyRegisterProvider() || hasExportedServices()) {
            // 2. export MetadataService
            exportMetadataService();
            //3. Register the local ServiceInstance if required
            registerServiceInstance();
        }

        // 服务引入
        referServices();
        if (asyncExportingFutures.size() > 0) {
            new Thread(() -> {
                try {
                    this.awaitFinish();
                } catch (Exception e) {
                    logger.warn(NAME + " exportAsync occurred an exception.");
                }
                ready.set(true);
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is ready.");
                }
            }).start();
        } else {
            ready.set(true);
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is ready.");
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has started.");
        }
    }
    return this;
}
           

前半部分是服务导出,后半部分是服务引入。我们先看服务导出部分

private void exportServices() {

    // ServiceBean就是一个config,每个config在实例化时有一个@PostConstruct方法定义初始化完成后执行,执行时会向configManager中注册这个config信息
    // AbstractConfig 的 addIntoConfigManager方法
    // 所以这里执行时,是DubboBootstrapApplicationListener在spring容器触发refresh事件时执行。此时bean已经初始化完成,这里可以直接获取到所有的services
    // 遍历所有services
    configManager.getServices().forEach(sc -> {
        // TODO, compatible with ServiceConfig.export()
        ServiceConfig serviceConfig = (ServiceConfig) sc;
        serviceConfig.setBootstrap(this);

        // 如果是异步导出
        if (exportAsync) {
            // spi扩展执行器,未配置的情况下使用默认执行器 DefaultExecutorRepository
            // DefaultExecutorRepository注册的serviceExporterExecutor 任务线程池,只执行一次
            // 创建的执行器是ThreadPool的扩展,默认是FixedThreadPool,创建固定大小线程池
            ExecutorService executor = executorRepository.getServiceExporterExecutor();
            Future<?> future = executor.submit(() -> {
                sc.export();
                exportedServices.add(sc);
            });
            // future获取执行结果, 等待处理
            asyncExportingFutures.add(future);
        } else {
            // 同步导出,直接导出
            sc.export();
            exportedServices.add(sc);
        }
    });
}
           

区分是同步导出还是异步导出。异步的情况就是起一个任务执行。 这里我们直接看export导出方法

public synchronized void export() {
    // 判断导出是否需要导出。条件如下:
    // (export == null && provider != null) ? provider.getExport() : export;
    // export == null ? true : export;
    // 即优先provider的export配置,如果provider未配置export,则使用service的 export
    // 如果service也未配置export,则默认为true
    if (!shouldExport()) {
        return;
    }

    // 判断bootstrap是否存在,如不存在则初始化
    if (bootstrap == null) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.initialize();
    }

    // 验证service的配置是否正确
    checkAndUpdateSubConfigs();

    //init serviceMetadata
    // service 元信息设置
    serviceMetadata.setVersion(getVersion());
    serviceMetadata.setGroup(getGroup());
    serviceMetadata.setDefaultGroup(getGroup());
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setServiceInterfaceName(getInterface());
    serviceMetadata.setTarget(getRef());

    if (shouldDelay()) {
        // 如果需要延迟导出,丢到延迟导出线程池处理
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        // 执行导出操作
        doExport();
    }

    // 导出后发布ServiceConfigExportedEvent事件
    exported();
}
           

前面就是各种配置检查,设置元数据,然后有一个属性delay,确定是否需要延迟导出的。如果delay>0,说明要延迟,丢到任务池里,延迟时间就是delay秒。

导出完成后调用exported方法触发ServiceConfigExportedEvent事件

这里我们直接看doExport

protected synchronized void doExport() {
    // 防止二次导出
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;

    if (StringUtils.isEmpty(path)) {
        path = interfaceName;
    }
    doExportUrls();
}
           

然后是doExportUrls方法:

private void doExportUrls() {
    ServiceRepository repository = ApplicationModel.getServiceRepository();
    ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
    // 注册providerModel
    repository.registerProvider(
            getUniqueServiceName(),
            ref,
            serviceDescriptor,
            this,
            serviceMetadata
    );

    // 加载注册中心链接
    List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

    // 遍历 protocols,并在每个协议下导出服务 到每个注册中心
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig)
                .map(p -> p + "/" + path)
                .orElse(path), group, version);
        // In case user specified path, register service one more time to map it to path.
        repository.registerService(pathKey, interfaceClass);
        // TODO, uncomment this line once service key is unified
        serviceMetadata.setServiceKey(pathKey);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
           

有多协议多注册中心的情况,这里遍历每一个协议protocol,然后执行doExportUrlsFor1Protocol,将protocol导出到所有注册中心去。

看一下doExportUrlsFor1Protocol这个方法:

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    // 如果协议名为空 则将协议名变量设置为 dubbo
    if (StringUtils.isEmpty(name)) {
        name = DUBBO;
    }

    Map<String, String> map = new HashMap<String, String>();
    // 添加 side
    map.put(SIDE_KEY, PROVIDER_SIDE);

    ServiceConfig.appendRuntimeParameters(map);

    // 通过反射将对象的字段信息添加到 map 中
    /**
     * AbstractConfig.appendParameters 这个方法出现的次数比较多,该方法用于将对象字段信息添加到 map 中。
     * 实现上则是通过反射获取目标对象的 getter 方法,并调用该方法获取属性值。
     * 然后再通过 getter 方法名解析出属性名,比如从方法名 getName 中可解析出属性 name。
     */
    AbstractConfig.appendParameters(map, getMetrics());
    AbstractConfig.appendParameters(map, getApplication());
    AbstractConfig.appendParameters(map, getModule());
    // remove 'default.' prefix for configs from ProviderConfig
    // appendParameters(map, provider, Constants.DEFAULT_KEY);
    AbstractConfig.appendParameters(map, provider);
    AbstractConfig.appendParameters(map, protocolConfig);
    AbstractConfig.appendParameters(map, this);
    MetadataReportConfig metadataReportConfig = getMetadataReportConfig();

    // 添加metadata-type
    if (metadataReportConfig != null && metadataReportConfig.isValid()) {
        map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
    }
    ... 省略部分代码
}
           

这部分是构造map的,这个map最终是为了生成URL对象。加了metrics、application、module、provider、protocolConfig、this(ServiceConfig)。注意

AbstractConfig.appendParameters这个方法,通过getter方法来往map里添加属性。这里有一个@Parameter注解的使用。

doExportUrlsFor1Protocol方法继续往下看:

...省略部分代码
// methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
if (CollectionUtils.isNotEmpty(getMethods())) {
    // 这段代码用于添加 Callback 配置到 map 中
    for (MethodConfig method : getMethods()) {
        // 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。
        // 比如存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig,
        // 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}
        AbstractConfig.appendParameters(map, method, method.getName());
        String retryKey = method.getName() + ".retry";
        if (map.containsKey(retryKey)) {
            String retryValue = map.remove(retryKey);
            // 检测 MethodConfig retry 是否为 false,若是,则设置重试次数为0
            if ("false".equals(retryValue)) {
                map.put(method.getName() + ".retries", "0");
            }
        }
        // 获取 ArgumentConfig 列表
        List<ArgumentConfig> arguments = method.getArguments();
        if (CollectionUtils.isNotEmpty(arguments)) {
            // 遍历方法参数--这里是ArgumentConfig,已经被解析了
            for (ArgumentConfig argument : arguments) {
                // convert argument type
                // 检测 type 属性是否为空,或者空串(分支1 ⭐️)
                if (argument.getType() != null && argument.getType().length() > 0) {
                    Method[] methods = interfaceClass.getMethods();
                    // visit all methods
                    if (methods.length > 0) {
                        for (int i = 0; i < methods.length; i++) {
                            String methodName = methods[i].getName();
                            // target the method, and get its signature
                            // 比对方法名,查找目标方法 --这里是遍历Method,找到和当前MethodConfig相对应的Method对象
                            if (methodName.equals(method.getName())) {
                                Class<?>[] argtypes = methods[i].getParameterTypes();
                                // one callback in the method
                                if (argument.getIndex() != -1) {
                                    // 检测 ArgumentConfig 中的 type 属性与方法参数列表
                                    // 中的参数名称是否一致,不一致则抛出异常(分支2 ⭐️)  保证config和真实的类型一致
                                    if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                        // 添加 ArgumentConfig 字段信息到 map 中,
                                        // 键前缀 = 方法名.index,比如:
                                        // map = {"sayHello.3.callback": true}
                                        AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                    } else {
                                        throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                    }
                                } else {// 分支3 ⭐️
                                    // 没有定义index,通过类型名称匹配 -- 这里是否会有相同类型名的参数?
                                    // multiple callbacks in the method
                                    for (int j = 0; j < argtypes.length; j++) {
                                        Class<?> argclazz = argtypes[j];
                                        // 从参数类型列表中查找类型名称为 argument.type 的参数
                                        if (argclazz.getName().equals(argument.getType())) {
                                            AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
                                            if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                // 用户未配置 type 属性,但配置了 index 属性,且 index != -1
                } else if (argument.getIndex() != -1) { // 分支4 ⭐️
                    // 添加 ArgumentConfig 字段信息到 map 中
                    AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                } else {
                    throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                }

            }
        }
    } // end of methods for
}
...省略部分代码
           

这一段逻辑很绕,简单来说就是检测method和配置,并将相关配置添加到 map 中。

下半部分是针对方法参数的配置解析,里面嵌套比较多,这里给个伪代码:

// 获取 ArgumentConfig 列表
for (遍历 ArgumentConfig 列表) {
    if (type 不为 null,也不为空串) {    // 分支1
        1. 通过反射获取 interfaceClass 的方法列表
        for (遍历方法列表) {
            1. 比对方法名,查找目标方法
        	2. 通过反射获取目标方法的参数类型数组 argtypes
            if (index != -1) {    // 分支2
                1. 从 argtypes 数组中获取下标 index 处的元素 argType
                2. 检测 argType 的名称与 ArgumentConfig 中的 type 属性是否一致
                3. 添加 ArgumentConfig 字段信息到 map 中,或抛出异常
            } else {    // 分支3
                1. 遍历参数类型数组 argtypes,查找 argument.type 类型的参数
                2. 添加 ArgumentConfig 字段信息到 map 中
            }
        }
    } else if (index != -1) {    // 分支4
		1. 添加 ArgumentConfig 字段信息到 map 中
    }
}
           

继续往下看:

... 省略部分代码
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

// You can customize Configurator to append extra parameters
// 配置扩展 对url进一步扩展
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
        .hasExtension(url.getProtocol())) {
    url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}

// 作用域
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
// 如果 scope = none,则什么都不做
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

    // export to local if the config is not remote (export to remote only when config is remote)
    // scope != remote,导出到本地
    if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
        exportLocal(url);
    }
    // export to remote if the config is not local (export to local only when config is local)
    // scope != local,导出到远程
    if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
        if (CollectionUtils.isNotEmpty(registryURLs)) {
            for (URL registryURL : registryURLs) {
                //if protocol is only injvm ,not register
                if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                    continue;
                }
                url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                // 加载监视器链接
                URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                if (monitorUrl != null) {
                    // 将监视器链接作为参数添加到 url 中
                    url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                }
                if (logger.isInfoEnabled()) {
                    if (url.getParameter(REGISTER_KEY, true)) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    } else {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                }

                // For providers, this is used to enable custom proxy to generate invoker
                String proxy = url.getParameter(PROXY_KEY);
                if (StringUtils.isNotEmpty(proxy)) {
                    // registryURL添加proxy扩展
                    registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                }

                // 为服务提供类(ref)生成 Invoker
                /**
                 * Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,
                 * 它代表一个可执行体,可向它发起 invoke 调用,
                 * 它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
                 * Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory
                 *
                 * ref属性在DubboComponentScanRegistrar定义ServiceBean的BeanDefinition时注入进去了,注入的实例就是原类的实例,比如测试用的DemoServiceImpl实例
                 * 所以wapper.invokeMethod时,传入的实例对象就是我们自己定义的实现类对象,
                 * 这里默认得到的invoker是AbstractProxyInvoker。运行时调用invoke方法->doInvoke->wapper.invokeMethod->demoService.sayHello
                 */
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                // 导出服务,并生成 Exporter
                // 由于wrapperInvoker里面的url是registryUrl 这里的PROTOCOL是一个RegistryProtocol
                // export时会做三件事
                // 1、本地导出(实际上就是netty开启服务,绑定端口)
                // 本地导出的过程中,url不再是registry协议,转换成dubbo协议,然后里面的protocol.export就转到了DubboProtocol.export
                // 2、注册中心注册服务
                // 3 向注册中心订阅注册的服务
                Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                exporters.add(exporter);
            }
        } else {
            // 不存在注册中心,仅导出服务
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

            // 此时的URL是 dubbo:// 这里只做上面的第一步
            Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
            exporters.add(exporter);
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         * metadata-type
         */
        WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
        if (metadataService != null) {
            metadataService.publishServiceDefinition(url);
        }
    }
}
this.urls.add(url);
... 省略部分代码
           

上面代码根据 url 中的 scope 参数决定服务导出方式,分别如下:

  • scope = none,不导出服务
  • scope != remote,导出到本地
  • scope != local,导出到远程

不管是导出到本地,还是远程。进行服务导出之前,均需要先创建 Invoker,这个Invoker是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用。而我们服务导出导出的东西就是这个对象,因此Invoker对象是如何创建的是很重要的。下篇详细分析Invoker的创建