天天看點

dubbo源碼閱讀-服務訂閱(八)之遠端訂閱(dubbo)RegistryProtocolRegistryDirectory

說明

從《服務訂閱主流程》可以看到根據協定來注冊 我們預設沒有根據url直接配置是以url是registry SPI擴充就是走的RegistryProtocol

RegistryProtocol

<1>refer

/**
     * type為訂閱接口
     * @param type Service class
     * @param url  URL address for the remote service
     * @param <T>
     * @return
     * @throws RpcException
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        //删除registry協定 并擷取url訂閱協定預設dubbo 如果配置的注冊中心是zookeeper://name協定就是zookeeper
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        //@SPI實作  獲得對應的registry實作
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        //将url參數 轉換為 map
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        //擷取group   https://blog.csdn.net/weixin_43868594/article/details/84956886
        String group = qs.get(Constants.GROUP_KEY);
        //如果配置了group
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                //<2>使用mergeable
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        //<2>cluster為SPI擴充 預設FailoverCluster 為叢集政策
        return doRefer(cluster, registry, type, url);
    }      

<2>doRefer

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //type為訂閱接口類型 url為訂閱url
        //zookeeper://IP:2181/com.alibaba.dubbo.registry.RegistryService?**
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        //擷取參數
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());

        //訂閱url
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
            //将訂閱url 添加到已注冊清單 registryList
            registry.register(registeredConsumerUrl);
            //設定到directory
            directory.setRegisteredConsumerUrl(registeredConsumerUrl);
        }
        //<3>訂閱服務 内部調用registry.subscribe
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        //通過叢集政策包裝成一個Invoker傳回
        Invoker invoker = cluster.join(directory);
        //注冊到訂閱清單
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }      

RegistryDirectory

<3>subscribe

public void subscribe(URL url) {
        //此時的url為:consumer://192.168.2.1/com.alibaba.dubbo.demo.DemoService?*
        setConsumerUrl(url);
         //對應的注冊中心實作類 調用訂閱方法 通知添加監聽器 就是目前對象 實作了 NotifyListener 具體可看:https://www.cnblogs.com/LQBlog/p/12522417.html
        registry.subscribe(url, this);
    }