天天看點

Dubbo源碼分析七、服務導出(4)

本篇分析服務注冊和訂閱

接着上篇看,我們回到RegistryProtocol,從register(registryUrl, registeredProviderUrl);這行代碼看起:

private void register(URL registryUrl, URL registeredProviderUrl) {
    // 擷取 Registry 初始化注冊中心
    Registry registry = registryFactory.getRegistry(registryUrl);
    // 注冊服務
    registry.register(registeredProviderUrl);
}
           

這裡做了兩步

  1. 初始化注冊中心
  2. 注冊服務

初始化注冊中心

我們以Zookeeper注冊中心為例

首先registryFactory.getRegistry方法是在AbstractRegistryFactory裡:

@Override
public Registry getRegistry(URL url) {
    if (destroyed.get()) {
        LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
        return DEFAULT_NOP_REGISTRY;
    }

    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    String key = createRegistryCacheKey(url);
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        // 通路緩存
        // 如果注冊中心已存在,直接傳回,否則建立一個新的
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        // 緩存未命中,建立 Registry 執行個體
        //create registry by spi/ioc
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        // 寫入緩存
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
           

然後看一下createRegistry方法,這是一個模闆方法,具體由子類實作,我們看ZookeeperRegistryFactory類:

@Override
public Registry createRegistry(URL url) {
    // 建立 ZookeeperRegistry
    return new ZookeeperRegistry(url, zookeeperTransporter);
}
           

初始化了一個ZookeeperRegistry,我們看一下它的構造方法:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 擷取組名,預設為 dubbo
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        // group = "/" + group
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    // 建立 Zookeeper 用戶端,預設為 CuratorZookeeperTransporter
    zkClient = zookeeperTransporter.connect(url);
    // 添加狀态監聽器
    zkClient.addStateListener((state) -> {
        if (state == StateListener.RECONNECTED) {
            logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                    " Since ephemeral ZNode will not get deleted for a connection lose, " +
                    "there's no need to re-register url of this instance.");
            ZookeeperRegistry.this.fetchLatestAddresses();
        } else if (state == StateListener.NEW_SESSION_CREATED) {
            logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
            try {
                ZookeeperRegistry.this.recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        } else if (state == StateListener.SESSION_LOST) {
            logger.warn("Url of this instance will be deleted from registry soon. " +
                    "Dubbo client will try to re-register once a new session is created.");
        } else if (state == StateListener.SUSPENDED) {

        } else if (state == StateListener.CONNECTED) {

        }
    });
}
           

我們重點關注 ZookeeperTransporter 的 connect 方法調用,這個方法用于建立 Zookeeper 用戶端。建立好 Zookeeper 用戶端,意味着注冊中心的建立過程就結束了。

@Override
public ZookeeperClient connect(URL url) {
    ZookeeperClient zookeeperClient;
    // address format: {[username:password@]address}
    List<String> addressList = getURLBackupAddress(url);
    // The field define the zookeeper server , including protocol, host, port, username, password
    if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return zookeeperClient;
    }
    // avoid creating too many connections, so add lock
    synchronized (zookeeperClientMap) {
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }

        zookeeperClient = createZookeeperClient(url);
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        writeToClientMap(addressList, zookeeperClient);
    }
    return zookeeperClient;
}
           

然後是createZookeeperClient方法:

@Override
public ZookeeperClient createZookeeperClient(URL url) {
    // 建立 CuratorZookeeperClient
    return new CuratorZookeeperClient(url);
}
           

然後看一下CuratorZookeeperClient的構造方法:

public CuratorZookeeperClient(URL url) {
    super(url);
    try {
        int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
        int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
        // 建立 CuratorFramework 構造器
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(url.getBackupAddress())
                .retryPolicy(new RetryNTimes(1, 1000))
                .connectionTimeoutMs(timeout)
                .sessionTimeoutMs(sessionExpireMs);
        String authority = url.getAuthority();
        if (authority != null && authority.length() > 0) {
            builder = builder.authorization("digest", authority.getBytes());
        }
        // 建構 CuratorFramework 執行個體
        client = builder.build();
        // 添加監聽器
        client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
        // 啟動用戶端
        client.start();
        boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
        if (!connected) {
            throw new IllegalStateException("zookeeper not connected");
        }
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
           

到這裡我們Zookeeper的用戶端啟動連結完成。

注冊服務

回去看registry.register(registeredProviderUrl);注冊服務

@Override
public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        // 模闆方法,由子類實作
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        // 擷取 check 參數,若 check = true 将會直接抛出異常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        // 記錄注冊失敗的連結
        addFailedRegistered(url);
    }
}
           

doRegister模闆方法,ZookeeperRegistry的實作:

@Override
public void doRegister(URL url) {
    try {
        // 通過 Zookeeper 用戶端建立節點,節點路徑由 toUrlPath 方法生成,路徑格式如下:
        //   /${group}/${serviceInterface}/providers/${url}
        // 比如
        //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
           

然後是zkClient的create

@Override
public void create(String path, boolean ephemeral) {
    if (!ephemeral) {
        // 如果要建立的節點類型非臨時節點,那麼這裡要檢測節點是否存在
        if(persistentExistNodePath.contains(path)){
            return;
        }
        if (checkExists(path)) {
            persistentExistNodePath.add(path);
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // 遞歸建立上一級路徑
        create(path.substring(0, i), false);
    }
    // 根據 ephemeral 的值建立臨時或持久節點
    if (ephemeral) {
        createEphemeral(path);
    } else {
        createPersistent(path);
        persistentExistNodePath.add(path);
    }
}
           

然後是建立臨時/持久節點,以createEphemeral為例:

@Override
public void createEphemeral(String path) {
    try {
        // 通過 Curator 架構建立節點
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
    } catch (NodeExistsException e) {
        logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                "we can just try to delete and create again.", e);
        deletePath(path);
        createEphemeral(path);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
           

到這裡服務注冊的過程分析完了。整個過程可簡單總結為:先建立注冊中心執行個體,之後再通過注冊中心執行個體注冊服務。

訂閱

registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
           

發起訂閱

@Override
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // Sending a subscription request to the server side
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;

        List<URL> urls = getCacheUrls(url);
        if (CollectionUtils.isNotEmpty(urls)) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedSubscribed(url, listener);
    }
}
           

然後是doSubscribe模闆方法,我們看一下ZookeeperRegistry的實作:

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                for (String child : currentChilds) {
                    child = URL.decode(child);
                    if (!anyServices.contains(child)) {
                        anyServices.add(child);
                        subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                Constants.CHECK_KEY, String.valueOf(false)), k);
                    }
                }
            });
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (CollectionUtils.isNotEmpty(services)) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            List<URL> urls = new ArrayList<>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
           

然後是zkClient.addChildListener

@Override
public List<String> addChildListener(String path, final ChildListener listener) {
    ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
    TargetChildListener targetListener = listeners.computeIfAbsent(listener, k -> createTargetChildListener(path, k));
    return addTargetChildListener(path, targetListener);
}
           
@Override
public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
    try {
        return client.getChildren().usingWatcher(listener).forPath(path);
    } catch (NoNodeException e) {
        return null;
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
           

給指定Path添加監聽器

到這給Zookeeper添加監聽器完了。現在回過頭來看一下監聽器的實作:

@Override
public synchronized void notify(List<URL> urls) {
    logger.debug("original override urls: " + urls);

    List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl.addParameter(CATEGORY_KEY,
            CONFIGURATORS_CATEGORY));
    logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);

    // No matching results
    if (matchedUrls.isEmpty()) {
        return;
    }

    this.configurators = Configurator.toConfigurators(classifyUrls(matchedUrls, UrlUtils::isConfigurator))
            .orElse(configurators);

    doOverrideIfNecessary();
}
           
public synchronized void doOverrideIfNecessary() {
    final Invoker<?> invoker;
    if (originInvoker instanceof InvokerDelegate) {
        invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
    } else {
        invoker = originInvoker;
    }
    //The origin invoker
    URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<?> exporter = bounds.get(key);
    if (exporter == null) {
        logger.warn(new IllegalStateException("error state, exporter should not be null"));
        return;
    }
    //The current, may have been merged many times
    URL currentUrl = exporter.getInvoker().getUrl();
    //Merged with this configuration
    URL newUrl = getConfigedInvokerUrl(configurators, currentUrl);
    newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
    newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
            .getConfigurators(), newUrl);
    if (!currentUrl.equals(newUrl)) {
        RegistryProtocol.this.reExport(originInvoker, newUrl);
        logger.info("exported provider url changed, origin url: " + originUrl +
                ", old export url: " + currentUrl + ", new export url: " + newUrl);
    }
}
           

監聽url發送變化時重新導出。

重新導出就不分析了,無非就是删除原來的添加新的。

總結

至此服務導出部分基本分析完了。簡單總結一下:

  1. EnableDubbo注解引入Dubbo,DubboBootstrapApplicationListener觸發DubboBootstrap的啟動start方法。該方法執行服務導出和引入初始化操作。
  2. 服務導出前需要構造URL對象,構造過程根據使用者配置來實作。
  3. 根據scope 進行不同類型的服務導出
  4. 服務導出之前構造Invoker執行個體,就是一個包裝了wrapper的對象,統一了服務接口的調用方式
  5. 遠端導出時先導出服務–實際上就是遠端服務(例如netty)的啟動
  6. 然後服務注冊
    1. 初始化注冊中心 --zookeeper的用戶端建立和連結
    2. 服務注冊 – 添加zookeeper節點
  7. 然後服務訂閱 – 就是zookeeper的節點添加監聽器