
Dubbo - Service Export

Overview

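I recently worked on a service-oriented project built on Dubbo, so I took the opportunity to finally read the Dubbo source code, something I had long wanted to do. Once I have Dubbo sorted out, I will go back to finishing my reading of the Flume source code.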

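This chapter covers how Dubbo exports a service. As I see it, the process has three stages: stage one parses the XML configuration file; stage two is Spring creating the bean objects; stage three exports the bean as a service. Stage two involves Spring's own bean initialization, which I am not yet able to explain clearly, so I will concentrate on the other two stages.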

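I consulted many other people's articles. The conceptual material comes first, because it makes the detailed walkthrough that follows easier to understand. The images here are shared from 《益文的圈》, which I credit explicitly.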

Basic Concepts

Invoker

public interface Invoker<T> extends Node {

    Class<T> getInterface();
    Result invoke(Invocation invocation) throws RpcException;
}

           

An abstraction of an executable object: given a method name and its arguments, it produces the corresponding execution result.

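Invokers fall into three categories:

  • AbstractProxyInvoker: the Invoker for local execution; it calls the original object's methods through Java reflection.
  • AbstractInvoker: the Invoker for remote communication; it sends a remote call request over the communication protocol and receives the response.
  • AbstractClusterInvoker: a cluster Invoker that aggregates several remote-communication Invokers and adds cluster fault tolerance and load-balancing strategies.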
    [Figure: invoker]

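Invocation: carries the key information for a call, such as the method to execute and its arguments. It has two implementation classes, RpcInvocation and MockInvocation.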
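
To make the Invoker and Invocation roles concrete, here is a minimal sketch of a local, reflection-based invoker. The types `SimpleInvocation`, `SimpleInvoker`, `ReflectionInvoker` and `GreetingService` are hypothetical, simplified stand-ins written for this illustration; they are not Dubbo's classes and leave out Node, Result and RpcException.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for Invocation: which method to call, with which arguments.
final class SimpleInvocation {
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] arguments;

    SimpleInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.arguments = arguments;
    }
}

// Hypothetical stand-in for Invoker: turns an invocation into a result.
interface SimpleInvoker<T> {
    Class<T> getInterface();
    Object invoke(SimpleInvocation invocation);
}

// Local invoker: resolves the method on the interface and calls it on the target via reflection.
final class ReflectionInvoker<T> implements SimpleInvoker<T> {
    private final T target;
    private final Class<T> type;

    ReflectionInvoker(T target, Class<T> type) {
        this.target = target;
        this.type = type;
    }

    public Class<T> getInterface() {
        return type;
    }

    public Object invoke(SimpleInvocation invocation) {
        try {
            Method method = type.getMethod(invocation.methodName, invocation.parameterTypes);
            return method.invoke(target, invocation.arguments);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Invocation failed: " + invocation.methodName, e);
        }
    }
}

// Example service used only for this sketch.
interface GreetingService {
    String sayHello(String name);
}

final class GreetingServiceImpl implements GreetingService {
    public String sayHello(String name) {
        return "Hello " + name;
    }
}

final class InvokerDemo {
    static String run() {
        SimpleInvoker<GreetingService> invoker =
                new ReflectionInvoker<>(new GreetingServiceImpl(), GreetingService.class);
        SimpleInvocation invocation = new SimpleInvocation(
                "sayHello", new Class<?>[] {String.class}, new Object[] {"world"});
        return (String) invoker.invoke(invocation);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The caller only sees `invoke(invocation)`; whether the call runs locally, remotely or across a cluster is hidden behind the same interface, which is the point of the abstraction.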

ProxyFactory

public interface ProxyFactory {

    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}
           

An abstraction for service interface proxies, used to generate a proxy class for an interface.

  • getInvoker method: for the server side; wraps a service object (such as DemoServiceImpl) in an Invoker.
  • getProxy method: for the client side; creates a proxy object for the interface (such as DemoService).
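
The client-side half can be sketched with a JDK dynamic proxy. This is only an illustration of the idea behind getProxy, not Dubbo's implementation; `CallInvoker`, `EchoService` and `ProxySketch` are hypothetical names, and the invoker answers locally instead of making a remote call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Example interface used only for this sketch.
interface EchoService {
    String echo(String text);
}

// Hypothetical minimal invoker: receives the method name and the arguments.
interface CallInvoker {
    Object invoke(String methodName, Object[] args);
}

final class ProxySketch {
    // Builds a proxy of the interface whose business calls are forwarded to the invoker.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> type, CallInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object (toString, hashCode, equals) are answered by the invoker object itself.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            return invoker.invoke(method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    static String run() {
        // Stand-in for a remote call: the invoker just formats what it was asked to do.
        CallInvoker invoker = (name, args) -> name + ":" + args[0];
        EchoService service = getProxy(EchoService.class, invoker);
        return service.echo("hi");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```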

Exporter

public interface Exporter<T> {

    Invoker<T> getInvoker();
    void unexport();
}

           

Maintains the Invoker's lifecycle; internally it holds an Invoker or an ExporterMap.

Protocol

@SPI("dubbo")
public interface Protocol {

    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

   
    void destroy();

}

           

The protocol abstraction interface, which encapsulates RPC calls.

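  • export method: exposes a remote service (used on the server side), i.e. it exposes the Invoker to the outside through the protocol.
  • refer method: references a remote service (used on the client side), creating a remote dynamic-proxy Invoker from the Class, the URL and related information.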
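
The export/refer pair and the exporter map can be pictured with an in-memory toy protocol. Everything here (`InMemoryProtocol`, `Handle`, using a `Function` as the invoker) is a hypothetical simplification for illustration; a real protocol would open a server port in export and create a network client in refer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class InMemoryProtocol {
    // service key -> exported invoker, playing the role of an exporter map
    private final Map<String, Function<String, String>> exporterMap = new ConcurrentHashMap<>();

    // Handle returned by export(); it owns the lifecycle of one exported service.
    final class Handle {
        private final String key;

        Handle(String key) {
            this.key = key;
        }

        void unexport() {
            exporterMap.remove(key);
        }
    }

    // Server side: make the invoker reachable under the given key.
    Handle export(String key, Function<String, String> invoker) {
        if (exporterMap.putIfAbsent(key, invoker) != null) {
            throw new IllegalStateException("Already exported: " + key);
        }
        return new Handle(key);
    }

    // Client side: obtain an invoker for the given key.
    Function<String, String> refer(String key) {
        Function<String, String> invoker = exporterMap.get(key);
        if (invoker == null) {
            throw new IllegalStateException("No provider for: " + key);
        }
        return invoker;
    }

    boolean isExported(String key) {
        return exporterMap.containsKey(key);
    }

    public static void main(String[] args) {
        InMemoryProtocol protocol = new InMemoryProtocol();
        Handle handle = protocol.export("DemoService", name -> "Hello " + name);
        System.out.println(protocol.refer("DemoService").apply("dubbo"));
        handle.unexport();
        System.out.println(protocol.isExported("DemoService"));
    }
}
```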

Relationship Diagram

[Figure: relationship diagram]
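  • ServiceConfig holds a ProxyFactory and a Protocol, both created and injected through SPI;
  • ProxyFactory is responsible for creating the Invoker;
  • Protocol is responsible for producing an Exporter from the Invoker, starting the service and exposing it.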
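
The idea of picking an implementation through SPI based on a URL parameter can be sketched as a small lookup. This is not Dubbo's ExtensionLoader; `ExtensionSketch` and `Greeter` are hypothetical, and the key "proxy" and default name "javassist" used in the demo are assumptions chosen to resemble the ProxyFactory case.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical extension point with several named implementations.
interface Greeter {
    String greet();
}

final class ExtensionSketch {
    private final Map<String, Supplier<Greeter>> extensions = new HashMap<>();
    private final String defaultName;

    ExtensionSketch(String defaultName) {
        this.defaultName = defaultName;
    }

    void register(String name, Supplier<Greeter> factory) {
        extensions.put(name, factory);
    }

    // Picks the implementation named by the URL parameter, falling back to the default name.
    Greeter adaptive(Map<String, String> urlParameters, String key) {
        String name = urlParameters.getOrDefault(key, defaultName);
        Supplier<Greeter> factory = extensions.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        ExtensionSketch loader = new ExtensionSketch("javassist"); // assumed default name
        loader.register("javassist", () -> () -> "javassist proxy");
        loader.register("jdk", () -> () -> "jdk proxy");

        Map<String, String> params = new HashMap<>();
        System.out.println(loader.adaptive(params, "proxy").greet());
        params.put("proxy", "jdk");
        System.out.println(loader.adaptive(params, "proxy").greet());
    }
}
```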

Dubbo registry address

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
application=demo-provider&dubbo=2.0.0&pid=1908&registry=zookeeper&timestamp=1527741226496
           

Dubbo service export address

dubbo://172.17.32.44:20880/com.alibaba.dubbo.demo.DemoService?
anyhost=true&application=demo-provider&bind.ip=172.17.32.44&bind.port=20880&dubbo=2.0.0&generic=false&
interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1908&side=provider&timestamp=1527741226515
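
Both addresses follow the usual scheme://host:port/path?key=value shape, so the standard `java.net.URI` class is enough to take them apart. The helper `UrlParts` below is a hypothetical name for this sketch; it does not decode percent-escapes and is not Dubbo's own URL class.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

final class UrlParts {
    // Splits the query string into key/value pairs, keeping their original order (values are not decoded).
    static Map<String, String> queryParameters(String url) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = URI.create(url).getRawQuery();
        if (query == null) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        String url = "dubbo://172.17.32.44:20880/com.alibaba.dubbo.demo.DemoService"
                + "?anyhost=true&application=demo-provider&methods=sayHello&side=provider";
        URI uri = URI.create(url);
        System.out.println(uri.getScheme() + " " + uri.getHost() + " " + uri.getPort() + " " + uri.getPath());
        System.out.println(queryParameters(url));
    }
}
```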
           

XML Configuration Parsing

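The basic XML configuration from the Dubbo demo is shown below. The core elements are dubbo:application, dubbo:registry, dubbo:protocol and dubbo:service, and the parsing process described here is the parsing of this XML file.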

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- provider's application name, used for tracing dependency relationship -->
    <dubbo:application name="demo-provider"/>

    <!-- use zookeeper registry center to export service -->
    <dubbo:registry protocol="zookeeper" address="127.0.0.1:2181"/>

    <!-- use dubbo protocol to export service on port 20880 -->
    <dubbo:protocol name="dubbo" port="20880"/>

    <!-- service implementation, as same as regular local bean -->
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>

    <!-- declare the service interface to be exported -->
    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" delay="-1"/>

</beans>
           

XML parsing reads the XML file and produces BeanDefinition objects, which Spring then uses to instantiate the beans. DubboNamespaceHandler registers the parser for each element; application, module, provider, consumer, service and so on are the keywords of the Dubbo schema.

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
    }

}
           

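DubboBeanDefinitionParser is the core of the XML parsing; its parse method returns a BeanDefinition object. Each Element maps to exactly one beanClass: the dubbo:application tag is parsed with beanClass ApplicationConfig.class, and the dubbo:service tag with beanClass ServiceBean.class. The code below is annotated with step_x comments that explain what each step does. Keep each class's inheritance hierarchy in mind: a subclass inherits its parent's methods, so inherited setters are parsed as well.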

private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
        //step_1 Create the RootBeanDefinition and set its beanClass.
        RootBeanDefinition beanDefinition = new RootBeanDefinition();
        beanDefinition.setBeanClass(beanClass);
        beanDefinition.setLazyInit(false);

        //step_2 The id generation logic lives here.
        String id = element.getAttribute("id");
        if ((id == null || id.length() == 0) && required) {
            String generatedBeanName = element.getAttribute("name");
            if (generatedBeanName == null || generatedBeanName.length() == 0) {
                if (ProtocolConfig.class.equals(beanClass)) {
                    generatedBeanName = "dubbo";
                } else {
                    generatedBeanName = element.getAttribute("interface");
                }
            }
            if (generatedBeanName == null || generatedBeanName.length() == 0) {
                generatedBeanName = beanClass.getName();
            }
            id = generatedBeanName;
            int counter = 2;
            while (parserContext.getRegistry().containsBeanDefinition(id)) {
                id = generatedBeanName + (counter++);
            }
        }
        if (id != null && id.length() > 0) {
            if (parserContext.getRegistry().containsBeanDefinition(id)) {
                throw new IllegalStateException("Duplicate spring bean id " + id);
            }
            parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
            beanDefinition.getPropertyValues().addPropertyValue("id", id);
        }

        //step_3 Parse special fields depending on the beanClass.
        if (ProtocolConfig.class.equals(beanClass)) {
            for (String name : parserContext.getRegistry().getBeanDefinitionNames()) {
                BeanDefinition definition = parserContext.getRegistry().getBeanDefinition(name);
                PropertyValue property = definition.getPropertyValues().getPropertyValue("protocol");
                if (property != null) {
                    Object value = property.getValue();
                    if (value instanceof ProtocolConfig && id.equals(((ProtocolConfig) value).getName())) {
                        definition.getPropertyValues().addPropertyValue("protocol", new RuntimeBeanReference(id));
                    }
                }
            }
        } else if (ServiceBean.class.equals(beanClass)) {
            String className = element.getAttribute("class");
            if (className != null && className.length() > 0) {
                RootBeanDefinition classDefinition = new RootBeanDefinition();
                classDefinition.setBeanClass(ReflectUtils.forName(className));
                classDefinition.setLazyInit(false);
                parseProperties(element.getChildNodes(), classDefinition);
                beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
            }
        } else if (ProviderConfig.class.equals(beanClass)) {
            parseNested(element, parserContext, ServiceBean.class, true, "service", "provider", id, beanDefinition);
        } else if (ConsumerConfig.class.equals(beanClass)) {
            parseNested(element, parserContext, ReferenceBean.class, false, "reference", "consumer", id, beanDefinition);
        }

        //step_4 Use reflection to find every setter of beanClass; the property names to parse are collected in Set<String> props.
        //beanDefinition.getPropertyValues().addPropertyValue stores each parsed value in the beanDefinition.
        Set<String> props = new HashSet<String>();
        ManagedMap parameters = null;
        for (Method setter : beanClass.getMethods()) {
            String name = setter.getName();
            if (name.length() > 3 && name.startsWith("set")
                    && Modifier.isPublic(setter.getModifiers())
                    && setter.getParameterTypes().length == 1) {
                Class<?> type = setter.getParameterTypes()[0];
                String property = StringUtils.camelToSplitName(name.substring(3, 4).toLowerCase() + name.substring(4), "-");
                props.add(property);
                Method getter = null;
                try {
                    getter = beanClass.getMethod("get" + name.substring(3), new Class<?>[0]);
                } catch (NoSuchMethodException e) {
                    try {
                        getter = beanClass.getMethod("is" + name.substring(3), new Class<?>[0]);
                    } catch (NoSuchMethodException e2) {
                    }
                }
                if (getter == null
                        || !Modifier.isPublic(getter.getModifiers())
                        || !type.equals(getter.getReturnType())) {
                    continue;
                }
                if ("parameters".equals(property)) {
                    parameters = parseParameters(element.getChildNodes(), beanDefinition);
                } else if ("methods".equals(property)) {
                    parseMethods(id, element.getChildNodes(), beanDefinition, parserContext);
                } else if ("arguments".equals(property)) {
                    parseArguments(id, element.getChildNodes(), beanDefinition, parserContext);
                } else {
                    String value = element.getAttribute(property);
                    if (value != null) {
                        value = value.trim();
                        if (value.length() > 0) {
                            if ("registry".equals(property) && RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(value)) {
                                RegistryConfig registryConfig = new RegistryConfig();
                                registryConfig.setAddress(RegistryConfig.NO_AVAILABLE);
                                beanDefinition.getPropertyValues().addPropertyValue(property, registryConfig);
                            } else if ("registry".equals(property) && value.indexOf(',') != -1) {
                                parseMultiRef("registries", value, beanDefinition, parserContext);
                            } else if ("provider".equals(property) && value.indexOf(',') != -1) {
                                parseMultiRef("providers", value, beanDefinition, parserContext);
                            } else if ("protocol".equals(property) && value.indexOf(',') != -1) {
                                parseMultiRef("protocols", value, beanDefinition, parserContext);
                            } else {
                                Object reference;
                                if (isPrimitive(type)) {
                                    if ("async".equals(property) && "false".equals(value)
                                            || "timeout".equals(property) && "0".equals(value)
                                            || "delay".equals(property) && "0".equals(value)
                                            || "version".equals(property) && "0.0.0".equals(value)
                                            || "stat".equals(property) && "-1".equals(value)
                                            || "reliable".equals(property) && "false".equals(value)) {
                                        // backward compatibility for the default value in old version's xsd
                                        value = null;
                                    }
                                    reference = value;
                                } else if ("protocol".equals(property)
                                        && ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(value)
                                        && (!parserContext.getRegistry().containsBeanDefinition(value)
                                        || !ProtocolConfig.class.getName().equals(parserContext.getRegistry().getBeanDefinition(value).getBeanClassName()))) {
                                    if ("dubbo:provider".equals(element.getTagName())) {
                                        logger.warn("Recommended replace <dubbo:provider protocol=\"" + value + "\" ... /> to <dubbo:protocol name=\"" + value + "\" ... />");
                                    }
                                    // backward compatibility
                                    ProtocolConfig protocol = new ProtocolConfig();
                                    protocol.setName(value);
                                    reference = protocol;
                                } else if ("onreturn".equals(property)) {
                                    int index = value.lastIndexOf(".");
                                    String returnRef = value.substring(0, index);
                                    String returnMethod = value.substring(index + 1);
                                    reference = new RuntimeBeanReference(returnRef);
                                    beanDefinition.getPropertyValues().addPropertyValue("onreturnMethod", returnMethod);
                                } else if ("onthrow".equals(property)) {
                                    int index = value.lastIndexOf(".");
                                    String throwRef = value.substring(0, index);
                                    String throwMethod = value.substring(index + 1);
                                    reference = new RuntimeBeanReference(throwRef);
                                    beanDefinition.getPropertyValues().addPropertyValue("onthrowMethod", throwMethod);
                                } else {
                                    if ("ref".equals(property) && parserContext.getRegistry().containsBeanDefinition(value)) {
                                        BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(value);
                                        if (!refBean.isSingleton()) {
                                            throw new IllegalStateException("The exported service ref " + value + " must be singleton! Please set the " + value + " bean scope to singleton, eg: <bean id=\"" + value + "\" scope=\"singleton\" ...>");
                                        }
                                    }
                                    reference = new RuntimeBeanReference(value);
                                }
                                beanDefinition.getPropertyValues().addPropertyValue(property, reference);
                            }
                        }
                    }
                }
            }
        }
        NamedNodeMap attributes = element.getAttributes();
        int len = attributes.getLength();
        for (int i = 0; i < len; i++) {
            Node node = attributes.item(i);
            String name = node.getLocalName();
            if (!props.contains(name)) {
                if (parameters == null) {
                    parameters = new ManagedMap();
                }
                String value = node.getNodeValue();
                parameters.put(name, new TypedStringValue(value, String.class));
            }
        }
        if (parameters != null) {
            beanDefinition.getPropertyValues().addPropertyValue("parameters", parameters);
        }
        return beanDefinition;
    }
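
Step 4 is the least obvious part of parse, so here is a small sketch of how setter names turn into XML attribute names. `SetterScan` and `DemoConfig` are hypothetical, and `camelToSplitName` below is a hand-written stand-in that only approximates the helper of the same name used in the source.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

final class SetterScan {
    // Stand-in conversion: "connectTimeout" -> "connect-timeout".
    static String camelToSplitName(String camel, String split) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < camel.length(); i++) {
            char c = camel.charAt(i);
            if (Character.isUpperCase(c)) {
                if (i > 0) {
                    sb.append(split);
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Collects the attribute names implied by public single-argument setters, inherited ones included.
    static Set<String> propertyNames(Class<?> beanClass) {
        Set<String> props = new TreeSet<>();
        for (Method setter : beanClass.getMethods()) {
            String name = setter.getName();
            if (name.length() > 3 && name.startsWith("set")
                    && Modifier.isPublic(setter.getModifiers())
                    && setter.getParameterTypes().length == 1) {
                String property = name.substring(3, 4).toLowerCase() + name.substring(4);
                props.add(camelToSplitName(property, "-"));
            }
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(propertyNames(DemoConfig.class));
    }
}

// Hypothetical config bean used only for this sketch.
class DemoConfig {
    public void setName(String name) { }

    public void setConnectTimeout(int millis) { }

    // Ignored by the scan: it starts with "set" but takes no parameter.
    public void setup() { }
}
```

In the real parser each of these names is then looked up with element.getAttribute(property), and attributes that match no setter end up in the parameters map.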
           

Bean Initialization

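Instantiating the bean is handled inside Spring (I have not yet had time to analyze that part), but the final initialization work after instantiation is done in Dubbo. So we skip Spring's bean initialization for now and go straight to what comes after it. Service export is concentrated in the ServiceBean class; here we focus on the onApplicationEvent() and afterPropertiesSet() methods. First, a recap of the bean initialization sequence.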

1. Start the container
2. Call BeanFactoryPostProcessor's postProcessBeanFactory() for factory post-processing
3. getBean() requests a bean
4. Call InstantiationAwareBeanPostProcessor's postProcessBeforeInstantiation()
5. Instantiate the bean (constructor or factory method)
6. Call InstantiationAwareBeanPostProcessor's postProcessAfterInstantiation()
7. Call InstantiationAwareBeanPostProcessor's postProcessPropertyValues()
8. Set the bean's property values
9. Call BeanNameAware's setBeanName()
10. Call BeanFactoryAware's setBeanFactory()
11. Call ApplicationContextAware's setApplicationContext()
12. Call BeanPostProcessor's postProcessBeforeInitialization()
13. Call InitializingBean's afterPropertiesSet()
14. Call the init-method initialization method
15. Call BeanPostProcessor's postProcessAfterInitialization()
16. If the bean is a singleton, cache it; if it is a prototype, return it.
17. If the bean is a singleton, call DisposableBean's destroy() when the container is destroyed
18. If the bean is a singleton, call the destroy-method when the container is destroyed
           

Now follow ServiceBean.java, focusing on onApplicationEvent, afterPropertiesSet and setApplicationContext. Of these, setApplicationContext runs before afterPropertiesSet.

setApplicationContext injects the Spring context into the current bean, afterPropertiesSet initializes the bean, and onApplicationEvent exports the bean as a service.

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, 
             DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {

    private final transient Service service;

    private transient ApplicationContext applicationContext;

    private transient String beanName;

    public ServiceBean() {
        super();
        this.service = null;
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        // body omitted in this excerpt
    }

    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            //todo Core step after bean initialization: this calls into ServiceConfig to run the export
            export();
        }
    }

    private boolean isDelay() {
        Integer delay = getDelay();
        ProviderConfig provider = getProvider();
        if (delay == null && provider != null) {
            delay = provider.getDelay();
        }
        return supportedApplicationListener && (delay == null || delay == -1);
    }

    @SuppressWarnings({"unchecked", "deprecation"})
    public void afterPropertiesSet() throws Exception {
        // body omitted here; the full method is listed later in this article
    }

    public void destroy() throws Exception {
        unexport();
    }
}
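
The isDelay() condition above decides where export() ends up being called. The sketch below restates that condition as a pure function so the cases are easy to check; `DelaySketch` and its parameter names are hypothetical, and `providerDelay` should be passed as null when there is no provider.

```java
final class DelaySketch {
    // Mirrors the isDelay() condition: true means "wait for the context-refreshed event".
    static boolean exportOnContextRefreshed(Integer delay, Integer providerDelay,
                                            boolean supportedApplicationListener) {
        Integer effective = (delay == null) ? providerDelay : delay;
        return supportedApplicationListener && (effective == null || effective == -1);
    }

    // Names the callback that would trigger the export for the given settings.
    static String exportTrigger(Integer delay, Integer providerDelay, boolean supportedApplicationListener) {
        return exportOnContextRefreshed(delay, providerDelay, supportedApplicationListener)
                ? "onApplicationEvent"
                : "afterPropertiesSet";
    }

    public static void main(String[] args) {
        System.out.println(exportTrigger(-1, null, true));   // the demo configuration uses delay="-1"
        System.out.println(exportTrigger(5000, null, true));
    }
}
```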
           

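The key to ServiceBean's initialization is afterPropertiesSet. It obtains the provider, application, module, registries, monitor, protocols and path and sets them on the bean. Each lookup simply fetches objects from the Spring context, which is what BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false) does.

The export() call at the end of the code does not actually run in this demo: the service is configured with delay="-1", so isDelay() returns true whenever the context supports application listeners. The real export happens in onApplicationEvent, so the next step is to trace the core export process.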

public void afterPropertiesSet() throws Exception {
            
        //step_1 Obtain the provider and set it on the bean
        if (getProvider() == null) {
            //todo Look up matching objects in the Spring context
            Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
            if (providerConfigMap != null && providerConfigMap.size() > 0) {
                Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
                if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
                        && providerConfigMap.size() > 1) { // backward compatibility
                    List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
                    for (ProviderConfig config : providerConfigMap.values()) {
                        if (config.isDefault() != null && config.isDefault().booleanValue()) {
                            providerConfigs.add(config);
                        }
                    }
                    if (providerConfigs.size() > 0) {
                        setProviders(providerConfigs);
                    }
                } else {
                    ProviderConfig providerConfig = null;
                    for (ProviderConfig config : providerConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (providerConfig != null) {
                                throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
                            }
                            providerConfig = config;
                        }
                    }
                    if (providerConfig != null) {
                        setProvider(providerConfig);
                    }
                }
            }
        }

        //step_2 Obtain the application and set it on the bean
        if (getApplication() == null
                && (getProvider() == null || getProvider().getApplication() == null)) {
            Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
            if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
                ApplicationConfig applicationConfig = null;
                for (ApplicationConfig config : applicationConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (applicationConfig != null) {
                            throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
                        }
                        applicationConfig = config;
                    }
                }
                if (applicationConfig != null) {
                    setApplication(applicationConfig);
                }
            }
        }

        //step_3 Obtain the module and set it on the bean
        if (getModule() == null
                && (getProvider() == null || getProvider().getModule() == null)) {
            Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
            if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
                ModuleConfig moduleConfig = null;
                for (ModuleConfig config : moduleConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (moduleConfig != null) {
                            throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
                        }
                        moduleConfig = config;
                    }
                }
                if (moduleConfig != null) {
                    setModule(moduleConfig);
                }
            }
        }

        //step_4 Obtain the registries and set them on the bean
        if ((getRegistries() == null || getRegistries().size() == 0)
                && (getProvider() == null || getProvider().getRegistries() == null || getProvider().getRegistries().size() == 0)
                && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {
            Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
            if (registryConfigMap != null && registryConfigMap.size() > 0) {
                List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
                for (RegistryConfig config : registryConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        registryConfigs.add(config);
                    }
                }
                if (registryConfigs != null && registryConfigs.size() > 0) {
                    super.setRegistries(registryConfigs);
                }
            }
        }

        //step_5 Obtain the monitor and set it on the bean
        if (getMonitor() == null
                && (getProvider() == null || getProvider().getMonitor() == null)
                && (getApplication() == null || getApplication().getMonitor() == null)) {
            Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
            if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
                MonitorConfig monitorConfig = null;
                for (MonitorConfig config : monitorConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (monitorConfig != null) {
                            throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
                        }
                        monitorConfig = config;
                    }
                }
                if (monitorConfig != null) {
                    setMonitor(monitorConfig);
                }
            }
        }

        //step_6 Obtain the protocols and set them on the bean
        if ((getProtocols() == null || getProtocols().size() == 0)
                && (getProvider() == null || getProvider().getProtocols() == null || getProvider().getProtocols().size() == 0)) {
            Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
            if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
                List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
                for (ProtocolConfig config : protocolConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        protocolConfigs.add(config);
                    }
                }
                if (protocolConfigs != null && protocolConfigs.size() > 0) {
                    super.setProtocols(protocolConfigs);
                }
            }
        }

        //step_7 Obtain the path and set it on the bean
        if (getPath() == null || getPath().length() == 0) {
            if (beanName != null && beanName.length() > 0
                    && getInterface() != null && getInterface().length() > 0
                    && beanName.startsWith(getInterface())) {
                setPath(beanName);
            }
        }

        //todo If export is not delayed, export immediately; otherwise the export waits for onApplicationEvent
        if (!isDelay()) {
            export();
        }
    }
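
Steps 2, 3 and 5 above repeat the same pattern: among all configs of one type, take the single one whose default flag is null or true, and fail if there are two. A generic helper makes the pattern easier to see. `DefaultPicker` and `NamedConfig` are hypothetical names for this sketch and are not part of Dubbo.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical config object with a nullable "default" flag.
final class NamedConfig {
    final String name;
    final Boolean isDefault;

    NamedConfig(String name, Boolean isDefault) {
        this.name = name;
        this.isDefault = isDefault;
    }

    @Override
    public String toString() {
        return name;
    }
}

final class DefaultPicker {
    // Returns the only config whose flag is null or true, null if there is none; throws on duplicates.
    static <C> C pickDefault(Map<String, C> configs, Function<C, Boolean> isDefault, String kind) {
        C chosen = null;
        for (C config : configs.values()) {
            Boolean flag = isDefault.apply(config);
            if (flag == null || flag) {
                if (chosen != null) {
                    throw new IllegalStateException(
                            "Duplicate " + kind + " configs: " + chosen + " and " + config);
                }
                chosen = config;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        Map<String, NamedConfig> configs = new LinkedHashMap<>();
        configs.put("a", new NamedConfig("a", Boolean.FALSE));
        configs.put("b", new NamedConfig("b", null));
        System.out.println(pickDefault(configs, c -> c.isDefault, "application"));
    }
}
```

Registries and protocols (steps 4 and 6) differ only in that every matching config is collected into a list instead of requiring a single winner.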
           

Bean Export Process

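The figure below (shared from another article) shows the whole service export process. It assumes that we have specified neither local-only nor remote-only export, in which case both exports are performed; for now we only look at the remote export.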

[Figure: service export process]

Service Export Process

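First we need to understand ServiceBean's class hierarchy. The export() called in ServiceBean's onApplicationEvent is in fact defined in ServiceConfig, its parent class, so from here on we trace the implementation of ServiceConfig's export() method.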

[Figure: ServiceBean class diagram]

The core logic starts in ServiceConfig's doExport method: doExport -> doExportUrls -> doExportUrlsFor1Protocol. Pay attention to loadRegistries, the method that produces registryURLs, that is, how the registry URL is built.

protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("Already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;
       
        doExportUrls();
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
    }

private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
           

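The registry URL is built mainly in loadRegistries. The idea is to iterate over the registries list produced from the dubbo:registry tags and, for each registry, combine the application config with the registry's own config to produce the registry URL.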

protected List<URL> loadRegistries(boolean provider) {
        checkRegistry();
        List<URL> registryList = new ArrayList<URL>();
        if (registries != null && registries.size() > 0) {
            for (RegistryConfig config : registries) {
                String address = config.getAddress();
                if (address == null || address.length() == 0) {
                    address = Constants.ANYHOST_VALUE;
                }
                String sysaddress = System.getProperty("dubbo.registry.address");
                if (sysaddress != null && sysaddress.length() > 0) {
                    address = sysaddress;
                }
                if (address != null && address.length() > 0
                        && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                    Map<String, String> map = new HashMap<String, String>();
                    appendParameters(map, application);
                    appendParameters(map, config);
                    map.put("path", RegistryService.class.getName());
                    map.put("dubbo", Version.getVersion());
                    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                    if (ConfigUtils.getPid() > 0) {
                        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                    }
                    if (!map.containsKey("protocol")) {
                        if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                            map.put("protocol", "remote");
                        } else {
                            map.put("protocol", "dubbo");
                        }
                    }
                    List<URL> urls = UrlUtils.parseURLs(address, map);
                    for (URL url : urls) {
                        url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                        url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                        if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                                || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                            registryList.add(url);
                        }
                    }
                }
            }
        }
        return registryList;
    }
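
The last loop of loadRegistries is what turns a zookeeper address into the registry:// URL shown earlier: the real protocol is saved in a registry parameter and the scheme is replaced. The sketch below reproduces just that rewrite with plain strings; `RegistryUrlSketch` is a hypothetical helper, and the parameter names are taken from the sample URL rather than from Dubbo's Constants class.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class RegistryUrlSketch {
    // Builds "registry://address/path?params&registry=<real protocol>".
    static String toRegistryUrl(String registryProtocol, String address, String path,
                                Map<String, String> params) {
        Map<String, String> all = new LinkedHashMap<>(params);
        all.put("registry", registryProtocol); // remember the real registry type
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> entry : all.entrySet()) {
            query.add(entry.getKey() + "=" + entry.getValue());
        }
        return "registry://" + address + "/" + path + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("application", "demo-provider");
        System.out.println(toRegistryUrl("zookeeper", "127.0.0.1:2181",
                "com.alibaba.dubbo.registry.RegistryService", params));
    }
}
```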
           

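Now for the core part. doExportUrlsFor1Protocol exports the Dubbo service both locally and remotely; for now we focus on the remote export. The overall steps are:

  • 1. proxyFactory creates the invoker; the factory here is JavassistProxyFactory (step through it in a debugger to confirm).
  • 2. protocol exports it as an Exporter; the protocol here is RegistryProtocol (step through it in a debugger to confirm).

    The export process is important, so we analyze it in more depth below.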

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        // some less important code omitted

        String scope = url.getParameter(Constants.SCOPE_KEY);

        //todo scope is null here, so the service is exported locally
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //釋出local的export
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            //  scope值為null,是以會釋出到remote
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0) {
                    for (URL registryURL : registryURLs) {
                       
                        // the core part
                        // step_1 proxyFactory creates the invoker
                        // step_2 create the wrapperInvoker
                        // step_3 protocol exports it as an Exporter
                        // step_4 add the exporter to exporters
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, 
                                  registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }
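
 The three scope checks are easy to misread because they are all negated. Here is a minimal sketch of the same decision, assuming the literals "none", "local" and "remote" are the values behind Constants.SCOPE_NONE, SCOPE_LOCAL and SCOPE_REMOTE. ScopeSketch and its methods are hypothetical names.

```java
// Hypothetical sketch of the scope checks in doExportUrlsFor1Protocol.
// The string literals are assumed values of the dubbo constants.
final class ScopeSketch {

    // Local export happens unless scope is "none" or "remote".
    static boolean exportsLocal(String scope) {
        return !"none".equalsIgnoreCase(scope) && !"remote".equalsIgnoreCase(scope);
    }

    // Remote export happens unless scope is "none" or "local".
    static boolean exportsRemote(String scope) {
        return !"none".equalsIgnoreCase(scope) && !"local".equalsIgnoreCase(scope);
    }

    public static void main(String[] args) {
        for (String scope : new String[] {null, "local", "remote", "none"}) {
            System.out.println(scope + " -> local=" + exportsLocal(scope)
                    + ", remote=" + exportsRemote(scope));
        }
    }
}
```

 With no scope configured (null) both methods return true, which is why the service is exported locally and remotely by default.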
           

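 First, the proxyFactory here is produced by dubbo's extension mechanism; its implementation is source code generated at runtime and compiled with javassist. proxyFactory is responsible for producing the invoker, and Protocol is responsible for exporting it as an Exporter object.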

// step_1 proxyFactory creates the invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, 
                                  registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

// step_2 create the wrapperInvoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

// step_3 protocol exports it as an Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);

exporters.add(exporter);
           

 The call chain of proxyFactory.getInvoker() is shown below. proxyFactory is actually a ProxyFactory$Adaptive object.

Invoker<?> invoker = proxyFactory.getInvoker()
is equivalent to ProxyFactory$Adaptive.getInvoker(ref, interfaceClass, dubboUrl)

      which internally runs extension = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
      and gets back a StubProxyFactoryWrapper

      extension.getInvoker(arg0, arg1, arg2) 
      is equivalent to StubProxyFactoryWrapper.getInvoker(arg0, arg1, arg2) 
     
        which internally calls javassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
        and returns an object created with new AbstractProxyInvoker()
           

 The source of ProxyFactory$Adaptive is shown below. For service export, the method to look at is getInvoker().

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        
        if (arg0.getUrl() == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
       
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        
        return extension.getProxy(arg0);
    }
    
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) 
            throw new IllegalArgumentException("url == null");
        
        com.alibaba.dubbo.common.URL url = arg2;
        
        String extName = url.getParameter("proxy", "javassist");
        
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        
        return extension.getInvoker(arg0, arg1, arg2);
    }
}
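
 Stripped of the fully qualified names, the generated class does one thing: it reads the proxy parameter from the URL, falls back to javassist, and delegates to the extension with that name. Here is a minimal sketch of that dispatch. AdaptiveSketch, Factory and the map-based registry are hypothetical stand-ins for ExtensionLoader, not dubbo code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; none of these types are dubbo classes.
final class AdaptiveSketch {

    interface Factory {
        String name();
    }

    // Plays the role of ExtensionLoader.getExtension(extName).
    static final Map<String, Factory> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("javassist", () -> "javassist");
        EXTENSIONS.put("jdk", () -> "jdk");
    }

    // Plays the role of the generated $Adaptive class: read the "proxy" parameter,
    // fall back to "javassist", then return the extension registered under that name.
    static Factory select(Map<String, String> urlParameters) {
        String extName = urlParameters.getOrDefault("proxy", "javassist");
        Factory extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension;
    }
}
```

 Calling select with an empty map returns the javassist entry, and a map containing proxy=jdk returns the jdk entry. The implementation is chosen per call from the URL, not fixed when the object is wired.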
           

 In JavassistProxyFactory the method to look at is getInvoker; internally the call is carried out by wrapper.invokeMethod().

public class JavassistProxyFactory extends AbstractProxyFactory {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}
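
 To make clear what doInvoke has to do, here is a minimal sketch that swaps the generated Wrapper for plain java.lang.reflect. This is not how JavassistProxyFactory works internally, only an equivalent lookup by method name and parameter types. ReflectiveInvokerSketch, GreetingService and GreetingServiceImpl are hypothetical names.

```java
import java.lang.reflect.Method;

// Hypothetical sketch: uses reflection where dubbo's generated Wrapper would
// dispatch to the target method.
final class ReflectiveInvokerSketch {

    // Stand-in service interface and implementation for the demo.
    public interface GreetingService {
        String sayHello(String name);
    }

    public static final class GreetingServiceImpl implements GreetingService {
        @Override
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    // Same parameters as doInvoke(proxy, methodName, parameterTypes, arguments).
    static Object doInvoke(Object target, String methodName,
                           Class<?>[] parameterTypes, Object[] arguments) {
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(target, arguments);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invoke failed: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Object result = doInvoke(new GreetingServiceImpl(), "sayHello",
                new Class<?>[] {String.class}, new Object[] {"dubbo"});
        System.out.println(result);
    }
}
```

 The invoker only needs the method name, parameter types and arguments, which is exactly the information an Invocation carries.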
           

 The call chain of protocol.export() is as follows:

protocol.export() is equivalent to Protocol$Adaptive.export()

  runs extension = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
  which returns ProtocolListenerWrapper

  runs extension.export(arg0), equivalent to ProtocolListenerWrapper.export(Invoker<T> invoker)

    runs ProtocolFilterWrapper.export(Invoker<T> invoker)
        runs RegistryProtocol.export()
            (1) runs doLocalExport(), which opens a listening port for the service locally and returns the exporter
                runs Protocol$Adaptive.export()
                    ProtocolFilterWrapper.export() -> ProtocolListenerWrapper.export() -> DubboProtocol.export() -> openServer starts the server
            (2) runs register(), which registers the service with zookeeper
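
 The wrappers in this chain are ordinary decorators: each one holds the next Protocol and delegates to it. Below is a minimal sketch that records the order of the calls. Exportable and wrap are hypothetical stand-ins, and the nesting order is simply copied from the first part of the chain above; it may differ in a real run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the Protocol wrappers; not dubbo classes.
final class WrapperChainSketch {

    interface Exportable {
        void export(List<String> trace);
    }

    // A wrapper records its own name, then delegates to the wrapped instance.
    static Exportable wrap(String name, Exportable inner) {
        return trace -> {
            trace.add(name);
            inner.export(trace);
        };
    }

    static List<String> run() {
        Exportable dubboProtocol = trace -> trace.add("DubboProtocol");
        Exportable chain = wrap("ProtocolListenerWrapper",
                wrap("ProtocolFilterWrapper", dubboProtocol));
        List<String> trace = new ArrayList<>();
        chain.export(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

 The caller only ever sees the outermost wrapper, which is why getExtension(extName) appears to return a wrapper instead of the protocol itself.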
        
           

 The source of Protocol$Adaptive is shown below. For service export, the method to look at is export().

package com.alibaba.dubbo.rpc;
        
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) 
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        
        return extension.refer(arg0, arg1);
    }
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) 
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        
        return extension.export(arg0);
    }
}
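
 Because export() picks the extension from url.getProtocol(), the same adaptive object can dispatch twice during one export: once for the registry:// URL and once for the provider URL that was encoded into its export parameter. Below is a minimal sketch of that round trip on plain strings. TwoPassDispatchSketch and its methods are hypothetical, I am assuming Constants.EXPORT_KEY resolves to "export", and the naive indexOf lookup is only good enough for a demo.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical sketch on plain strings; dubbo itself uses its URL class for this.
final class TwoPassDispatchSketch {

    // Same rule as the generated code: fall back to "dubbo" when no protocol is present.
    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx < 0 ? "dubbo" : url.substring(0, idx);
    }

    // Mimics registryURL.addParameterAndEncoded("export", providerUrl).
    static String embed(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        try {
            return registryUrl + separator + "export=" + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads the "export" parameter back and decodes it.
    static String extractExport(String registryUrl) {
        int start = registryUrl.indexOf("export=");
        if (start < 0) {
            throw new IllegalArgumentException("no export parameter");
        }
        start += "export=".length();
        int end = registryUrl.indexOf('&', start);
        String encoded = end < 0 ? registryUrl.substring(start) : registryUrl.substring(start, end);
        try {
            return URLDecoder.decode(encoded, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

 On the first pass protocolOf returns registry for the combined URL; on the second pass the decoded export value starts with dubbo://, so the dubbo protocol is selected.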
           

 The core export code of DubboProtocol is shown below. Its main jobs are to create the exporter object and return it, and to call openServer to start listening.

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        // key step: open the server socket
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
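
 The bookkeeping in export() can be sketched with two maps: one exporter per service key, and one server per address. The second part is my assumption about openServer, whose body is not shown here; the sketch simply reuses a server that is already registered for the same address. ExportBookkeepingSketch and the key strings are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of exporter and server bookkeeping; not dubbo code.
final class ExportBookkeepingSketch {

    private final Map<String, Object> exporterMap = new ConcurrentHashMap<>();
    private final Map<String, String> serverMap = new ConcurrentHashMap<>();
    private final AtomicInteger serversOpened = new AtomicInteger();

    void export(String serviceKey, String address, Object invoker) {
        // One entry per exported service.
        exporterMap.put(serviceKey, invoker);
        // Assumed behaviour of openServer: create a server only for a new address.
        serverMap.computeIfAbsent(address, a -> {
            serversOpened.incrementAndGet();
            return "server@" + a;
        });
    }

    int exporterCount() {
        return exporterMap.size();
    }

    int serversOpened() {
        return serversOpened.get();
    }
}
```

 Exporting two services on the same address leaves two entries in exporterMap and a single server, so an incoming request can be routed by service key once it reaches the shared port.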
           

Overall flow

[Figure: overall flow of service export]

Reference notes

Spring bean initialization process

[Figure: bean creation process]

[Figure: bean initialization order]