天天看點

精盡 Dubbo 源碼分析 —— 服務引用1. 概述2. 本地引用3. 服務引用

1. 概述

Dubbo 服務引用,和 Dubbo 服務暴露一樣,也有兩種方式:

本地引用,JVM 本地調用。

遠端暴露,網絡遠端通信。

2. 本地引用

2.1 createProxy

本地引用服務的順序圖如下:

精盡 Dubbo 源碼分析 —— 服務引用1. 概述2. 本地引用3. 服務引用
/**
     * 建立 Service 代理對象
     *
     * @param map 集合
     * @return 代理對象
     */
    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        // 是否本地引用
        final boolean isJvmRefer;
        // injvm 屬性為空,不通過該屬性判斷
        if (isInjvm() == null) {
            // 直連服務提供者,參見文檔《直連提供者》https://dubbo.gitbooks.io/dubbo-user-book/demos/explicit-target.html
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            // 通過 `tmpUrl` 判斷,是否需要本地引用
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            // 預設不是
            } else {
                isJvmRefer = false;
            }
        // 通過 injvm 屬性。
        } else {
            isJvmRefer = isInjvm();
        }

        // 本地引用
        if (isJvmRefer) {
            // 建立本地服務引用 URL 對象。
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // 引用服務,傳回 Invoker 對象
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        // 正常流程,一般為遠端引用
        } else {
          // ... 省略本文暫時不分享的服務遠端引用 
                }
            
        // 啟動時檢查
        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }

        // 建立 Service 代理對象
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }
           

2.2 isInjvmRefer

/**
     * 是否本地引用
     *
     * @param url URL
     * @return 是否
     */
    public boolean isInjvmRefer(URL url) {
        final boolean isJvmRefer;
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // Since injvm protocol is configured explicitly, we don't need to set any extra flag, use normal refer process.
        // 當 `protocol = injvm` 時,本身已經是 jvm 協定了,走正常流程就是了。
        if (Constants.LOCAL_PROTOCOL.toString().equals(url.getProtocol())) {
            isJvmRefer = false;
        // 當 `scope = local` 或者 `injvm = true` 時,本地引用
        } else if (Constants.SCOPE_LOCAL.equals(scope) || (url.getParameter("injvm", false))) {
            // if it's declared as local reference
            // 'scope=local' is equivalent to 'injvm=true', injvm will be deprecated in the future release
            isJvmRefer = true;
        // 當 `scope = remote` 時,遠端引用
        } else if (Constants.SCOPE_REMOTE.equals(scope)) {
            // it's declared as remote reference
            isJvmRefer = false;
        // 當 `generic = true` 時,即使用泛化調用,遠端引用。
        } else if (url.getParameter(Constants.GENERIC_KEY, false)) {
            // generic invocation is not local reference
            isJvmRefer = false;
        // 當本地已經有該 Exporter 時,本地引用
        } else if (getExporter(exporterMap, url) != null) {
            // by default, go through local reference if there's the service exposed locally
            isJvmRefer = true;
        // 預設,遠端引用
        } else {
            isJvmRefer = false;
        }
        return isJvmRefer;
    }

           

2.3 Protocol

涉及的 Protocol 類圖如下:

精盡 Dubbo 源碼分析 —— 服務引用1. 概述2. 本地引用3. 服務引用

2.3.1 ProtocolFilterWrapper

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 注冊中心
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        // 引用服務,傳回 Invoker 對象
        // 給改 Invoker 對象,包裝成帶有 Filter 過濾鍊的 Invoker 對象.建立帶有 Filter 過濾鍊的 Invoker 對象。
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }
           

2.3.2 ProtocolListenerWrapper

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 注冊中心協定
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        // 引用服務
        Invoker<T> invoker = protocol.refer(type, url);
        // 獲得 InvokerListener 數組
        List<InvokerListener> listeners = Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(InvokerListener.class).getActivateExtension(url, Constants.INVOKER_LISTENER_KEY));
        // 建立 ListenerInvokerWrapper 對象
        return new ListenerInvokerWrapper<T>(invoker, listeners);
    }
           

2.3.3 InjvmProtocol

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}
           

建立 InjvmInvoker 對象。注意,傳入的 exporterMap 參數,包含所有的 InjvmExporter 對象。

2.4 Invoker

2.4.1 AbstractInvoker

實作 Invoker 接口,抽象 Invoker 類,主要提供了 Invoker 的通用屬性和 #invoke(Invocation) 方法的通用實作。

本文主要涉及到它的通用屬性,代碼如下:

/**
     * 接口類型
     */
    private final Class<T> type;
    /**
     * 服務 URL
     */
    private final URL url;
    /**
     * 公用的隐式傳參。在 {@link #invoke(Invocation)} 方法中使用。
     */
    private final Map<String, String> attachment;
    /**
     * 是否可用
     */
    private volatile boolean available = true;
    /**
     * 是否銷毀
     */
    private AtomicBoolean destroyed = new AtomicBoolean(false);
           

2.4.1.1 InjvmInvoker

實作 AbstractInvoker 抽象類,Injvm Invoker 實作類。

/**
     * 服務鍵
     */
    private final String key;
    /**
     * Exporter 集合
     *
     * key: 服務鍵
     *
     * 該值實際就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
     */
    private final Map<String, Exporter<?>> exporterMap;

    InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
        super(type, url);
        this.key = key;
        this.exporterMap = exporterMap;
    }

           

2.4.1.2 isAvailable

isAvailable() 方法,是否可用

@Override
public boolean isAvailable() {
    // 判斷是否有 Exporter 對象
    InjvmExporter<?> exporter = (InjvmExporter<?>) exporterMap.get(key);
    if (exporter == null) {
        return false;
    } else {
        return super.isAvailable();
    }
}
           

開啟 啟動時檢查 時,調用該方法,判斷該 Invoker 對象,是否有對應的 Exporter 。若不存在,說明依賴服務不存在,檢查不通過

2.4.2 ListenerInvokerWrapper

實作 Invoker 接口,具有監聽器功能的 Invoker 包裝器

public class ListenerInvokerWrapper<T> implements Invoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(ListenerInvokerWrapper.class);

    /**
     * 真實的 Invoker 對象
     */
    private final Invoker<T> invoker;
    /**
     * Invoker 監聽器數組
     */
    private final List<InvokerListener> listeners;

    public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
        if (invoker == null) {
            throw new IllegalArgumentException("invoker == null");
        }
        this.invoker = invoker;
        this.listeners = listeners;
        // 執行監聽器
        if (listeners != null && !listeners.isEmpty()) {
            for (InvokerListener listener : listeners) {
                if (listener != null) {
                    try {
                        listener.referred(invoker);
                    } catch (Throwable t) {
                        logger.error(t.getMessage(), t);
                    }
                }
            }
        }
    }

    public Class<T> getInterface() {
        return invoker.getInterface();
    }

    public URL getUrl() {
        return invoker.getUrl();
    }

    public boolean isAvailable() {
        return invoker.isAvailable();
    }

    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    @Override
    public String toString() {
        return getInterface() + " -> " + (getUrl() == null ? " " : getUrl().toString());
    }

    public void destroy() {
        try {
            invoker.destroy();
        } finally {
            // 執行監聽器
            if (listeners != null && !listeners.isEmpty()) {
                for (InvokerListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.destroyed(invoker);
                        } catch (Throwable t) {
                            logger.error(t.getMessage(), t);
                        }
                    }
                }
            }
        }
    }

}

           

2.5 InvokerListener

com.alibaba.dubbo.rpc.InvokerListener ,Invoker 監聽器。

@SPI
public interface InvokerListener {

    /**
     * The invoker referred
     *
     * 當服務引用完成
     *
     * @param invoker
     * @throws RpcException
     * @see com.alibaba.dubbo.rpc.Protocol#refer(Class, URL)
     */
    void referred(Invoker<?> invoker) throws RpcException;

    /**
     * The invoker destroyed.
     *
     * 當服務銷毀引用完成
     *
     * @param invoker
     * @see com.alibaba.dubbo.rpc.Invoker#destroy()
     */
    void destroyed(Invoker<?> invoker);

}
           
2.5.1 InvokerListenerAdapter

com.alibaba.dubbo.rpc.listener.InvokerListenerAdapter ,實作 InvokerListener 接口,InvokerListener 擴充卡抽象類。代碼如下:

public abstract class InvokerListenerAdapter implements InvokerListener {

    public void referred(Invoker<?> invoker) throws RpcException { }

    public void destroyed(Invoker<?> invoker) { }

}
           
2.5.2 DeprecatedInvokerListener

現 InvokerListenerAdapter 抽象類 ,引用廢棄的服務時,列印錯誤日志提醒。代碼如下:

@Activate(Constants.DEPRECATED_KEY)
public class DeprecatedInvokerListener extends InvokerListenerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(DeprecatedInvokerListener.class);

    public void referred(Invoker<?> invoker) throws RpcException {
        if (invoker.getUrl().getParameter(Constants.DEPRECATED_KEY, false)) {
            LOGGER.error("The service " + invoker.getInterface().getName() + " is DEPRECATED! Declare from " + invoker.getUrl());
        }
    }

}
           

3. 服務引用

在 Dubbo 中提供多種協定( Protocol ) 的實作,大體流程一緻,本文以 Dubbo Protocol 為例子,這也是 Dubbo 的預設協定。

遠端暴露服務的順序圖如下:

精盡 Dubbo 源碼分析 —— 服務引用1. 概述2. 本地引用3. 服務引用
3.1 createProxy(map)

涉及遠端引用服務的代碼如下:

* 建立 Service 代理對象
     *
     * @param map 集合
     * @return 代理對象
     */
    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        // 是否本地引用
        final boolean isJvmRefer;
        // injvm 屬性為空,不通過該屬性判斷
        if (isInjvm() == null) {
            // 直連服務提供者,參見文檔《直連提供者》https://dubbo.gitbooks.io/dubbo-user-book/demos/explicit-target.html
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            // 通過 `tmpUrl` 判斷,是否需要本地引用
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            // 預設不是
            } else {
                isJvmRefer = false;
            }
        // 通過 injvm 屬性。
        } else {
            isJvmRefer = isInjvm();
        }

        // 本地引用
        if (isJvmRefer) {
           // 【省略代碼】本地引用
        } else {
        // 正常流程,一般為遠端引用
            // 定義直連位址,可以是服務提供者的位址,也可以是注冊中心的位址
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                // 拆分位址成數組,使用 ";" 分隔。
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                // 循環數組,添加到 `url` 中。
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        // 建立 URL 對象
                        URL url = URL.valueOf(u);
                        // 設定預設路徑
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        // 注冊中心的位址,帶上服務引用的配置參數
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        // 服務提供者的位址
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            // 注冊中心
            } else { // assemble URL from register center's configuration
                // 加載注冊中心 URL 數組
                List<URL> us = loadRegistries(false);
                // 循環數組,添加到 `url` 中。
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        // 加載監控中心 URL
                        URL monitorUrl = loadMonitor(u);
                        // 服務引用配置對象 `map`,帶上監控中心的 URL
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // 注冊中心的位址,帶上服務引用的配置參數
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); // 注冊中心,帶上服務引用的配置參數
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            // 單 `urls` 時,引用服務,傳回 Invoker 對象
            if (urls.size() == 1) {
                // 引用服務
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                // 循環 `urls` ,引用服務,傳回 Invoker 對象
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    // 引用服務
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    // 使用最後一個注冊中心的 URL
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                // 有注冊中心
                if (registryURL != null) { // registry url is available
                    // 對有注冊中心的 Cluster 隻用 AvailableCluster
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                // 無注冊中心,全部都是服務直連
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        // 啟動時檢查
        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }

        // 建立 Service 代理對象
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }
           
3.2 Protocol

本文涉及的 Protocol 類圖如下:

精盡 Dubbo 源碼分析 —— 服務引用1. 概述2. 本地引用3. 服務引用
3.2.1 ProtocolFilterWrapper

本文涉及的 #refer(type, url) 方法,代碼如下:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 注冊中心
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        // 引用服務,傳回 Invoker 對象
        // 給改 Invoker 對象,包裝成帶有 Filter 過濾鍊的 Invoker 對象
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }
           
3.2.2 RegistryProtocol

本文涉及的 #refer(type, url) 方法,代碼如下:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 獲得真實的注冊中心的 URL
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        // 獲得注冊中心
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // 獲得服務引用配置參數集合
        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        // 分組聚合,參見文檔 http://dubbo.io/books/dubbo-user-book/demos/group-merger.html
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                // 執行服務引用
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // 執行服務引用
        return doRefer(cluster, registry, type, url);
    }
           
3.2.3 doRefer

#doRefer(cluster, registry, type, url) 方法,執行服務引用的邏輯。代碼如下:

/**
     * 執行服務引用,傳回 Invoker 對象
     *
     * @param cluster Cluster 對象
     * @param registry 注冊中心對象
     * @param type 服務接口類型
     * @param url 注冊中心 URL
     * @param <T> 泛型
     * @return Invoker 對象
     */
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 建立 RegistryDirectory 對象,并設定注冊中心
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // 建立訂閱 URL
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 服務引用配置集合
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        // 向注冊中心注冊自己(服務消費者)
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false))); // 不檢查的原因是,不需要檢查。
        }
        // 向注冊中心訂閱服務提供者 + 路由規則 + 配置規則
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                        Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        // 建立 Invoker 對象
        Invoker invoker = cluster.join(directory);
        // 向本地系統資料庫,注冊消費者
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
           
3.3 DubboProtocol
3.3.1 refer

本文涉及的 #refer(type, url) 方法,代碼如下:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // 初始化序列化優化器
        optimizeSerialization(url);
        // 獲得遠端通信用戶端數組
        // 建立 DubboInvoker 對象
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        // 添加到 `invokers`
        invokers.add(invoker);
        return invoker;
    }

           
3.3.2 getClients

getClients(url) 方法,獲得連接配接服務提供者的遠端通信用戶端數組。代碼如下:

/**
     * 獲得連接配接服務提供者的遠端通信用戶端數組
     *
     * @param url 服務提供者 URL
     * @return 遠端通信用戶端
     */
    private ExchangeClient[] getClients(URL url) {
        // 是否共享連接配接
        // whether to share connection
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) { // 未配置時,預設共享
            service_share_connect = true;
            connections = 1;
        }

        // 建立連接配接服務提供者的 ExchangeClient 對象數組
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) { // 共享
                clients[i] = getSharedClient(url);
            } else { // 不共享
                clients[i] = initClient(url);
            }
        }
        return clients;
    }
           
3.3.3 getSharedClient

getClients(url) 方法,獲得連接配接服務提供者的遠端通信用戶端數組。代碼如下:

* Get shared connection
     *
     * 獲得 ExchangeClient 對象。若集合中已經存在,則直接使用,無需建立。否則,建立 ExchangeClient 對象。
     */
    private ExchangeClient getSharedClient(URL url) {
        // 從集合中,查找 ReferenceCountExchangeClient 對象
        String key = url.getAddress();
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null) {
            // 若未關閉,增加指向該 Client 的數量,并傳回它
            if (!client.isClosed()) {
                client.incrementAndGetCount();
                return client;
            // 若已關閉,移除
            } else {
                referenceClientMap.remove(key);
            }
        }
        // 同步,建立 ExchangeClient 對象。
        synchronized (key.intern()) {
            // 建立 ExchangeClient 對象
            ExchangeClient exchangeClient = initClient(url);
            // 将 `exchangeClient` 包裝,建立 ReferenceCountExchangeClient 對象
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            // 添加到集合
            referenceClientMap.put(key, client);
            // 添加到 `ghostClientMap`
            ghostClientMap.remove(key);
            return client;
        }
    }
           
3.3.4 initClient

#initClient(url) 方法,建立 ExchangeClient 對象,”連接配接”伺服器。

/**
     * Create new connection
     *
     * 建立 ExchangeClient 對象,"連接配接"伺服器
     */
    private ExchangeClient initClient(URL url) {
        // 校驗 Client 的 Dubbo SPI 拓展是否存在
        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        // 設定編解碼器為 Dubbo ,即 DubboCountCodec
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);

        // 預設開啟 heartbeat
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // 連接配接伺服器,建立用戶端
        ExchangeClient client;
        try {
            // 懶連接配接,建立 LazyConnectExchangeClient 對象
            // connection should be lazy
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            // 直接連接配接,建立 HeaderExchangeClient 對象
            } else {
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }
           
3.4 Invoker

本文涉及的 Invoker 類圖如下:

精盡 Dubbo 源碼分析 —— 服務引用1. 概述2. 本地引用3. 服務引用
3.4.1 DubboInvoker

實作 AbstractExporter 抽象類,Dubbo Invoker 實作類。代碼如下:

/**
     * 遠端通信用戶端數組
     */
    private final ExchangeClient[] clients;
    /**
     * 使用的 {@link #clients} 的位置
     */
    private final AtomicPositiveInteger index = new AtomicPositiveInteger();
    /**
     * 版本
     */
    private final String version;
    /**
     * 銷毀鎖
     *
     * 在 {@link #destroy()} 中使用
     */
    private final ReentrantLock destroyLock = new ReentrantLock();
    /**
     * Invoker 集合,從 {@link DubboProtocol#invokers} 擷取
     */
    private final Set<Invoker<?>> invokers;

    public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients) {
        this(serviceType, url, clients, null);
    }

    public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) {
        super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
        this.clients = clients;
        // get version.
        this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
        this.invokers = invokers;
    }
           
3.5 Client
3.5.1 ReferenceCountExchangeClient

com.alibaba.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient ,實作 ExchangeClient 接口,支援指向計數的資訊交換用戶端實作類。

/**
     * URL
     */
    private final URL url;
    /**
     * 指向數量
     */
    private final AtomicInteger refenceCount = new AtomicInteger(0);
    /**
     * 幽靈用戶端集合
     */
    private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap;
    /**
     * 用戶端
     */
    private ExchangeClient client;

    public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
        this.client = client;
        // 指向加一
        refenceCount.incrementAndGet();
        this.url = client.getUrl();
        if (ghostClientMap == null) {
            throw new IllegalStateException("ghostClientMap can not be null, url: " + url);
        }
        this.ghostClientMap = ghostClientMap;
    }

           
3.5.2 LazyConnectExchangeClient

com.alibaba.dubbo.rpc.protocol.dubbo.LazyConnectExchangeClient ,實作 ExchangeClient 接口,支援懶連接配接伺服器的資訊交換用戶端實作類。

/**
     * URL
     */
    private final URL url;
    /**
     * 通道處理器
     */
    private final ExchangeHandler requestHandler;
    /**
     * 連接配接鎖
     */
    private final Lock connectLock = new ReentrantLock();
    /**
     * lazy connect 如果沒有初始化時的連接配接狀态
     */
    // lazy connect, initial state for connection
    private final boolean initialState;
    /**
     * 通信用戶端
     */
    private volatile ExchangeClient client;
    /**
     * 請求時,是否檢查告警
     */
    protected final boolean requestWithWarning;
    /**
     * 警告計數器。每超過一定次數,列印告警日志。參見 {@link #warning(Object)}
     */
    private AtomicLong warningcount = new AtomicLong(0);

    public LazyConnectExchangeClient(URL url, ExchangeHandler requestHandler) {
        // lazy connect, need set send.reconnect = true, to avoid channel bad status.
        this.url = url.addParameter(Constants.SEND_RECONNECT_KEY, Boolean.TRUE.toString());
        this.requestHandler = requestHandler;
        this.initialState = url.getParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE);
        this.requestWithWarning = url.getParameter(REQUEST_WITH_WARNING_KEY, false);
    }
           

繼續閱讀