天天看點

Dubbo系列講解之服務注冊【3萬字長文分享】服務注冊的幾個步驟Dubbo源碼分析

Dubbo系列講解之服務注冊【3萬字長文分享】服務注冊的幾個步驟Dubbo源碼分析

服務注冊的幾個步驟

  對于RPC架構的服務注冊,一般包含了如下的流程:

  • 加載服務提供者,可能是通過xml配置的,也可能是通過掃描注解的
  • 執行個體化服務提供者,并以服務接口作為key,實作類作為value存儲到一個map容器中
  • 開啟網絡監聽
  • 将服務提供者的位址路徑(ip:port/服務名?參數等)注冊到注冊中心
  • 當網絡監聽接收到請求時,根據請求過來的服務名及參數等,從容器中擷取到服務提供者實作,通過消費端調用時傳送的方法名稱反射調用服務提供者的相關方法

Dubbo源碼分析

Dubbo與Spring的整合

  在實際的開發過程中,Dubbo大部分情況都是與Spring的生态進行整合使用的,是以在真正進入Dubbo的服務注冊之前,我們需要先了解Dubbo是怎麼将自己的環境嵌入到Spring生态中的。

  在Spring中使用Dubbo的方式有兩種,一種是通過XML配置檔案,一種是通過注解的方式,由于當下Spring Boot盛行,是以這裡會比較深入的分析Dubbo在Spring Boot中的整合。不過其實兩種方式最終的都是将Dubbo的相關元件注入到Spring 的容器中

  在Spring 中提供了一種NamespaceHandler的機制,用于對Spring标簽的擴充,是以在Spring使用xml的方式時,Dubbo中會提供一個名為DubboNamespaceHandler的處理器,用于解析spring 的xml中的各種dubbo标簽,并注入到容器中,這裡不再深入。DubboNamespaceHandler源碼如下:

public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
      // 将xml中的相關标簽注入到spring中
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
        registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
        registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }

    /**
     * Override {@link NamespaceHandlerSupport#parse(Element, ParserContext)} method
     *
     * @param element       {@link Element}
     * @param parserContext {@link ParserContext}
     * @return
     * @since 2.7.5
     */
    @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        BeanDefinitionRegistry registry = parserContext.getRegistry();
        registerAnnotationConfigProcessors(registry);
        /**
         * @since 2.7.8
         * issue : https://github.com/apache/dubbo/issues/6275
         */
        registerCommonBeans(registry);
        BeanDefinition beanDefinition = super.parse(element, parserContext);
        setSource(beanDefinition);
        return beanDefinition;
    }

    /**
     * Register the processors for the Spring Annotation-Driven features
     *
     * @param registry {@link BeanDefinitionRegistry}
     * @see AnnotationConfigUtils
     * @since 2.7.5
     */
    private void registerAnnotationConfigProcessors(BeanDefinitionRegistry registry) {
        AnnotationConfigUtils.registerAnnotationConfigProcessors(registry);
    }
}      

  接下來重點來看在Spring Boot中的整合。

  在Dubbo與Spring的整合,有兩個入口可以讓我們進入到Dubbo主見初始化的,第一種就是通過@EnableDubbo注解上的DubboComponentScan注解,它是一個@Import注解,Spring會通過引入一個DubboComponentScanRegistrar注冊器在其registerBeanDefinitions方法上注入一個ServiceAnnotationBeanPostProcessor後置處理器。基于Spring的後置處理器原理,我們可以知道,它将會在bean執行個體化完成,初始化之前和之後各自産生回調,具體稍後再叙。注入方法如下:

private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);
    builder.addConstructorArgValue(packagesToScan);
    builder.setRole(2);
    AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
    BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);
}      

  另一個入口就是Spring Boot的自動裝配,在Dubbo中存在一個DubboAutoConfiguration類,該類通過Spring Boot中的自動裝配原理注冊到IOC容器中,同時該類是一個配置類,在該類中同時也裝配了一個

ServiceAnnotationBeanPostProcessor

的bean,方法如下:

@ConditionalOnProperty(prefix = DUBBO_SCAN_PREFIX, name = BASE_PACKAGES_PROPERTY_NAME)
    @ConditionalOnBean(name = BASE_PACKAGES_PROPERTY_RESOLVER_BEAN_NAME)
    @Bean
    public ServiceAnnotationBeanPostProcessor serviceAnnotationBeanPostProcessor(
            @Qualifier(BASE_PACKAGES_PROPERTY_RESOLVER_BEAN_NAME) PropertyResolver propertyResolver) {
        Set<String> packagesToScan = propertyResolver.getProperty(BASE_PACKAGES_PROPERTY_NAME, Set.class, emptySet());
        return new ServiceAnnotationBeanPostProcessor(packagesToScan);
    }      

  接下來就進入到ServiceAnnotationBeanPostProcessor一探究竟。

  首先進入構造方法

public ServiceAnnotationBeanPostProcessor(Set<String> packagesToScan) {
        super(packagesToScan);
 }      

  這裡将傳入的packagesToScan往父類進行傳遞,由于它繼承了ServiceClassPostProcessor,現在進入ServiceClassPostProcessor類的構造方法:

public ServiceClassPostProcessor(Set<String> packagesToScan) {
    this.packagesToScan = packagesToScan;
}      

ServiceClassPostProcessor隻是将傳入的掃包路徑指派給packagesToScan

根據BeanPostProcessor的特性,現在進入到postProcessBeanDefinitionRegistry方法

@Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {

        // @since 2.7.5
        registerBeans(registry, DubboBootstrapApplicationListener.class);

        Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);

        if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
            registerServiceBeans(resolvedPackagesToScan, registry);
        } else {
            if (logger.isWarnEnabled()) {
                logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
            }
        }

    }      

該方法主要做了以下幾件事:

  • 注冊了一個DubboBootstrapApplicationListener監聽,具體作用稍後再叙
  • 調用resolvePackagesToScan方法解析所有包名的路徑。可能包名中存在一Placeholders的特殊定義
  • 調用registerServiceBeans方法進行注冊

  具體怎麼解析包路徑不在本次讨論範圍,所有就先不深入了,現在直接進入到registerServiceBeans方法中

private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {

        DubboClassPathBeanDefinitionScanner scanner =
                new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);

        BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);

        scanner.setBeanNameGenerator(beanNameGenerator);

        // refactor @since 2.7.7
        serviceAnnotationTypes.forEach(annotationType -> {
            scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));
        });

        for (String packageToScan : packagesToScan) {

            // Registers @Service Bean first
            scanner.scan(packageToScan);

            // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
            Set<BeanDefinitionHolder> beanDefinitionHolders =
                    findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);

            if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {

                for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
                    registerServiceBean(beanDefinitionHolder, registry, scanner);
                }

                if (logger.isInfoEnabled()) {
                    logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
                            beanDefinitionHolders +
                            " } were scanned under package[" + packageToScan + "]");
                }

            } else {

                if (logger.isWarnEnabled()) {
                    logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
                            + packageToScan + "]");
                }

            }

        }

    }      

  該方法有主要做了以下幾件事

  • 建構了一個DubboClassPathBeanDefinitionScanner對象,該對象繼承自Spring的ClassPathBeanDefinitionScanner。在Spring中,ClassPathBeanDefinitionScanner是一個掃描程式,主要用來掃描Classpath下符合條件的對象,然後将對象注入到給定的registry中
  • 定義一個為Bean生成名稱的BeanNameGenerator,這裡生成的是AnnotationBeanNameGenerator這個政策
  • 将bean名稱政策set到scanner中
  • 添加過濾Filter,這裡周遊serviceAnnotationTypes,擷取到所有的過濾條件,這裡是基于注解的攔截,到serviceAnnotationTypes指派的地方,可以看到。初始化了以下三種注解作為攔截
private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList(
        // @since 2.7.7 Add the @DubboService , the issue : https://github.com/apache/dubbo/issues/6007
        DubboService.class,
        // @since 2.7.0 the substitute @com.alibaba.dubbo.config.annotation.Service
        Service.class,
        // @since 2.7.3 Add the compatibility for legacy Dubbo's @Service , the issue : https://github.com/apache/dubbo/issues/4330
        com.alibaba.dubbo.config.annotation.Service.class
);      
  • 周遊解析後的掃描的包,調用scanner.scan(packageToScan)注冊所有标注了@Service的bean注入到ioc容器中
  • 調用findServiceBeanDefinitionHolders查找所有标注了@Service的Class封裝成BeanDefinitionHolders,不管是否被@ComponentScan掃描
  • 如果beanDefinitionHolders存在元素,周遊beanDefinitionHolders,調用registerServiceBean注冊

  将标注了@Service注解的bean注入到ioc容器不屬于本次讨論内容,這裡也不做詳細說明

下面進入到findServiceBeanDefinitionHolders方法,了解一下該方法都傳回了那些類型的BeanDefinitionHolder

private Set<BeanDefinitionHolder> findServiceBeanDefinitionHolders(
            ClassPathBeanDefinitionScanner scanner, String packageToScan, BeanDefinitionRegistry registry,
            BeanNameGenerator beanNameGenerator) {

        Set<BeanDefinition> beanDefinitions = scanner.findCandidateComponents(packageToScan);

        Set<BeanDefinitionHolder> beanDefinitionHolders = new LinkedHashSet<>(beanDefinitions.size());

        for (BeanDefinition beanDefinition : beanDefinitions) {

            String beanName = beanNameGenerator.generateBeanName(beanDefinition, registry);
            BeanDefinitionHolder beanDefinitionHolder = new BeanDefinitionHolder(beanDefinition, beanName);
            beanDefinitionHolders.add(beanDefinitionHolder);

        }

        return beanDefinitionHolders;

    }      
  • 首先掃描傳入的packageToScan包下的所有的符合在scanner中定義的過濾注解的.class檔案,封裝成BeanDefinition
  • 周遊掃描到的beanDefinitions,通過名稱政策,為Bean生成名稱,同時用Bean和名稱建構成BeanDefinitionHolder,加入到beanDefinitionHolders中,傳回該集合

  beanDefinitionHolders的擷取到這裡就已經完成了,接下來進入到 registerServiceBean方法中,看看具體的注冊流程

private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
                                 DubboClassPathBeanDefinitionScanner scanner) {

  // 通過beanDefinitionHolder中的BeanDefinition中儲存的全類名通過Class.forName加載成class對象
    Class<?> beanClass = resolveClass(beanDefinitionHolder);

    Annotation service = findServiceAnnotation(beanClass);

    /**
     * The {@link AnnotationAttributes} of @Service annotation
     */
    AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);

    Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);

    String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();

    AbstractBeanDefinition serviceBeanDefinition =
            buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);

    // ServiceBean Bean name
    String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);

    if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
        registry.registerBeanDefinition(beanName, serviceBeanDefinition);

        if (logger.isInfoEnabled()) {
            logger.info("The BeanDefinition[" + serviceBeanDefinition +
                    "] of ServiceBean has been registered with name : " + beanName);
        }

    } else {

        if (logger.isWarnEnabled()) {
            logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
                    "] of ServiceBean[ bean name : " + beanName +
                    "] was be found , Did @DubboComponentScan scan to same package in many times?");
        }

    }

}      
  • 擷取掃描到的類的位元組碼的class對象
  • 擷取 beanClass上的注解,會跟初始化時的serviceAnnotationTypes屬性中的注解進行比對,傳回比對到的注解
  • 擷取比對到的注解service上的所有屬性及其屬性值serviceAnnotationAttributes
  • 擷取掃描到的.class對象實作的接口的class對象interfaceClass
  • 擷取組裝BeanDefinitionHolder是為Bean生成的名稱
  • 将所有參數傳入到buildServiceBeanDefinition方法中,建構一個AbstractBeanDefinition的對象,這裡我們暫時是不知道AbstractBeanDefinition儲存的是哪個Bean的定義
  • 通過generateServiceBeanName方法建構一個ServiceBean的名稱。
  • 檢查是否存在重複名稱的bean,如果不存在,則直接注入AbstractBeanDefinition的定義到ioc容器中

  現在我們先來探索一下在buildServiceBeanDefinition中建構的是一個什麼Bean的定義,由于方法比較長,這裡就不貼代碼了,該方法的大概的流程就是建立了一個ServiceBean的BeanDefinition。然後組裝前面解析到的注解的參數和擷取到的實作類的接口等為ServiceBean的屬性進行指派,然後最後傳回一個ServiceBean的BeanDefinition。

  然後再來看看ServiceBean的命名規則是怎麼樣的

private String generateServiceBeanName(AnnotationAttributes serviceAnnotationAttributes, Class<?> interfaceClass) {
    ServiceBeanNameBuilder builder = create(interfaceClass, environment)
            .group(serviceAnnotationAttributes.getString("group"))
            .version(serviceAnnotationAttributes.getString("version"));
    return builder.build();
}      

  可以看到,ServiceBean的命名規則是通過接口的全類名以及group,version等一起來保證唯一名稱的,或許是長這樣的ServiceBean:com.bobo.dubbo.api.HelloService:2.0.1這一系列操作下來,就是為了建構一個ServiceBean。而我們在DubboNamespaceHandler的方式中,也可以看到,最終也注入了一個ServiceBean。那麼ServiceBean到底有何神奇之處呢?馬上揭曉!進入到ServiceBean,看到其繼承了ServiceConfig,同時實作了InitializingBean,DisposableBean,ApplicationContextAware,BeanNameAware,ApplicationEventPublisherAware等接口。而ServiceConfig又繼承了AbstractConfig類,它是,比如Service,Refrence,application,Monitor等配置類的父類,我們進入AbstractConfig類,發現它存在一個@PostConstruct注解标注的方法,它會在spring的bean初始化完成之後執行,我們進入該方法

@PostConstruct
public void addIntoConfigManager() {
    ApplicationModel.getConfigManager().addConfig(this);
}      

進入到ApplicationModel.getConfigManager()方法

public static ConfigManager getConfigManager() {
    return (ConfigManager) LOADER.getExtension(ConfigManager.NAME);
}      

  看到這裡,上一篇的SPI知識就排上了用場了,這是一個用來擷取FrameworkExt接口的擴充實作的擴充點,同時ConfigManager.NAME指定了需要擷取的擴充點的名稱為config。是以就到FrameworkExt的實作類中查找一個名為config的擴充點實作即可得到。其實這裡得到的就是ConfigManager本身。

  是以在addIntoConfigManager方法中,實際上是将目前的bean儲存到了ConfigManager的對象中,最終儲存到了ConfigManager的configsCache中。ConfigManager主要是用來管理Dubbo中的所有繼承了AbstractConfig的配置

  而在ServiceBean中,我們并沒有看到有任何任何有價值的東西,到這裡看起來似乎前路已斷,不知道怎麼樣入手了。

  此時突然想起來我們在進行掃包的一系列操作之前,貌似注冊了一個監聽器,是不是可以從監聽器入手呢?

進入到前面注冊的

DubboBootstrapApplicationListener

監聽器中

public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
        implements Ordered {

    /**
     * The bean name of {@link DubboBootstrapApplicationListener}
     *
     * @since 2.7.6
     */
    public static final String BEAN_NAME = "dubboBootstrapApplicationListener";

    private final DubboBootstrap dubboBootstrap;

    public DubboBootstrapApplicationListener() {
        this.dubboBootstrap = DubboBootstrap.getInstance();
    }

    @Override
    public void onApplicationContextEvent(ApplicationContextEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }

    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        dubboBootstrap.start();
    }

    private void onContextClosedEvent(ContextClosedEvent event) {
        dubboBootstrap.stop();
    }

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }
}      

  可以看到,該監聽器監聽了容器的容器的重新整理和關閉,我們前面的操作已經将ServiceBean注入到了ioc容器中,根據ioc的容器初始化的幾個周期,可以知道在Refreshd容器時,我們所有的服務提供者對應的ServiceBean已經全部裝載到了容器中。

  繼續往下,當産生ContextRefreshedEvent事件時,調用了onContextRefreshedEvent方法,該方法中調用dubboBootstrap.start();

  到這裡,跟Spring相關的東西已經走完了,下面做一個總結

  • 通過Spring Boot的自動裝配或@EnableDubbo注解自動注入一個ServiceAnnotationBeanPostProcessor傳入需要掃描的包的路徑
  • 注冊了一個``DubboBootstrapApplicationListener`監聽
  • 根據BeanPostProcessor的特性,調用postProcessBeanDefinitionRegistry方法,根據傳入的掃包路徑進行掃描,然後将所有的标注了@Service注解的bean注入到ioc容器中
  • 繼續掃描包,獲得标注了Service/DubboService等注解的所有BeanDefinitionHolders
  • 周遊BeanDefinitionHolders,解析出每個BeanDefinition中的接口,标注的注解,及注解上定義的參數等。
  • 通過解析出來的一系列資訊生成一個ServiceBean.然後将ServiceBean注入到ioc容器中。
  • 同時在ServiceBean的父類AbstractConfig中,會存在一個标注了@PostConstruct注解的方法,它會在bean初始化完成之後,将目前bean儲存到一個ConfigManager對象中,它dubbo環境中是一個單例的存在。
  • 在Spring進行Refresh容器時,會觸發一個事件,調用dubboBootstrap.start();方法,啟動

  接下來就真正的進入到Dubbo的服務釋出,注冊的世界,一探究竟吧

Dubbo的服務注冊與釋出

  在進入start()方法之前,首先需要看看dubboBootstrap的初始化過程,它是一個單例的對象,直接進入DubboBootstrap的構造方法

private DubboBootstrap() {
    configManager = ApplicationModel.getConfigManager();
    environment = ApplicationModel.getEnvironment();

    DubboShutdownHook.getDubboShutdownHook().register();
    ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
        @Override
        public void callback() throws Throwable {
            DubboBootstrap.this.destroy();
        }
    });
}      

  可以看到,初始化時建構了configManager和environment,其中configManager主要用于管理Dubbo中的所有配置。Environment展示先不關注

  根據以上的分析,我們現在進入到start()方法

public DubboBootstrap start() {
    // 已經啟動過後,就不用再次啟動了
    if (started.compareAndSet(false, true)) {
        ready.set(false);
      // 初始化方法,就是檢查一些配置,啟動配置中心等等
        initialize();
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " is starting...");
        }
        // 1. export Dubbo Services
      // 真正執行釋出服務的方法
        exportServices();

        // Not only provider register
        if (!isOnlyRegisterProvider() || hasExportedServices()) {
            // 2. export MetadataService
            exportMetadataService();
            //3. Register the local ServiceInstance if required
            registerServiceInstance();
        }

        referServices();
        if (asyncExportingFutures.size() > 0) {
            new Thread(() -> {
                try {
                    this.awaitFinish();
                } catch (Exception e) {
                    logger.warn(NAME + " exportAsync occurred an exception.");
                }
                ready.set(true);
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is ready.");
                }
            }).start();
        } else {
            ready.set(true);
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is ready.");
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has started.");
        }
    }
    return this;
}      

  調用了exportServices方法進行了服務的釋出和注冊,調用referServices方法進行服務的發現,服務發現将留到下一篇去,今天隻對服務的注冊進行探索。

  進入到exportServices方法

private void exportServices() {
    configManager.getServices().forEach(sc -> {
        // TODO, compatible with ServiceConfig.export()
        ServiceConfig serviceConfig = (ServiceConfig) sc;
        serviceConfig.setBootstrap(this);

        if (exportAsync) {
            ExecutorService executor = executorRepository.getServiceExporterExecutor();
            Future<?> future = executor.submit(() -> {
                sc.export();
                exportedServices.add(sc);
            });
            asyncExportingFutures.add(future);
        } else {
            sc.export();
            exportedServices.add(sc);
        }
    });
}      

  周遊我們在Spring Boot環節時添加到configManager的所有ServiceConfig,将目前的對象傳入到ServiceConfig中,同步或異步調用ServiceConfig的export方法。

  進入到export方法

public synchronized void export() {
    if (!shouldExport()) {
        return;
    }

    if (bootstrap == null) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }

    checkAndUpdateSubConfigs();

    //init serviceMetadata
    serviceMetadata.setVersion(version);
    serviceMetadata.setGroup(group);
    serviceMetadata.setDefaultGroup(group);
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setServiceInterfaceName(getInterface());
    serviceMetadata.setTarget(getRef());

    if (shouldDelay()) {
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }

    exported();
}      

  該方法算是Dubbo服務釋出的入口流程方法了。

  • 判斷是否應該釋出本服務
  • 如果DubboBootstrap對象為null,初始化一個DubboBootstrap對象
  • 檢查是否更新存根配置
  • 初始化ServiceMetadata,将注入Bean時初始化的一些參數儲存到serviceMetadata中
  • 延時或同步調用doExport

  進入doExport

protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;

    if (StringUtils.isEmpty(path)) {
        path = interfaceName;
    }
    doExportUrls();
}      

  做了一系列判斷,辨別等初始化之後,再調用doExportUrls方法

private void doExportUrls() {
    ServiceRepository repository = ApplicationModel.getServiceRepository();
    ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
    repository.registerProvider(
            getUniqueServiceName(),
            ref,
            serviceDescriptor,
            this,
            serviceMetadata
    );

    List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig)
                .map(p -> p + "/" + path)
                .orElse(path), group, version);
        // In case user specified path, register service one more time to map it to path.
        repository.registerService(pathKey, interfaceClass);
        // TODO, uncomment this line once service key is unified
        serviceMetadata.setServiceKey(pathKey);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}      
  • 擷取到一個ServiceRepository,根據前面分析的經驗,可以看出這裡得到的就是一個ServiceRepository對象。
  • 根據服務的接口名稱和位元組碼封裝一個ServiceDescriptor儲存到repository的services中。并傳回ServiceDescriptor
  • 調用registerProvider方法,将唯一服務名,服務的Provider,serviceDescriptor,目前對象,ServiceMetadata等傳入方法。

  解析注冊中心的URLregistryURLs,這裡傳回的是如下的URL

registry://192.168.100.127:2181/org.apache.dubbo.registry.RegistryService?application=spring-cloud-alibaba-boot-dubbo-provider&default=true&dubbo=2.0.2&pid=48053&preferred=true&qos.enable=false&registry=zookeeper&release=2.7.7&timeout=10000&timestamp=1598411022992      
  • 周遊protocols,根據周遊到的協定拼接成不同的pathKey,調用registerService進行注冊,儲存服務源資訊
  • 根據不同的協定,調用doExportUrlsFor1Protocol方法進行注冊

  進入registerProvider方法,該方法會将傳入的對象建構成一個ProviderModel對象。儲存到相應的集合中,同時在ProviderModel對象初始化時,會調用将該接口的所有方法周遊,建構成一個ProviderMethodModel儲存到methods中。

  然後進入到doExportUrlsFor1Protocol,方法過長,這裡就不貼代碼了。其主要完成了以下功能

  • 根據初始化ServiceBean時傳入的各個參數,封裝成一個map
  • 擷取目前伺服器的host
  • 擷取目前服務需要監聽的port
  • 根據封裝的參數,協定,host,port建構一個URL
  • 釋出一個本地服務-injvm
  • 擷取到配置的注冊中心的URL,可以存在多個注冊中心,這就是Dubbo對多注冊中心的支援
  • 添加注冊中心的URL的參數
  • 生成monitor的URL
  • 再次封裝釋出服務的URL的參數
  • 通過Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));将目前服務的url作為參數添加到注冊中心的url上,然後使用registryURL和目前服務接口位元組碼,服務實作建構一個invoker,這是一個屬于注冊中心的invoker;
  • 使用Invoker和目前的ServiceConfig建構一個 DelegateProviderMetaDataInvoker對象
  • 調用Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);進行服務的釋出
  • 将傳回的exporter添加到exporters中

  接下來看看是怎麼擷取到Invoker的,根據我們的SPI的知識,在沒有參數中沒有指定擴充點時,會使用預設@SPI注解上預設指定的擴充點,由于在ProxyFactory類上的注解為@SPI(“javassist”),是以可以知道這裡擷取到的擴充點為JavassistProxyFactory的對象,在進入JavassistProxyFactory的getInvoker()方法之前,根據我們學習SPI的知識,或許該擴充點存在一些包裝,這裡就不詳細說明了,主要講服務釋出的主要流程。進入該類的getInvoker()方法。

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}      

  根據傳入的服務接口的class對象,動态生成的一個包裝器,該包裝器繼承了Wrapper了,重寫了invokeMethod()方法。重寫方法如下

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        com.wangx.spring.cloud.alibaba.provider.HelloServiceImpl w;
        try {
            w = ((com.wangx.spring.cloud.alibaba.provider.HelloServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("hello".equals($2) && $3.length == 1) {
                return ($w) w.hello((java.lang.String) $4[0]);
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.wangx.spring.cloud.alibaba.provider.HelloServiceImpl.");
    }      

  是以,當AbstractProxyInvoker的doInvoke方法被調用的時候,會直接執行被傳入服務提供者的具體方法。這樣做的好處就是在服務啟動時就将方法調用準備好,在被遠端調用時,直接通過引用調用,而不需要通過反射調用。提高性能。回到doExportUrlsFor1Protocol方法,根據傳回的invoker和目前對象包裝一個DelegateProviderMetaDataInvoker對象,接下來調用Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);方法。

  現在先來确定PROCTOL的具體實作是什麼,PROCTOL是一個自适應的擴充點,它會生成一個Proctol&Adaptie的類,該類實作了Protocol接口,重寫了Protocol的export和refer()方法。這裡隻讨論生成的export方法,如下:

public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
    if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null)
        throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
    org.apache.dubbo.common.URL url = arg0.getUrl();
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
        throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
}      

  該方法會根據傳入的invoker對象中的 protocol作為擴充名,擷取Protocol的擴充實作,因為我上一步我們傳入的是registryURl,根據注解的協定可以知道,通過自适應擴充對象擷取到的擴充實作為RegistryProtocol的對象,進入該類的export方法。

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

    // decide if we need to delay publish
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        register(registryUrl, registeredProviderUrl);
    }

    // register stated url on provider model
    registerStatedUrl(registryUrl, registeredProviderUrl, register);

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);

    notifyExport(exporter);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}      
  • 調用getRegistryUrl(originInvoker);方法擷取注冊中心的真正的URL,這裡會将registry替換成我們配置的zookeeper,如果配置的是nacos,則傳回nacos
  • 調用getProviderUrl方法擷取服務釋出的url,稍後會将該服務釋出到注冊中心上
  • 調用doLocalExport(originInvoker, providerUrl);釋出一個本地服務
  • 調用getRegistry(originInvoker);擷取真正配置的注冊中心的Registry.
  • 将服務注冊到注冊中心

    進入doLocalExport(originInvoker, providerUrl);方法

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);

    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}      

  首先擷取服務的的key,格式如下:

dubbo://192.168.100.127:20880/com.wangx.dubbo.api.HelloService?anyhost=true&application=spring-cloud-alibaba-boot-dubbo-provider&bind.ip=192.168.100.127&bind.port=20880&deprecated=false&dubbo=2.0.2&generic=false&interface=com.wangx.dubbo.api.HelloService&methods=hello&pid=58949&qos.enable=false&release=2.7.7&revision=2.0.1&side=provider&timestamp=1598493906319&version=2.0.1

  包含了服務的協定,端口,服務名稱,應用名稱,版本等資訊。将入傳入的Invoker和providerUrl建構一個InvokerDelegate類型的對象。并将服務提供者URL指派為InvokerDelegate對象的url屬性,将該對象傳入到protocol.export(invokerDelegate)方法中。根據SPI的原理,這裡會依賴注入一個Procotol$Adaptive的自适應擴充類,此時傳入的是Dubbo協定的URL,是以這裡實際會執行的是DubboProtocol中的方法。進入DubboProtocol類中的export方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }

        }
    }

    openServer(url);
    optimizeSerialization(url);

    return exporter;
}      
  • 根據URL擷取一個service的key,建構一個DubboExporter對象,該對象包含了傳入的key,invoker,exporterMap等,在invoker中有嵌套了最開始生成的能夠實際執行服務提供者方法的代理對象。
  • 将DubboExporter的對象儲存到exporterMap集合中。
  • 調用openServer方法釋出服務
  • 序列化url

進入openServer(url)

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    if (isServer) {
        ProtocolServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}      

  剛方法主要雙重檢查是否存在目前位址的ProtocolServer,不存在建立一個儲存到serverMap中。進入createServer方法該方法主要是調用Exchangers.bind(url, requestHandler);方法對位址和端口進行監聽,然後轉入一個requestHandler對象,當接收到請求時,調用requestHandler對象的reply方法進行處理

  這裡不再深入到netty網絡部分,直接來看看接收到請求的時候,是怎麼處理的,進入到reply方法

@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    Invocation inv = (Invocation) message;
    Invoker<?> invoker = getInvoker(channel, inv);
    // need to consider backward-compatibility if it's a callback
    if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null || !methodsStr.contains(",")) {
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            String[] methods = methodsStr.split(",");
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        if (!hasMethod) {
            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                    + " not found in callback service interface ,invoke will be ignored."
                    + " please update the api interface. url is:"
                    + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    Result result = invoker.invoke(inv);
    return result.thenApply(Function.identity());
    }
         

  将接收到的message轉成Invocation對象,這是rpc在傳輸過程中對于調用端的一些參數的封裝,包含了服務名,端口,方法名,方法參數等,通過inv擷取到invoker,進入getInvoker該方法主要通過inv封裝成一個serviceKey,然後通過該serviceKey從exporterMap容器中擷取到對應的DubboExporter,然後從該DubboExporter中擷取到初始化時傳入的invoker回到reply方法,會調用傳回的invoker.invoker方法,該invoker中最底層封裝了一個最原始的AbstractProxyInvoker的invoker,是以最終會調用到該類型的對象的invoker,如下:

public Result invoke(Invocation invocation) throws RpcException {
     try {
         Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
         CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
             AppResponse result = new AppResponse();
             if (t != null) {
                 if (t instanceof CompletionException) {
                     result.setException(t.getCause());
                 } else {
                     result.setException(t);
                 }
             } else {
                 result.setValue(obj);
             }
             return result;
         });
         return new AsyncRpcResult(appResponseFuture, invocation);
     } catch (InvocationTargetException e) {
         if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
             logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
         }
         return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
     } catch (Throwable e) {
         throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
     }
 }      

  然後這裡會調用一個doInvoker()方法,這是一個模闆方法,正好對應前面建立的如下代碼

return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };      

  是以最終調用wrapper.invokerMethod方法,然後調用對應服務的對應的方法。

  到這裡服務本地監聽和當請求過來時的處理就大緻聊完了,接下來重新回到RegistryProtocol類的export方法

  進入到register(registryUrl, registeredProviderUrl);方法

private void register(URL registryUrl, URL registeredProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registeredProviderUrl);
}      

  這裡的registryUrl經過轉換,已經變成了我們配置的zokeeper協定的url,是以這裡我們将會獲得一個ZookeeperRegistry的對象。将registeredProviderUrl傳入到register方法中,在進入到ZookeeperRegistry中時,發現并沒有register方法,那麼隻可能存在于它的父類中,在它的父類FailbackRegistry中,我們找到了這個方法,進入

public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedRegistered(url);
    }
}      

  該發放做了一系列操作之後,實際調用了doRegister()方法,該方法是一個抽象方法,由子類實作,這裡就是有zookeeperRegistry進行實作的。

  進入到ZookeeperRegistry中的doRegister()方法

@Override
public void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}      

  發現就是調用zkClient在注冊中心上建立一些節點,至此,我們的整個服務釋出和注冊功能都已經完成了

接下來總結一下在Dubbo的整個階段,有哪些主要的過程

  • ServiceBean注冊到IOC容器時會将自身對象儲存到ConfigManager的對象中
  • 在Dubbo調用DubboBoostrap.start()中的釋出方法時,周遊ConfigManager對象中儲存的ServiceBean,開始了服務注冊之旅
  • 調用ServiceBean的父類的export()方法,進行真正的服務注冊,該對象中會儲存一個ref屬性,這是一個服務真正的提供者
  • 擷取配置的注冊中心,因為可能存在多個注冊中心,需要周遊多個,這就是多注冊中心的實作,拿到注冊中心的url
  • 建構服務提供這的URL并封裝各項參數,根據參數和注冊中心的url接服務提供者實作生成一個Invoker,該invoker中會儲存一個代理對象,該對象那個的invokerMethod()方法将會直接調用服務提供者的對應的方法
  • 将該invoker進行一些列封裝,然後儲存到一個map集合中,使用netty開啟一個服務提供者的端口監聽,并将一個requestHandler進行綁定,當接收到請求時,調用該對象的reply方法,該方法最終會執行上一步驟生成的invokerMethod()方法,這就形成了一個釋出和調用的閉環
  • 在釋出完成本地服務,開啟了端口的監聽之後,需要将服務提供者的URL注冊到注冊中心,根據注冊中心配置的協定,這裡擷取到了Zookeeper的注冊中心,然後将服務提供者的URL通過zkClint在注冊中心上建立一個node,表示注冊完成

搞定~