天天看點

dubbo源碼解析之服務暴露/服務導出

Official website: http://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html

暴露流程:

  • ClassPathXmlApplicationContext加載provider的xml檔案
  • 解析<dubbo:service interface="xxx.xxx" ref="xxx"/>到ServiceConfig對象,并且進行服務暴露
  • 啟動Netty服務端,向外提供tcp服務接口
  • 注冊服務到注冊中心
// The walkthrough starts here: ServiceBean.java
// Export without a configured delay is triggered from onApplicationEvent();
// export with a configured delay is triggered from afterPropertiesSet().
//
// Handles the Spring ContextRefreshedEvent: when isDelay() is true and the service
// has been neither exported nor unexported, export() is invoked.
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (isDelay() && !isExported() && !isUnexported()) {
        // (code elided in the original article)
        ...
        export();
    }
}

// Export without a configured delay is triggered from onApplicationEvent();
// export with a configured delay is triggered from afterPropertiesSet().
//
// Bean-initialization callback: when isDelay() is false, export() is invoked here
// instead of waiting for the context-refreshed event.
public void afterPropertiesSet() throws Exception {
    // (code elided in the original article)
    ...
    if (!isDelay()) {
        export();
    }
}

// Decides whether export is left to onApplicationEvent().
// Returns true only when supportedApplicationListener is set and the effective
// delay is either absent (null) or -1; returns false for any other delay value
// (e.g. delay > 0).
private boolean isDelay() {
    // Service-level delay, e.g. @Service(delay = 4000).
    // Original note: the default delay is 0 and is turned into null at startup
    // (not verifiable from this excerpt).
    Integer effectiveDelay = getDelay();
    // Provider-level delay, e.g. <dubbo:provider filter="dubboFilter4Provider" delay="20"/>.
    ProviderConfig providerConfig = getProvider();
    boolean fallBackToProvider = effectiveDelay == null && providerConfig != null;
    if (fallBackToProvider) {
        effectiveDelay = providerConfig.getDelay();
    }
    if (!supportedApplicationListener) {
        return false;
    }
    return effectiveDelay == null || effectiveDelay == -1;
}

// Returns the current value of the exported flag.
// Original note: @Parameter(excluded = true) means this property is not added to
// the URL's parameters (annotation semantics are not shown in this excerpt).
@Parameter(excluded = true)
public boolean isExported() {
    return exported;
}
           

ServiceBean實作了ApplicationListener#onApplicationEvent(E),在容器初始化完成後執行。

ServiceBean也實作了InitializingBean#afterPropertiesSet(),可以實作延遲導出。

// ServiceConfig.export()
// 1. Fills in export/delay from the provider config when they are unset.
// 2. Returns immediately when export is explicitly false.
// 3. Schedules doExport() after `delay` milliseconds when delay > 0, otherwise
//    calls doExport() right away.
public synchronized void export() {
    if (provider != null) {
        // Original notes (not verifiable from this excerpt): with
        // @Service(export = false) the field ends up null, and only
        // @Service(export = true) yields true; the author questions whether the
        // service-level flag has any effect on the logic below.
        if (export == null) {
            // Original note: a provider-level setting such as
            // <dubbo:provider export="false" /> is not reset to null.
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }

    // export == false: the service is not exported (not remotely callable).
    if (Boolean.FALSE.equals(export)) {
        return;
    }

    boolean exportImmediately = delay == null || delay <= 0;
    if (exportImmediately) {
        doExport();
        return;
    }

    // delay > 0: run doExport() after `delay` milliseconds on the shared scheduler.
    // Original note: delayExportExecutor is declared as
    // Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true)).
    Runnable delayedExport = new Runnable() {
        public void run() {
            doExport();
        }
    };
    delayExportExecutor.schedule(delayedExport, delay, TimeUnit.MILLISECONDS);
}

// ServiceConfig#doExport()
// - Validates the interface attribute of <dubbo:service>.
// - Initializes application, module, registries, monitor and protocols from the
//   provider / application configs when they are unset.
// - Resolves the interface class, validates the stub class, runs the config
//   checks and finally calls doExportUrls().
// Throws IllegalStateException when the service was already unexported, when the
// interface name is missing, when the interface/stub class cannot be loaded, or
// when the stub class is not assignable to the interface.
// Notes about what the check*/append* helpers do are translated from the original
// article and cannot be verified from this excerpt.
protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("Already unexported!");
    }
    // Already exported: nothing to do.
    if (exported) {
        return;
    }
    exported = true;
    // Validate the interface attribute of <dubbo:service>.
    if (interfaceName == null || interfaceName.length() == 0) {
        // The message literal was garbled in the article ("inter\""); restored to
        // the well-formed tag text.
        throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
    }
    // Original note: populates ProviderConfig properties from
    // dubbo.provider.<id>.<property> and dubbo.provider.<property>, looking at
    // system properties first and then at the properties file (whose path can be
    // given via dubbo.properties.file or dubbo.properties).
    checkDefault();
    // Inherit unset values from the provider config.
    if (provider != null) {
        if (application == null) {
            application = provider.getApplication();
        }
        if (module == null) {
            module = provider.getModule();
        }
        if (registries == null) {
            registries = provider.getRegistries();
        }
        if (monitor == null) {
            monitor = provider.getMonitor();
        }
        if (protocols == null) {
            protocols = provider.getProtocols();
        }
    }
    // (code elided in the original article)
    ...
    // Inherit still-unset registries/monitor from the application config.
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }
    // Generic service: the interface class is GenericService itself and `generic`
    // defaults to "true" when empty. (The original author left a TODO asking what
    // this branch is for.)
    if (ref instanceof GenericService) {
        interfaceClass = GenericService.class;
        if (StringUtils.isEmpty(generic)) {
            generic = Boolean.TRUE.toString();
        }
    } else {
        try {
            // Load the interface class through the thread context class loader.
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        checkInterfaceAndMethods(interfaceClass, methods);
        checkRef();
        generic = Boolean.FALSE.toString();
    }
    // Original note: `local` is deprecated, use `stub` instead.
    if (local != null) {
        // Essentially the same as the following `stub` branch; omitted in the article.
    }
    if (stub != null) {
        // stub="true" means the conventional class name <interfaceName>Stub.
        if ("true".equals(stub)) {
            stub = interfaceName + "Stub";
        }
        Class<?> stubClass;
        try {
            // Load the local stub class.
            stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // The stub must be assignable to the service interface; otherwise fail
        // with a message telling the user the stub type is invalid.
        if (!interfaceClass.isAssignableFrom(stubClass)) {
            throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
        }
    }
    // Original note: populates ApplicationConfig properties from
    // dubbo.application.<id>.<property> and dubbo.application.<property>, and also
    // sets dubbo.service.shutdown.wait / dubbo.service.shutdown.wait.seconds.
    checkApplication();
    // Original note: when no <dubbo:registry address="xxx"/> is configured in XML,
    // dubbo.registry.address is read from system properties or the properties file.
    checkRegistry();
    // Original note: when no <dubbo:protocol name="dubbo" port="xxx"/> is configured
    // in XML, the value is taken from <dubbo:provider protocol="xxx" />.
    checkProtocol();
    // Original note: populates ServiceConfig properties from
    // dubbo.service.<id>.<property> and dubbo.service.<property>.
    appendProperties(this);
    checkStubAndMock(interfaceClass);
    // The path defaults to the interface name.
    if (path == null || path.length() == 0) {
        path = interfaceName;
    }
    // Export the service URLs.
    doExportUrls();
    ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
    ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
           

初始化ApplicationConfig、ProtocolConfig、ServiceConfig等對象,為導出做一些提前的準備工作。

// ServiceConfig#doExportUrls()
// Exports the service URLs: loads the registry URLs once, then exports the
// service under every configured protocol.
private void doExportUrls() {
    // Registry URLs for the provider side.
    final List<URL> registryUrlList = loadRegistries(true);
    // One export per protocol, e.g. <dubbo:protocol name="dubbo" port="20880" id="dubbo" />.
    for (ProtocolConfig config : protocols) {
        doExportUrlsFor1Protocol(config, registryUrlList);
    }
}

// AbstractInterfaceConfig#loadRegistries(boolean)
// Builds the list of registry URLs from the configured registries,
// e.g. <dubbo:registry address="multicast://224.5.6.7:1234"/>.
// For provider == true only URLs whose "register" parameter is true (the default)
// are returned; otherwise only URLs whose "subscribe" parameter is true (the
// default) are returned. Never returns null.
protected List<URL> loadRegistries(boolean provider) {
    // Original note: when no <dubbo:registry address="xxx"/> is configured in XML,
    // dubbo.registry.address is read from system properties or the properties file.
    checkRegistry();
    List<URL> result = new ArrayList<URL>();
    if (registries == null || registries.isEmpty()) {
        return result;
    }
    for (RegistryConfig registryConfig : registries) {
        String address = registryConfig.getAddress();
        // An empty address falls back to the any-host constant (0.0.0.0 per the original note).
        if (address == null || address.length() == 0) {
            address = Constants.ANYHOST_VALUE;
        }
        // The system property dubbo.registry.address overrides the configured address.
        String overrideAddress = System.getProperty("dubbo.registry.address");
        if (overrideAddress != null && overrideAddress.length() > 0) {
            address = overrideAddress;
        }
        // Skip registries without a usable address or explicitly marked unavailable.
        if (address == null || address.length() == 0
                || RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
            continue;
        }

        // Collect application/registry config plus path, dubbo version, timestamp,
        // pid and protocol as URL parameters.
        Map<String, String> params = new HashMap<String, String>();
        appendParameters(params, application);
        appendParameters(params, registryConfig);
        params.put("path", RegistryService.class.getName());
        params.put("dubbo", Version.getVersion());
        params.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            params.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        if (!params.containsKey("protocol")) {
            boolean hasRemote = ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote");
            params.put("protocol", hasRemote ? "remote" : "dubbo");
        }

        // One address string may describe several registries, hence a list of URLs.
        for (URL parsed : UrlUtils.parseURLs(address, params)) {
            // Remember the real registry protocol (e.g. "zookeeper") in the
            // "registry" parameter, then switch the URL protocol to "registry".
            URL registryUrl = parsed
                    .addParameter(Constants.REGISTRY_KEY, parsed.getProtocol())
                    .setProtocol(Constants.REGISTRY_PROTOCOL);
            boolean wanted = provider
                    ? registryUrl.getParameter(Constants.REGISTER_KEY, true)
                    : registryUrl.getParameter(Constants.SUBSCRIBE_KEY, true);
            if (wanted) {
                result.add(registryUrl);
            }
        }
    }
    return result;
}
           

初始化注冊中心資訊,并且存放在URL的對象中,以便後期做導出準備。

// ServiceConfig#doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)
// - Defaults the protocol name to "dubbo" when <dubbo:protocol name=... /> is empty
//   (original note: the default port is 20880).
// - Collects all configuration into a parameter map and wraps it in a URL.
// - Exports the service locally and/or remotely depending on the "scope" parameter.
// Example values in the comments below come from the original article.
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        // Default protocol name is "dubbo".
        name = "dubbo";
    }

    Map<String, String> map = new HashMap<String, String>();
    // e.g. "side" -> "provider"
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    // e.g. "dubbo" -> "2.0.2"
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    // e.g. "timestamp" -> "1568339911006"
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    // e.g. "pid" -> "13072"
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    // e.g. "application" -> "demo-provider"
    // ...
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, protocolConfig);
    // e.g. "interface" -> "com.alibaba.dubbo.demo.DemoService"
    // e.g. "bean.name" -> "com.alibaba.dubbo.demo.DemoService"
    appendParameters(map, this);
    if (methods != null && methods.size() > 0) {
        // Each MethodConfig holds the settings of one <dubbo:method> element.
        for (MethodConfig method : methods) {
            // Add the MethodConfig fields to the map, keyed as <methodName>.<property>.
            // e.g. <dubbo:method name="sayHello" retries="2"> => {"sayHello.retries": 2}
            appendParameters(map, method, method.getName());
            // "<method>.retry" is removed; when its value is "false" it is replaced
            // by "<method>.retries" = "0".
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            // Handle <dubbo:argument index="1"/>.
            // See: http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-argument.html
            List<ArgumentConfig> arguments = method.getArguments();
            if (arguments != null && arguments.size() > 0) {
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    // (code elided in the original article)
                    ...
                }
            }
        } // end of methods for
    }

    if (ProtocolUtils.isGeneric(generic)) {
        map.put("generic", generic);
        map.put("methods", Constants.ANY_VALUE);
    } else {
        // Interface revision, e.g. <dubbo:service interface="" version="1.0.0">.
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }
        // Method names are obtained through the Wrapper for the interface class.
        // Original note: Wrapper#makeWrapper(Class<?>) generates a Wrapper
        // implementation (Wrapper0); the author's TODO asks why this is used
        // instead of Class#getMethods().
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        } else {
            // Store the de-duplicated method names, comma-separated.
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    // Add the token: a random UUID when the default token is requested, otherwise
    // the configured value.
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put("token", UUID.randomUUID().toString());
        } else {
            map.put("token", token);
        }
    }
    // <dubbo:protocol name="injvm"/> (original note: meant for local testing).
    if ("injvm".equals(protocolConfig.getName())) {
        protocolConfig.setRegister(false);
        // No notification listening needed.
        map.put("notify", "false");
    }

    // The context path falls back to the provider's when the protocol has none.
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }
    // Resolve host and port. Original notes on the lookup order (helpers are not
    // shown in this excerpt):
    //   host: environment variable DUBBO_DUBBO_IP_TO_BIND or DUBBO_IP_TO_BIND,
    //         then the host attribute of <dubbo:protocol />,
    //         then the host attribute of <dubbo:provider />,
    //         finally the local IP address (officially recommended not to set host).
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    //   port: environment variable DUBBO_DUBBO_PORT_TO_BIND or DUBBO_PORT_TO_BIND,
    //         then <dubbo:protocol />, then <dubbo:provider />
    //         (the original note said "host" here, presumably a copy-paste slip for "port"),
    //         finally the default export port 20880 (DubboProtocol.DEFAULT_PORT).
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    // URL carrying the configuration of the exported service.
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

    // When a ConfiguratorFactory extension exists for the URL's protocol, the URL is
    // passed through that configurator. (The original author's TODO: purpose unclear.)
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }
    // Scope setting, e.g. <dubbo:provider scope="local" />.
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // Nothing is exported when scope is "none".
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        // scope is not "remote" (including unset) => export locally.
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // scope is not "local" (including unset) => export remotely.
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && registryURLs.size() > 0) {
                // Export once per registry URL.
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    // Load the monitor URL for this registry (original note: monitoring
                    // is covered in a separate chapter).
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        // Add "monitor" -> encoded monitor URL to the service URL's parameters.
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    // (code elided in the original article)
                    ...
                    // For providers, this is used to enable custom proxy to generate invoker
                    String proxy = url.getParameter(Constants.PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        // Propagate the proxy parameter to the registry URL.
                        registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                    }
                    // Create an Invoker for the service implementation (ref); the service
                    // URL is embedded, encoded, in the registry URL under the export key.
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    // Wrap the invoker together with this ServiceConfig.
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // Export the service. Original notes on the call chains:
                    //   registry export: ProtocolFilterWrapper -> ProtocolListenerWrapper -> RegistryProtocol
                    //   dubbo export:    ProtocolFilterWrapper -> ProtocolListenerWrapper -> DubboProtocol
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else { // No registry configured: export directly with the service URL.
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
    // Record the exported URL.
    this.urls.add(url);
}
           

初始化URL(用于承載Dubbo服務暴露的配置資訊),并封裝各種參數到url.parameters屬性中。下面分析一下DubboProtocol和RegistryProtocol導出服務的過程。

// DubboProtocol.export(wrapperInvoker)
// Reached from: Exporter<?> exporter = protocol.export(wrapperInvoker);
// Creates a DubboExporter for the invoker (a DelegateProviderMetaDataInvoker in
// this flow), caches it in exporterMap under the service key, records stub event
// methods when applicable, opens the server for the URL and returns the exporter.
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // Service key. Original note: composed of group, service name, version and port,
    // e.g. demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // Cache the exporter.
    // Map<String, Exporter<?>> exporterMap
    exporterMap.put(key, exporter);

    // Read the stub-event flag (original note: the dubbo.stub.event parameter).
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    // Read the callback-service flag (original note: the is_callback_service parameter).
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        // TODO (original author): not analysed in depth.
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            // (code elided in the original article)
            ...
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    // Open the server for this URL (original note: starts the Netty server).
    openServer(url);
    return exporter;
}

// DubboProtocol#openServer(URL)
// Ensures a server exists for the URL's address: creates and caches one when none
// is registered in serverMap, otherwise resets the existing server with the URL.
// Does nothing when the URL's is-server parameter is false.
private void openServer(URL url) {
    // Lookup key is the address, e.g. 10.0.75.1:20880.
    String address = url.getAddress();
    // client can export a service which's only for server to invoke
    boolean actsAsServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (!actsAsServer) {
        return;
    }
    ExchangeServer existing = serverMap.get(address);
    if (existing != null) {
        // server supports reset, use together with override
        existing.reset(url);
        return;
    }
    // No server for this address yet: create one and cache it.
    serverMap.put(address, createServer(url));
}

// DubboProtocol#createServer(URL url)
// Completes the URL with default parameters, validates the configured server
// transporter, binds an ExchangeServer and finally validates the configured
// client transporter.
// Throws RpcException for an unsupported server/client type or when binding fails.
private ExchangeServer createServer(URL url) {
    url = url
            // send readonly event when server closes, it's enabled by default
            // e.g. "channel.readonly.sent" -> "true"
            .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // enable heartbeat by default, e.g. "heartbeat" -> "60000"
            .addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // Server transporter type (original note: netty by default).
    String serverType = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    boolean serverTypeGiven = serverType != null && serverType.length() > 0;
    if (serverTypeGiven
            && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(serverType)) {
        throw new RpcException("Unsupported server type: " + serverType + ", url: " + url);
    }

    // e.g. "codec" -> "dubbo"
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);

    ExchangeServer server;
    try {
        // Bind the exchange server with this protocol's request handler.
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    // Validate the client transporter type, if one is configured.
    String clientType = url.getParameter(Constants.CLIENT_KEY);
    if (clientType != null && clientType.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(clientType)) {
            throw new RpcException("Unsupported client type: " + clientType);
        }
    }
    return server;
}

// Exchangers#bind(URL, ExchangeHandler)
// Adds a default codec parameter when absent and delegates the bind to the
// Exchanger selected for the URL.
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    // (code elided in the original article)
    ...
    // Original note: in this flow "codec" -> "dubbo" was already set by the caller,
    // so this if-absent default does not take effect.
    // CODEC_KEY
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // Original note: getExchanger(url) yields a HeaderExchanger instance by default.
    return getExchanger(url).bind(url, handler);
}

// HeaderExchanger#bind(URL, ExchangeHandler)
// Wraps the handler (HeaderExchangeHandler inside DecodeHandler), binds a
// transport server through Transporters#bind(URL, ChannelHandler...) and wraps
// that server in a HeaderExchangeServer.
// Original note: HeaderExchangeServer decorates the NettyServer and adds the
// heartbeat mechanism (not verifiable from this excerpt).
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
    DecodeHandler decodeHandler = new DecodeHandler(exchangeHandler);
    return new HeaderExchangeServer(Transporters.bind(url, decodeHandler));
}

// Transporters#bind(URL, ChannelHandler...)
// Reduces the given handlers to a single ChannelHandler and delegates the bind to
// the Transporter returned by getTransporter().
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    // (code elided in the original article)
    ...
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // Original note: ChannelHandlerDispatcher holds a Collection<ChannelHandler>
        // and wraps several handlers as one.
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // Original note: getTransporter() yields a NettyTransporter instance by default.
    return getTransporter().bind(url, handler);
}

// NettyTransporter#bind(URL, ChannelHandler)
// Creates a new NettyServer for the URL and handler.
// Original note: NettyTransporter is responsible for binding Netty servers and
// connecting Netty clients (only bind is shown in this excerpt).
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

           

使用DubboProtocol暴露dubbo服務:初始化并啟動Netty服務端,對外提供服務。關于Netty如何解碼編碼暫不做詳細分析。

接下來,分析一下RegistryProtocol是如何把服務注冊到注冊中心的。

// RegistryProtocol.export(wrapperInvoker)
// Reached from: Exporter<?> exporter = protocol.export(wrapperInvoker);
// Only the registration part is shown: when the provider URL's "register"
// parameter is true (the default), the provider URL is registered with the
// registry and the provider wrapper is marked as registered.
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // (code elided in the original article; registryUrl and registedProviderUrl
    // are presumably declared here)
    ...
    // Use the same variable name as the register(...) call below; the article
    // spelled it "registeredProviderUrl" here only.
    boolean register = registedProviderUrl.getParameter("register", true);
    if (register) {
        // Register the service with the registry.
        register(registryUrl, registedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    // (code elided in the original article)
    ...
}
// Obtains the Registry for registryUrl from the registry factory and registers
// the provider URL with it.
// Original notes: for ZooKeeper the registry is created by
// ZookeeperRegistryFactory.createRegistry(URL url) and the registration goes
// through FailbackRegistry.register(URL url).
public void register(URL registryUrl, URL registedProviderUrl) {
    registryFactory.getRegistry(registryUrl).register(registedProviderUrl);
}

// ZookeeperRegistryFactory.createRegistry(URL url)
// Creates the registry: a new ZookeeperRegistry built from the URL and this
// factory's zookeeperTransporter.
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}
// ZookeeperRegistry constructor.
// Derives the root path from the URL's group parameter, connects a ZooKeeper
// client through the transporter and installs a state listener that calls
// recover() on RECONNECTED.
// Throws IllegalStateException when the URL is an any-host address.
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // Group name (original note: defaults to "dubbo"); the root path always
    // starts with the path separator.
    String groupName = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    this.root = groupName.startsWith(Constants.PATH_SEPARATOR)
            ? groupName
            : Constants.PATH_SEPARATOR + groupName;
    // Create the ZooKeeper client (original note: CuratorZookeeperTransporter by default).
    zkClient = zookeeperTransporter.connect(url);
    // Recover after a reconnect; failures are logged, not propagated.
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state != RECONNECTED) {
                return;
            }
            try {
                recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    });
}

// FailbackRegistry.register(URL url)
// Registers the URL: returns immediately when this registry is destroyed;
// otherwise calls super.register(url), removes the URL from the failed-registered
// and failed-unregistered collections and delegates to doRegister(url).
public void register(URL url) {
    if (destroyed.get()){
        return;
    }
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // Perform the actual registration against the registry.
        // Original note: analysed with ZooKeeper as the example, i.e.
        // ZookeeperRegistry.doRegister(URL url)
        doRegister(url);
    } catch (Exception e) {
        // (failure handling elided in the original article)
        ...
    }
}

// ZookeeperRegistry.doRegister(URL url)
// Registers with the ZooKeeper registry by creating a node at the path derived
// from the URL; the second argument is the URL's "dynamic" parameter (default true).
protected void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        // (failure handling elided in the original article)
        ...
    }
}
           

Zookeeper通過建立節點儲存服務提供者資訊,暴露服務。

繼續閱讀