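Dubbo Source Code Analysis, Part 7: Service Export (4)

This article analyzes service registration and subscription.

Continuing from the previous article, we return to RegistryProtocol and start from the line register(registryUrl, registeredProviderUrl);: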

private void register(URL registryUrl, URL registeredProviderUrl) {
    // Get the Registry, which initializes the registry
    Registry registry = registryFactory.getRegistry(registryUrl);
    // Register the service
    registry.register(registeredProviderUrl);
}
           

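This does two things:

  1. Initialize the registry
  2. Register the service

Initializing the registry

We use the Zookeeper registry as the example.

First, the registryFactory.getRegistry method lives in AbstractRegistryFactory: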

@Override
public Registry getRegistry(URL url) {
    if (destroyed.get()) {
        LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
        return DEFAULT_NOP_REGISTRY;
    }

    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    String key = createRegistryCacheKey(url);
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        // Check the cache
        // If the registry already exists, return it directly; otherwise create a new one
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        // Cache miss: create the Registry instance
        //create registry by spi/ioc
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        // Write it to the cache
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
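
The caching pattern in getRegistry can be reduced to a few lines. The following is a minimal sketch, not Dubbo code: the class name RegistryCacheSketch, the use of a plain String as cache key, and the created counter are assumptions made for illustration. It only shows that a lock around "look up, else create and store" yields one instance per key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the cache in AbstractRegistryFactory; not Dubbo code.
class RegistryCacheSketch {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static final Map<String, Object> REGISTRIES = new HashMap<>();
    // Counts how many instances were actually created (illustration only).
    static int created = 0;

    static Object getRegistry(String key) {
        // Lock the whole lookup-or-create step so each key gets a single instance.
        LOCK.lock();
        try {
            Object registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry; // cache hit
            }
            registry = new Object(); // cache miss: stands in for createRegistry(url)
            created++;
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            LOCK.unlock();
        }
    }
}
```

Calling RegistryCacheSketch.getRegistry twice with the same key returns the same object, and a different key produces a new one.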
           

Next, look at the createRegistry method. It is a template method implemented by subclasses; here is the ZookeeperRegistryFactory class:

@Override
public Registry createRegistry(URL url) {
    // Create the ZookeeperRegistry
    return new ZookeeperRegistry(url, zookeeperTransporter);
}
           

This initializes a ZookeeperRegistry. Here is its constructor:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // Get the group name; defaults to dubbo
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        // group = "/" + group
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    // Create the Zookeeper client; CuratorZookeeperTransporter is the default
    zkClient = zookeeperTransporter.connect(url);
    // Add a state listener
    zkClient.addStateListener((state) -> {
        if (state == StateListener.RECONNECTED) {
            logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                    " Since ephemeral ZNode will not get deleted for a connection lose, " +
                    "there's no need to re-register url of this instance.");
            ZookeeperRegistry.this.fetchLatestAddresses();
        } else if (state == StateListener.NEW_SESSION_CREATED) {
            logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
            try {
                ZookeeperRegistry.this.recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        } else if (state == StateListener.SESSION_LOST) {
            logger.warn("Url of this instance will be deleted from registry soon. " +
                    "Dubbo client will try to re-register once a new session is created.");
        } else if (state == StateListener.SUSPENDED) {

        } else if (state == StateListener.CONNECTED) {

        }
    });
}
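
The first half of the constructor turns the group parameter into the root path under which all nodes are created. A minimal sketch of that normalization follows; the class RootPathSketch and the method toRoot are hypothetical names, and treating an empty group like a missing one is an assumption about how the default value is applied.

```java
// Hypothetical helper mirroring the group-to-root normalization; not Dubbo code.
class RootPathSketch {
    static final String PATH_SEPARATOR = "/";
    static final String DEFAULT_ROOT = "dubbo";

    static String toRoot(String group) {
        // Assumption: a missing or empty group falls back to the default root.
        String g = (group == null || group.isEmpty()) ? DEFAULT_ROOT : group;
        // Ensure the root always starts with "/".
        return g.startsWith(PATH_SEPARATOR) ? g : PATH_SEPARATOR + g;
    }
}
```

Under these assumptions, toRoot(null) and toRoot("dubbo") both give "/dubbo", and a group that already starts with "/" is left unchanged.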
           

We focus on the call to ZookeeperTransporter's connect method, which creates the Zookeeper client. Once the Zookeeper client has been created, creation of the registry is complete.

@Override
public ZookeeperClient connect(URL url) {
    ZookeeperClient zookeeperClient;
    // address format: {[username:password@]address}
    List<String> addressList = getURLBackupAddress(url);
    // The field define the zookeeper server , including protocol, host, port, username, password
    if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return zookeeperClient;
    }
    // avoid creating too many connections, so add lock
    synchronized (zookeeperClientMap) {
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }

        zookeeperClient = createZookeeperClient(url);
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        writeToClientMap(addressList, zookeeperClient);
    }
    return zookeeperClient;
}
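
The connect method above checks the cache once without a lock and once more inside the synchronized block, so that concurrent callers do not each open a connection. The following is a simplified sketch of that double-checked lookup. ClientCacheSketch, its nested Client class, and the created counter are hypothetical; the way lookup remaps all addresses to a found client is an assumption about what fetchAndUpdateZookeeperClientCache does.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a double-checked client cache keyed by address; not Dubbo code.
class ClientCacheSketch {
    static final class Client {
        final String url;
        volatile boolean connected = true;
        Client(String url) { this.url = url; }
    }

    private static final Map<String, Client> CLIENTS = new ConcurrentHashMap<>();
    // Counts how many clients were actually created (illustration only).
    static int created = 0;

    // Return a connected cached client for any of the addresses, or null.
    static Client lookup(List<String> addresses) {
        for (String address : addresses) {
            Client c = CLIENTS.get(address);
            if (c != null && c.connected) {
                // Assumed "update" step: point every address at the client found.
                for (String other : addresses) {
                    CLIENTS.put(other, c);
                }
                return c;
            }
        }
        return null;
    }

    static Client connect(String url, List<String> addresses) {
        Client c = lookup(addresses); // first check, no lock
        if (c != null) {
            return c;
        }
        synchronized (CLIENTS) {
            c = lookup(addresses); // second check, under the lock
            if (c != null) {
                return c;
            }
            c = new Client(url); // stands in for createZookeeperClient(url)
            created++;
            for (String address : addresses) {
                CLIENTS.put(address, c);
            }
            return c;
        }
    }
}
```

A second connect call that shares any address with an earlier one reuses the cached client as long as it is still connected; once it is marked disconnected, a new client is created.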
           

Next is the createZookeeperClient method:

@Override
public ZookeeperClient createZookeeperClient(URL url) {
    // Create the CuratorZookeeperClient
    return new CuratorZookeeperClient(url);
}
           

Next, look at the CuratorZookeeperClient constructor:

public CuratorZookeeperClient(URL url) {
    super(url);
    try {
        int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
        int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
        // Create the CuratorFramework builder
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(url.getBackupAddress())
                .retryPolicy(new RetryNTimes(1, 1000))
                .connectionTimeoutMs(timeout)
                .sessionTimeoutMs(sessionExpireMs);
        String authority = url.getAuthority();
        if (authority != null && authority.length() > 0) {
            builder = builder.authorization("digest", authority.getBytes());
        }
        // Build the CuratorFramework instance
        client = builder.build();
        // Add the connection state listener
        client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
        // Start the client
        client.start();
        boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
        if (!connected) {
            throw new IllegalStateException("zookeeper not connected");
        }
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
           

At this point the Zookeeper client has started and connected.

Registering the service

Now go back to registry.register(registeredProviderUrl);, which registers the service:

@Override
public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        // Template method, implemented by subclasses
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        // Read the check parameter; if check = true the exception is thrown directly
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        // Record the URL whose registration failed
        addFailedRegistered(url);
    }
}
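
The catch block decides between failing fast and retrying later. That decision can be written as a small predicate, sketched below. RegisterFailureSketch and shouldThrow are hypothetical names, and the literal "consumer" is an assumption about the value of CONSUMER_PROTOCOL.

```java
// Hypothetical predicate mirroring the fail-fast decision in register(); not Dubbo code.
class RegisterFailureSketch {
    // Assumption: CONSUMER_PROTOCOL has the value "consumer".
    static final String CONSUMER_PROTOCOL = "consumer";

    static boolean shouldThrow(boolean registryCheck, boolean urlCheck,
                               String protocol, boolean skipFailback) {
        // check is true only if both URLs enable it and the URL is not a consumer URL.
        boolean check = registryCheck && urlCheck && !CONSUMER_PROTOCOL.equals(protocol);
        // Either condition causes the exception to be rethrown instead of only logged.
        return check || skipFailback;
    }
}
```

When the predicate is false, the original code only logs the error. The addFailedRegistered(url) call sits after the if/else, so it is reached only on that non-throwing path, which is what schedules the retry.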
           

The doRegister template method, as implemented by ZookeeperRegistry:

@Override
public void doRegister(URL url) {
    try {
        // Create the node through the Zookeeper client. The node path is generated by toUrlPath, in this format:
        //   /${group}/${serviceInterface}/providers/${url}
        // For example:
        //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
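
The path format in the comment above can be reproduced with the JDK alone. This is a rough sketch of how such a path might be assembled, not the actual toUrlPath implementation; UrlPathSketch, its parameters, and the sample provider URL are assumptions for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical sketch of the /${group}/${serviceInterface}/${category}/${url} layout; not Dubbo code.
class UrlPathSketch {
    static String toUrlPath(String group, String serviceInterface,
                            String category, String fullUrl) {
        try {
            // The full provider URL is percent-encoded so it fits in a single node name.
            return "/" + group
                    + "/" + URLEncoder.encode(serviceInterface, "UTF-8")
                    + "/" + category
                    + "/" + URLEncoder.encode(fullUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM.
            throw new IllegalStateException(e);
        }
    }
}
```

For a provider URL such as dubbo://127.0.0.1:20880/org.apache.dubbo.DemoService, the last path segment starts with dubbo%3A%2F%2F127.0.0.1, matching the example in the comment.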
           

Next is zkClient's create method:

@Override
public void create(String path, boolean ephemeral) {
    if (!ephemeral) {
        // For a non-ephemeral (persistent) node, first check whether it already exists
        if(persistentExistNodePath.contains(path)){
            return;
        }
        if (checkExists(path)) {
            persistentExistNodePath.add(path);
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // Recursively create the parent path
        create(path.substring(0, i), false);
    }
    // Create an ephemeral or persistent node depending on the value of ephemeral
    if (ephemeral) {
        createEphemeral(path);
    } else {
        createPersistent(path);
        persistentExistNodePath.add(path);
    }
}
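
The effect of the recursion is that every parent directory becomes a persistent node and only the leaf can be ephemeral. The sketch below replays that logic against an in-memory map instead of a Zookeeper server; NodeTreeSketch and its nodes map are hypothetical and stand in for the real client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory replay of the recursive create(); not Dubbo code.
class NodeTreeSketch {
    // Maps path -> true for an ephemeral node, false for a persistent one.
    final Map<String, Boolean> nodes = new LinkedHashMap<>();

    void create(String path, boolean ephemeral) {
        // Persistent nodes that already exist are left alone.
        if (!ephemeral && nodes.containsKey(path)) {
            return;
        }
        int i = path.lastIndexOf('/');
        if (i > 0) {
            // Parents are always created as persistent nodes.
            create(path.substring(0, i), false);
        }
        nodes.put(path, ephemeral);
    }
}
```

Creating /dubbo/DemoService/providers/url as ephemeral yields four entries in insertion order: three persistent parents followed by one ephemeral leaf.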
           

Next comes creating the ephemeral or persistent node; take createEphemeral as the example:

@Override
public void createEphemeral(String path) {
    try {
        // Create the node through the Curator framework
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
    } catch (NodeExistsException e) {
        logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                "we can just try to delete and create again.", e);
        deletePath(path);
        createEphemeral(path);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
           

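This completes the analysis of service registration. In short: a registry instance is created first, and the service is then registered through that instance.

Subscription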

registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
           

Initiating the subscription:

@Override
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // Sending a subscription request to the server side
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;

        List<URL> urls = getCacheUrls(url);
        if (CollectionUtils.isNotEmpty(urls)) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedSubscribed(url, listener);
    }
}
           

Next is the doSubscribe template method; here is the ZookeeperRegistry implementation:

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                for (String child : currentChilds) {
                    child = URL.decode(child);
                    if (!anyServices.contains(child)) {
                        anyServices.add(child);
                        subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                Constants.CHECK_KEY, String.valueOf(false)), k);
                    }
                }
            });
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (CollectionUtils.isNotEmpty(services)) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            List<URL> urls = new ArrayList<>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
           

Next is zkClient.addChildListener:

@Override
public List<String> addChildListener(String path, final ChildListener listener) {
    ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
    TargetChildListener targetListener = listeners.computeIfAbsent(listener, k -> createTargetChildListener(path, k));
    return addTargetChildListener(path, targetListener);
}
           
@Override
public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
    try {
        return client.getChildren().usingWatcher(listener).forPath(path);
    } catch (NoNodeException e) {
        return null;
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
           

This adds a listener to the specified path.
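
The two computeIfAbsent calls in addChildListener build a nested map from path to listener to wrapped listener, so the same listener is wrapped at most once per path. A minimal sketch of that structure follows; ChildListenerSketch, TargetListener, and the wrappersCreated counter are hypothetical, and no Zookeeper watcher is involved.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the path -> listener -> wrapper bookkeeping; not Dubbo code.
class ChildListenerSketch {
    interface ChildListener {
        void childChanged(String path, List<String> children);
    }

    // Stands in for the Curator-specific watcher created by createTargetChildListener.
    static final class TargetListener {
        final String path;
        final ChildListener delegate;
        TargetListener(String path, ChildListener delegate) {
            this.path = path;
            this.delegate = delegate;
        }
    }

    final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetListener>> childListeners =
            new ConcurrentHashMap<>();
    // Counts how many wrappers were created (illustration only).
    int wrappersCreated = 0;

    TargetListener addChildListener(String path, ChildListener listener) {
        ConcurrentMap<ChildListener, TargetListener> listeners =
                childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
        // The wrapper is created only the first time this listener is seen for this path.
        return listeners.computeIfAbsent(listener, k -> {
            wrappersCreated++;
            return new TargetListener(path, k);
        });
    }
}
```

Registering the same listener twice on one path returns the same wrapper; registering it on a second path creates a new one.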

That completes adding the listener to Zookeeper. Now let's go back and look at the listener implementation:

@Override
public synchronized void notify(List<URL> urls) {
    logger.debug("original override urls: " + urls);

    List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl.addParameter(CATEGORY_KEY,
            CONFIGURATORS_CATEGORY));
    logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);

    // No matching results
    if (matchedUrls.isEmpty()) {
        return;
    }

    this.configurators = Configurator.toConfigurators(classifyUrls(matchedUrls, UrlUtils::isConfigurator))
            .orElse(configurators);

    doOverrideIfNecessary();
}
           
public synchronized void doOverrideIfNecessary() {
    final Invoker<?> invoker;
    if (originInvoker instanceof InvokerDelegate) {
        invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
    } else {
        invoker = originInvoker;
    }
    //The origin invoker
    URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<?> exporter = bounds.get(key);
    if (exporter == null) {
        logger.warn(new IllegalStateException("error state, exporter should not be null"));
        return;
    }
    //The current, may have been merged many times
    URL currentUrl = exporter.getInvoker().getUrl();
    //Merged with this configuration
    URL newUrl = getConfigedInvokerUrl(configurators, currentUrl);
    newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
    newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
            .getConfigurators(), newUrl);
    if (!currentUrl.equals(newUrl)) {
        RegistryProtocol.this.reExport(originInvoker, newUrl);
        logger.info("exported provider url changed, origin url: " + originUrl +
                ", old export url: " + currentUrl + ", new export url: " + newUrl);
    }
}
           

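When the listener detects that the URL has changed, the service is re-exported.

We won't analyze re-export here; it essentially removes the old export and adds the new one.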
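
The decision in doOverrideIfNecessary comes down to merging several layers of overrides into the current URL and comparing the result with what is currently exported. The sketch below models URL parameters as a plain map; ReExportSketch and its methods are hypothetical, and treating a configurator as a simple key-value overlay is a simplifying assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "merge overrides, re-export only on change"; not Dubbo code.
class ReExportSketch {
    // Apply each override layer in order; later layers win on conflicting keys.
    static Map<String, String> applyOverrides(Map<String, String> current,
                                              List<Map<String, String>> overrides) {
        Map<String, String> merged = new LinkedHashMap<>(current);
        for (Map<String, String> layer : overrides) {
            merged.putAll(layer);
        }
        return merged;
    }

    // Re-export is needed only when the merged parameters differ from the current ones.
    static boolean needsReExport(Map<String, String> current,
                                 List<Map<String, String>> overrides) {
        return !current.equals(applyOverrides(current, overrides));
    }
}
```

An override that sets a parameter to the value it already has leaves the merged map equal to the current one, so no re-export is triggered.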

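Summary

This largely completes the analysis of service export. To summarize briefly:

  1. The EnableDubbo annotation brings in Dubbo, and DubboBootstrapApplicationListener triggers DubboBootstrap's start method, which performs service export and the initialization of service references.
  2. Before a service is exported, a URL object is built from the user's configuration.
  3. Different kinds of export are performed depending on scope.
  4. Before export, an Invoker instance is built. It is an object that wraps a Wrapper and unifies how the service interface is invoked.
  5. For remote export, the service is exported first, which in practice means starting the remote server (for example Netty).
  6. Then the service is registered:
    1. Initialize the registry: create and connect the Zookeeper client.
    2. Register the service: add the Zookeeper node.
  7. Then comes subscription, which means adding listeners to Zookeeper nodes.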