天天看點

Dubbo3.0中zookeeper配置中心建構源碼分析以及其他配置中心簡介

作者:程式員阿龍

上一篇:dubbo 3.0服务调用者调用第三方接口源码主流程分析

zookeeper配置中心

zookeeper配置中心构建路径:

1、ZookeeperDynamicConfiguration构建

2、AbstractZookeeperTransporter的connect方法

3、Curator5ZookeeperTransporter的createZookeeperClient方法

4、Curator5ZookeeperClient构建:构建时会基于Curator创建具体的zk客户端(CuratorFramework),后续对zk元数据的所有操作都基于这个Client。

/**
 * Dynamic configuration implementation that stores and watches configuration items in ZooKeeper.
 * <p>
 * All reads, writes and listener registrations are delegated to a {@link ZookeeperClient} obtained
 * from the {@link ZookeeperTransporter} passed to the constructor.
 */
public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration {

    private static final int DEFAULT_ZK_EXECUTOR_THREADS_NUM = 1;
    private static final int DEFAULT_QUEUE = 10000;
    private static final long THREAD_KEEP_ALIVE_TIME = 0L;

    /** Executor handed to the zk client for data-listener callbacks. */
    private Executor executor;
    private ZookeeperClient zkClient;

    /** Single listener registered on zk paths; it tracks the configuration listeners per path. */
    private CacheListener cacheListener;

    /**
     * Builds the listener executor and connects to ZooKeeper.
     *
     * @param url                  config-center URL
     * @param zookeeperTransporter transporter used to obtain the {@link ZookeeperClient}
     * @throws IllegalStateException if the obtained client reports it is not connected
     */
    ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);

        this.cacheListener = new CacheListener(rootPath);

        final String threadName = this.getClass().getSimpleName();
        // Thread pool: core size = max size = 1, keep-alive 0 ms, bounded queue of 10000 tasks.
        this.executor = new ThreadPoolExecutor(DEFAULT_ZK_EXECUTOR_THREADS_NUM, DEFAULT_ZK_EXECUTOR_THREADS_NUM,
            THREAD_KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(DEFAULT_QUEUE),
            new NamedThreadFactory(threadName, true),
            new AbortPolicyWithReport(threadName, url));

        // Ask the transporter to connect to zk for this URL; the result is the client used by every method below.
        zkClient = zookeeperTransporter.connect(url);
        boolean isConnected = zkClient.isConnected();
        if (!isConnected) {
            throw new IllegalStateException("Failed to connect with zookeeper, pls check if url " + url + " is correct.");
        }
    }

    /**
     * Reads the content stored under the path built from an empty group and the given key.
     *
     * @param key e.g., {service}.configurators, {service}.tagrouters, {group}.dubbo.properties
     * @return whatever {@code zkClient.getContent} returns for that path
     */
    @Override
    public String getInternalProperty(String key) {
        return zkClient.getContent(buildPathKey("", key));
    }

    /**
     * Drops the reference to the zk client without closing it.
     */
    @Override
    protected void doClose() throws Exception {
        // zkClient is shared in framework, should not close it here
        // zkClient.close();
        // See: org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
        // All zk clients is created and destroyed in ZookeeperTransporter.
        zkClient = null;
    }

    /**
     * Publishes a configuration item.
     *
     * @param pathKey zk path of the item
     * @param content content to write
     * @return always {@code true} when the write did not throw
     * @throws Exception if the underlying client fails
     */
    @Override
    protected boolean doPublishConfig(String pathKey, String content) throws Exception {
        // Write the item to zk; third argument (ephemeral) is false.
        zkClient.create(pathKey, content, false);
        return true;
    }

    /**
     * Publishes a configuration item using a version ticket (compare-and-set style).
     *
     * @param key     configuration key
     * @param group   configuration group
     * @param content content to write
     * @param ticket  {@code null} or a zk {@link Stat}; its version is passed to the client
     * @return {@code true} on success, {@code false} if any exception was raised (it is logged)
     */
    @Override
    public boolean publishConfigCas(String key, String group, String content, Object ticket) {
        try {
            if (ticket != null && !(ticket instanceof Stat)) {
                throw new IllegalArgumentException("zookeeper publishConfigCas requires stat type ticket");
            }
            String pathKey = buildPathKey(group, key);
            // Create or update a non-ephemeral node; a null ticket is sent as version 0.
            zkClient.createOrUpdate(pathKey, content, false, ticket == null ? 0 : ((Stat) ticket).getVersion());
            return true;
        } catch (Exception e) {
            logger.warn("zookeeper publishConfigCas failed.", e);
            return false;
        }
    }

    /** Returns the content stored at {@code pathKey}. */
    @Override
    protected String doGetConfig(String pathKey) throws Exception {
        return zkClient.getContent(pathKey);
    }

    /** Returns the config item the client reports for the path built from {@code group} and {@code key}. */
    @Override
    public ConfigItem getConfigItem(String key, String group) {
        String pathKey = buildPathKey(group, key);
        return zkClient.getConfigItem(pathKey);
    }

    /** Deletes the node at {@code pathKey}; returns {@code true} when the delete did not throw. */
    @Override
    protected boolean doRemoveConfig(String pathKey) throws Exception {
        zkClient.delete(pathKey);
        return true;
    }

    /** Returns the children the client reports for {@code groupPath}. */
    @Override
    protected Collection<String> doGetConfigKeys(String groupPath) {
        return zkClient.getChildren(groupPath);
    }

    /**
     * Registers {@code listener} for {@code pathKey} and makes sure the shared cache listener is
     * attached to that path on the zk client, with callbacks dispatched on {@link #executor}.
     */
    @Override
    protected void doAddListener(String pathKey, ConfigurationListener listener) {
        cacheListener.addListener(pathKey, listener);
        zkClient.addDataListener(pathKey, cacheListener, executor);
    }

    /**
     * Unregisters {@code listener} for {@code pathKey}. The zk data listener is detached only once
     * no configuration listener remains for that path.
     */
    @Override
    protected void doRemoveListener(String pathKey, ConfigurationListener listener) {
        cacheListener.removeListener(pathKey, listener);
        Set<ConfigurationListener> remaining = cacheListener.getConfigurationListeners(pathKey);
        // Fixed inverted check: the original detached the zk listener while listeners were still
        // registered and kept it attached after the last one had been removed.
        if (remaining == null || remaining.isEmpty()) {
            zkClient.removeDataListener(pathKey, cacheListener);
        }
    }
}
           

AbstractZookeeperTransporter的connect方法

/**
 * Returns a connected {@link ZookeeperClient} for the given URL.
 * <p>
 * A client looked up via {@code fetchAndUpdateZookeeperClientCache} is returned when it exists and
 * is connected; the lookup is done once without the lock and once more while holding the lock on
 * {@code zookeeperClientMap}. Otherwise a new client is created and recorded via
 * {@code writeToClientMap}.
 *
 * @param url target URL; address format: {[username:password@]address}
 * @return the looked-up client if connected, otherwise a newly created one
 */
public ZookeeperClient connect(URL url) {
    // The address list identifies the zookeeper server(s): protocol, host, port, username, password
    final List<String> addressList = getURLBackupAddress(url);

    // First attempt without taking the lock.
    ZookeeperClient cached = fetchAndUpdateZookeeperClientCache(addressList);
    if (cached != null && cached.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return cached;
    }

    // avoid creating too many connections, so add lock
    synchronized (zookeeperClientMap) {
        // Look again under the lock before creating anything.
        cached = fetchAndUpdateZookeeperClientCache(addressList);
        if (cached != null && cached.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return cached;
        }

        // Nothing usable was found: build a new client and record it for these addresses.
        final ZookeeperClient created = createZookeeperClient(url);
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        writeToClientMap(addressList, created);
        return created;
    }
}

Curator5ZookeeperTransporter的createZookeeperClient方法

/**
 * Creates a new {@link Curator5ZookeeperClient} for the given URL.
 *
 * @param url target URL handed to the client constructor
 * @return the newly constructed client
 */
public ZookeeperClient createZookeeperClient(URL url) {
    final ZookeeperClient curatorClient = new Curator5ZookeeperClient(url);
    return curatorClient;
}

Curator5ZookeeperClient的构建

/**
 * {@link AbstractZookeeperClient} implementation backed by a Curator {@link CuratorFramework}.
 * <p>
 * The constructor builds and starts the Curator client and blocks until it is connected; every
 * other method translates a client operation into the corresponding Curator call and wraps
 * unexpected failures in {@link IllegalStateException}.
 */
public class Curator5ZookeeperClient extends AbstractZookeeperClient<Curator5ZookeeperClient.NodeCacheListenerImpl, Curator5ZookeeperClient.CuratorWatcherImpl> {

    protected static final Logger logger = LoggerFactory.getLogger(Curator5ZookeeperClient.class);

    private static final Charset CHARSET = StandardCharsets.UTF_8;
    private final CuratorFramework client;
    /** Node caches keyed by path. NOTE(review): static, so shared by all client instances - confirm intended. */
    private static Map<String, NodeCache> nodeCacheMap = new ConcurrentHashMap<>();

    /**
     * Builds, starts and waits for a Curator client configured from {@code url}.
     *
     * @param url source of the connect string, timeouts and optional user information
     * @throws IllegalStateException if the client cannot be built or is not connected within the
     *                               connection timeout
     */
    public Curator5ZookeeperClient(URL url) {
        super(url);
        try {
            // Connection timeout; falls back to DEFAULT_CONNECTION_TIMEOUT_MS.
            int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
            // Session expiry; falls back to DEFAULT_SESSION_TIMEOUT_MS.
            int sessionExpireMs = url.getParameter(SESSION_KEY, DEFAULT_SESSION_TIMEOUT_MS);
            // Build the client with Curator, which wraps the native zk client.
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    // zk server address(es)
                    .connectString(url.getBackupAddress())
                    // on failure retry once, 1000 ms later
                    .retryPolicy(new RetryNTimes(1, 1000))
                    .connectionTimeoutMs(timeout)
                    .sessionTimeoutMs(sessionExpireMs);
            String userInformation = url.getUserInformation();
            // Access control: send digest credentials when user information is present.
            if (userInformation != null && userInformation.length() > 0) {
                // Encode with the class charset instead of the platform default so the
                // credentials are the same bytes on every JVM.
                builder = builder.authorization("digest", userInformation.getBytes(CHARSET));
            }
            client = builder.build();
            // Register a listener so connection state changes (e.g. a dropped connection)
            // are forwarded to this client's state listeners.
            client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
            // Start connecting.
            client.start();
            // Block until connected, for at most the connection timeout.
            boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
            if (!connected) {
                throw new IllegalStateException("zookeeper not connected");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e.getMessage(), e);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Creates a node at {@code path} with Curator's default mode; an existing node is only logged. */
    @Override
    public void createPersistent(String path) {
        try {
            client.create().forPath(path);
        } catch (NodeExistsException e) {
            logger.warn("ZNode " + path + " already exists.", e);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Creates an ephemeral node at {@code path}. If the node already exists it is deleted and the
     * creation is attempted again.
     */
    @Override
    public void createEphemeral(String path) {
        try {
            client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
        } catch (NodeExistsException e) {
            logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                    ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                    " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                    "we can just try to delete and create again.", e);
            deletePath(path);
            createEphemeral(path);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Creates a node at {@code path} holding {@code data}; if the node already exists its data is
     * overwritten instead.
     */
    @Override
    protected void createPersistent(String path, String data) {
        byte[] dataBytes = data.getBytes(CHARSET);
        try {
            client.create().forPath(path, dataBytes);
        } catch (NodeExistsException e) {
            try {
                client.setData().forPath(path, dataBytes);
            } catch (Exception e1) {
                // Report the failure that actually happened (setData), not the NodeExists that led here.
                throw new IllegalStateException(e1.getMessage(), e1);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Creates an ephemeral node at {@code path} holding {@code data}. If the node already exists it
     * is deleted and the creation is attempted again.
     */
    @Override
    protected void createEphemeral(String path, String data) {
        byte[] dataBytes = data.getBytes(CHARSET);
        try {
            client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes);
        } catch (NodeExistsException e) {
            logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                    ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                    " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                    "we can just try to delete and create again.", e);
            deletePath(path);
            createEphemeral(path, data);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Sets the data of {@code path}, passing {@code version} to Curator's {@code withVersion}. */
    @Override
    protected void update(String path, String data, int version) {
        byte[] dataBytes = data.getBytes(CHARSET);
        try {
            client.setData().withVersion(version).forPath(path, dataBytes);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Updates the node when {@link #checkExists(String)} reports it exists, otherwise creates it. */
    @Override
    protected void createOrUpdatePersistent(String path, String data, int version) {
        try {
            if (checkExists(path)) {
                update(path, data, version);
            } else {
                createPersistent(path, data);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Ephemeral counterpart of {@link #createOrUpdatePersistent(String, String, int)}. */
    @Override
    protected void createOrUpdateEphemeral(String path, String data, int version) {
        try {
            if (checkExists(path)) {
                update(path, data, version);
            } else {
                createEphemeral(path, data);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Deletes {@code path} together with its children; a missing node is silently accepted. */
    @Override
    protected void deletePath(String path) {
        try {
            client.delete().deletingChildrenIfNeeded().forPath(path);
        } catch (NoNodeException ignored) {
            // Nothing to delete.
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Lists the children of {@code path}.
     *
     * @return the child names, or {@code null} when the node does not exist
     */
    @Override
    public List<String> getChildren(String path) {
        try {
            return client.getChildren().forPath(path);
        } catch (NoNodeException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * @return {@code true} only when Curator returns a non-null result for {@code path}; any
     * exception raised by the check is treated as "does not exist"
     */
    @Override
    public boolean checkExists(String path) {
        try {
            if (client.checkExists().forPath(path) != null) {
                return true;
            }
        } catch (Exception ignored) {
            // Deliberately swallowed: a failed check falls through to false.
        }
        return false;
    }

    @Override
    public boolean isConnected() {
        return client.getZookeeperClient().isConnected();
    }

    /**
     * Reads the data of {@code path} as a string.
     *
     * @return the decoded content, or {@code null} when the node is missing or holds no data
     */
    @Override
    public String doGetContent(String path) {
        try {
            byte[] dataBytes = client.getData().forPath(path);
            return (dataBytes == null || dataBytes.length == 0) ? null : new String(dataBytes, CHARSET);
        } catch (NoNodeException e) {
            // ignore NoNode Exception.
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        return null;
    }

    /**
     * Reads the data of {@code path} together with its {@link Stat}.
     *
     * @return a {@link ConfigItem} with content and stat, or an empty item when the node is missing
     */
    @Override
    public ConfigItem doGetConfigItem(String path) {
        String content;
        Stat stat;
        try {
            stat = new Stat();
            byte[] dataBytes = client.getData().storingStatIn(stat).forPath(path);
            content = (dataBytes == null || dataBytes.length == 0) ? null : new String(dataBytes, CHARSET);
        } catch (NoNodeException e) {
            return new ConfigItem();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        return new ConfigItem(content, stat);
    }

    @Override
    public void doClose() {
        super.doClose();
        client.close();
    }

    @Override
    public CuratorWatcherImpl createTargetChildListener(String path, ChildListener listener) {
        return new CuratorWatcherImpl(client, listener, path);
    }

    /**
     * Reads the children of {@code path} and installs {@code listener} as watcher.
     *
     * @return the current children, or {@code null} when the node does not exist
     */
    @Override
    public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
        try {
            return client.getChildren().usingWatcher(listener).forPath(path);
        } catch (NoNodeException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    @Override
    protected NodeCacheListenerImpl createTargetDataListener(String path, DataListener listener) {
        return new NodeCacheListenerImpl(client, listener, path);
    }

    @Override
    protected void addTargetDataListener(String path, NodeCacheListenerImpl nodeCacheListener) {
        this.addTargetDataListener(path, nodeCacheListener, null);
    }

    /**
     * Starts a {@link NodeCache} for {@code path} with {@code nodeCacheListener} attached. When a
     * cache is already registered for the path this call returns without attaching the listener.
     *
     * @param executor executor for listener callbacks, or {@code null} to use Curator's default
     */
    @Override
    protected void addTargetDataListener(String path, NodeCacheListenerImpl nodeCacheListener, Executor executor) {
        try {
            NodeCache nodeCache = new NodeCache(client, path);
            if (nodeCacheMap.putIfAbsent(path, nodeCache) != null) {
                return;
            }
            if (executor == null) {
                nodeCache.getListenable().addListener(nodeCacheListener);
            } else {
                nodeCache.getListenable().addListener(nodeCacheListener, executor);
            }

            nodeCache.start();
        } catch (Exception e) {
            throw new IllegalStateException("Add nodeCache listener for path:" + path, e);
        }
    }

    /**
     * Detaches {@code nodeCacheListener} from the cache registered for {@code path} (if any) and
     * clears its delegate. The cache itself stays registered.
     */
    @Override
    protected void removeTargetDataListener(String path, NodeCacheListenerImpl nodeCacheListener) {
        NodeCache nodeCache = nodeCacheMap.get(path);
        if (nodeCache != null) {
            nodeCache.getListenable().removeListener(nodeCacheListener);
        }
        nodeCacheListener.dataListener = null;
    }

    @Override
    public void removeTargetChildListener(String path, CuratorWatcherImpl listener) {
        listener.unwatch();
    }

    /** Adapts Curator {@link NodeCacheListener} callbacks to a {@link DataListener}. */
    static class NodeCacheListenerImpl implements NodeCacheListener {

        private CuratorFramework client;

        /** Delegate; set to {@code null} by {@code removeTargetDataListener}. */
        private volatile DataListener dataListener;

        private String path;

        protected NodeCacheListenerImpl() {
        }

        public NodeCacheListenerImpl(CuratorFramework client, DataListener dataListener, String path) {
            this.client = client;
            this.dataListener = dataListener;
            this.path = path;
        }

        /**
         * Translates the cache's current state into an event: no data means deleted, stat version 0
         * means created, anything else means data changed.
         */
        @Override
        public void nodeChanged() throws Exception {
            // Snapshot both references once: the delegate may be cleared concurrently by
            // removeTargetDataListener, and the path may have no cache registered.
            DataListener listener = dataListener;
            NodeCache nodeCache = nodeCacheMap.get(path);
            if (listener == null || nodeCache == null) {
                return;
            }
            ChildData childData = nodeCache.getCurrentData();
            String content = null;
            EventType eventType;
            if (childData == null) {
                eventType = EventType.NodeDeleted;
            } else if (childData.getStat().getVersion() == 0) {
                content = new String(childData.getData(), CHARSET);
                eventType = EventType.NodeCreated;
            } else {
                content = new String(childData.getData(), CHARSET);
                eventType = EventType.NodeDataChanged;
            }
            listener.dataChanged(path, content, eventType);
        }
    }

    /** Adapts Curator {@link CuratorWatcher} events to a {@link ChildListener}. */
    static class CuratorWatcherImpl implements CuratorWatcher {

        private CuratorFramework client;
        private volatile ChildListener childListener;
        private String path;

        public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
            this.client = client;
            this.childListener = listener;
            this.path = path;
        }

        protected CuratorWatcherImpl() {
        }

        /** Stops forwarding events by clearing the delegate. */
        public void unwatch() {
            this.childListener = null;
        }

        @Override
        public void process(WatchedEvent event) throws Exception {
            // if client connect or disconnect to server, zookeeper will queue
            // watched event(Watcher.Event.EventType.None, .., path = null).
            if (event.getType() == Watcher.Event.EventType.None) {
                return;
            }

            if (childListener != null) {
                // Re-reading the children with this watcher re-arms the watch for the next change.
                childListener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
            }
        }
    }

    /** Forwards Curator connection state changes to the enclosing client's {@code stateChanged}. */
    private class CuratorConnectionStateListener implements ConnectionStateListener {
        private static final long UNKNOWN_SESSION_ID = -1L;

        private long lastSessionId;
        private int timeout;
        private int sessionExpireMs;

        public CuratorConnectionStateListener(URL url) {
            this.timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
            this.sessionExpireMs = url.getParameter(SESSION_KEY, DEFAULT_SESSION_TIMEOUT_MS);
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState state) {
            long sessionId = UNKNOWN_SESSION_ID;
            try {
                sessionId = client.getZookeeperClient().getZooKeeper().getSessionId();
            } catch (Exception e) {
                logger.warn("Curator client state changed, but failed to get the related zk session instance.");
            }

            if (state == ConnectionState.LOST) {
                logger.warn("Curator zookeeper session " + Long.toHexString(lastSessionId) + " expired.");
                Curator5ZookeeperClient.this.stateChanged(StateListener.SESSION_LOST);
            } else if (state == ConnectionState.SUSPENDED) {
                logger.warn("Curator zookeeper connection of session " + Long.toHexString(sessionId) + " timed out. " +
                        "connection timeout value is " + timeout + ", session expire timeout value is " + sessionExpireMs);
                Curator5ZookeeperClient.this.stateChanged(StateListener.SUSPENDED);
            } else if (state == ConnectionState.CONNECTED) {
                lastSessionId = sessionId;
                logger.info("Curator zookeeper client instance initiated successfully, session id is " + Long.toHexString(sessionId));
                Curator5ZookeeperClient.this.stateChanged(StateListener.CONNECTED);
            } else if (state == ConnectionState.RECONNECTED) {
                // Same known session id as before: the old session survived; otherwise a new one was created.
                if (lastSessionId == sessionId && sessionId != UNKNOWN_SESSION_ID) {
                    logger.warn("Curator zookeeper connection recovered from connection lose, " +
                            "reuse the old session " + Long.toHexString(sessionId));
                    Curator5ZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                } else {
                    logger.warn("New session created after old session lost, " +
                            "old session " + Long.toHexString(lastSessionId) + ", new session " + Long.toHexString(sessionId));
                    lastSessionId = sessionId;
                    Curator5ZookeeperClient.this.stateChanged(StateListener.NEW_SESSION_CREATED);
                }
            }
        }

    }

    /**
     * just for unit test
     *
     * @return the underlying Curator client
     */
    CuratorFramework getClient() {
        return client;
    }
}
           

Apollo配置中心

/**
 * {@link DynamicConfiguration} implementation backed by Apollo.
 * <p>
 * The constructor copies connection settings from the URL into system properties (only where they
 * are not already set) and then obtains the Apollo {@link Config} and {@link ConfigFile} for the
 * configured namespace.
 */
public class ApolloDynamicConfiguration implements DynamicConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(ApolloDynamicConfiguration.class);
    private static final String APOLLO_ENV_KEY = "env";
    private static final String APOLLO_ADDR_KEY = "apollo.meta";
    private static final String APOLLO_CLUSTER_KEY = "apollo.cluster";
    private static final String APOLLO_PROTOCOL_PREFIX = "http://";
    private static final String APOLLO_APPLICATION_KEY = "application";
    private static final String APOLLO_APPID_KEY = "app.id";

    private final URL url;
    private final Config dubboConfig;
    private final ConfigFile dubboConfigFile;
    /** Apollo-side listeners keyed by {@code group + key}. */
    private final ConcurrentMap<String, ApolloListener> listeners = new ConcurrentHashMap<>();

    /**
     * @param url config-center URL
     * @throws IllegalStateException if the config source is not remote and the {@code check}
     *                               parameter is true (its default)
     */
    ApolloDynamicConfiguration(URL url) {
        this.url = url;
        // Instead of using Dubbo's configuration, I would suggest use the original configuration method Apollo provides.
        String configEnv = url.getParameter(APOLLO_ENV_KEY);
        String configAddr = getAddressWithProtocolPrefix(url);
        String configCluster = url.getParameter(CLUSTER_KEY);
        String configAppId = url.getParameter(APOLLO_APPID_KEY);
        // Each system property is only filled in when it has not been set already.
        if (StringUtils.isEmpty(System.getProperty(APOLLO_ENV_KEY)) && configEnv != null) {
            System.setProperty(APOLLO_ENV_KEY, configEnv);
        }
        if (StringUtils.isEmpty(System.getProperty(APOLLO_ADDR_KEY)) && !ANYHOST_VALUE.equals(url.getHost())) {
            System.setProperty(APOLLO_ADDR_KEY, configAddr);
        }
        if (StringUtils.isEmpty(System.getProperty(APOLLO_CLUSTER_KEY)) && configCluster != null) {
            System.setProperty(APOLLO_CLUSTER_KEY, configCluster);
        }
        if (StringUtils.isEmpty(System.getProperty(APOLLO_APPID_KEY)) && configAppId != null) {
            System.setProperty(APOLLO_APPID_KEY, configAppId);
        }

        String namespace = url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP);
        String apolloNamespace = StringUtils.isEmpty(namespace) ? url.getGroup(DEFAULT_GROUP) : namespace;
        dubboConfig = ConfigService.getConfig(apolloNamespace);
        dubboConfigFile = ConfigService.getConfigFile(apolloNamespace, ConfigFileFormat.Properties);

        // Decide to fail or to continue when failed to connect to remote server.
        boolean check = url.getParameter(CHECK_KEY, true);
        if (dubboConfig.getSourceType() != ConfigSourceType.REMOTE) {
            if (check) {
                throw new IllegalStateException("Failed to connect to config center, the config center is Apollo, " +
                    "the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv));
            } else {
                logger.warn("Failed to connect to config center, the config center is Apollo, " +
                    "the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv) +
                    ", will use the local cache value instead before eventually the connection is established.");
            }
        }
    }

    /** Forgets all registered listeners. */
    @Override
    public void close() {
        try {
            listeners.clear();
        } catch (UnsupportedOperationException e) {
            logger.warn("Failed to close connect from config center, the config center is Apollo");
        }
    }

    /**
     * Returns the URL's backup address with {@code http://} prepended to every comma-separated
     * entry that does not already start with it; an empty address is returned unchanged.
     */
    private String getAddressWithProtocolPrefix(URL url) {
        String address = url.getBackupAddress();
        if (StringUtils.isNotEmpty(address)) {
            address = Arrays.stream(COMMA_SPLIT_PATTERN.split(address))
                .map(addr -> {
                    if (addr.startsWith(APOLLO_PROTOCOL_PREFIX)) {
                        return addr;
                    }
                    return APOLLO_PROTOCOL_PREFIX + addr;
                })
                .collect(Collectors.joining(","));
        }
        return address;
    }

    /**
     * Since all governance rules will lay under dubbo group, this method now always uses the default dubboConfig and
     * ignores the group parameter.
     */
    @Override
    public void addListener(String key, String group, ConfigurationListener listener) {
        ApolloListener apolloListener = listeners.computeIfAbsent(group + key, k -> createTargetListener(key, group));
        apolloListener.addListener(listener);
        dubboConfig.addChangeListener(apolloListener, Collections.singleton(key));
    }

    /**
     * Removes {@code listener}; once the Apollo-side listener has no delegates left it is also
     * removed from {@code dubboConfig}.
     */
    @Override
    public void removeListener(String key, String group, ConfigurationListener listener) {
        ApolloListener apolloListener = listeners.get(group + key);
        if (apolloListener != null) {
            apolloListener.removeListener(listener);
            if (!apolloListener.hasInternalListener()) {
                dubboConfig.removeChangeListener(apolloListener);
            }
        }
    }

    /**
     * Looks up {@code key}: in the application config when {@code group} equals the URL's
     * application, in the namespace named {@code group} for any other non-empty group, and in the
     * default {@code dubboConfig} when the group is empty.
     *
     * @return the property value, or {@code null} when absent
     */
    @Override
    public String getConfig(String key, String group, long timeout) throws IllegalStateException {
        if (StringUtils.isNotEmpty(group)) {
            if (group.equals(url.getApplication())) {
                return ConfigService.getAppConfig().getProperty(key, null);
            } else {
                return ConfigService.getConfig(group).getProperty(key, null);
            }
        }
        return dubboConfig.getProperty(key, null);
    }

    /**
     * Recommend specify namespace and group when using Apollo.
     * <p>
     * &lt;dubbo:config-center namespace="governance" group="dubbo" /&gt;, 'namespace=governance' is for governance
     * rules while 'group=dubbo' is for properties files.
     *
     * @param key     default value is 'dubbo.properties', currently useless for Apollo.
     * @param group   namespace to read; empty means the default config file
     * @param timeout not used by this implementation
     * @return the content of the selected properties file
     * @throws IllegalStateException if Apollo returns no config file for {@code group}
     */
    @Override
    public String getProperties(String key, String group, long timeout) throws IllegalStateException {
        if (StringUtils.isEmpty(group)) {
            return dubboConfigFile.getContent();
        }
        if (group.equals(url.getApplication())) {
            return ConfigService.getConfigFile(APOLLO_APPLICATION_KEY, ConfigFileFormat.Properties).getContent();
        }

        ConfigFile configFile = ConfigService.getConfigFile(group, ConfigFileFormat.Properties);
        if (configFile == null) {
            throw new IllegalStateException("There is no namespace named " + group + " in Apollo.");
        }
        return configFile.getContent();
    }

    /**
     * This method will be used by Configuration to get valid value at runtime.
     * The group is expected to be 'app level', which can be fetched from the 'config.appnamespace' in url if necessary.
     * But I think Apollo's inheritance feature of namespace can solve the problem .
     */
    @Override
    public String getInternalProperty(String key) {
        return dubboConfig.getProperty(key, null);
    }

    /**
     * Ignores the group parameter.
     *
     * @param key   property key the native listener will listen on
     * @param group to distinguish different set of properties
     * @return a new, empty {@link ApolloListener}
     */
    private ApolloListener createTargetListener(String key, String group) {
        return new ApolloListener();
    }

    /** Apollo change listener that fans each changed key out to the registered Dubbo listeners. */
    public class ApolloListener implements ConfigChangeListener {

        private final Set<ConfigurationListener> listeners = new CopyOnWriteArraySet<>();

        ApolloListener() {
        }

        /**
         * Converts every changed key into a {@link ConfigChangedEvent} and passes it to all
         * registered listeners. A key whose new value is the empty string is logged and skipped.
         */
        @Override
        public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) {
            for (String key : changeEvent.changedKeys()) {
                ConfigChange change = changeEvent.getChange(key);
                if ("".equals(change.getNewValue())) {
                    logger.warn("an empty rule is received for " + key + ", the current working rule is " +
                        change.getOldValue() + ", the empty rule will not take effect.");
                    // Skip only this key. The original returned here, which silently dropped
                    // every remaining changed key of the same event.
                    continue;
                }

                ConfigChangedEvent event = new ConfigChangedEvent(key, change.getNamespace(), change.getNewValue(), getChangeType(change));
                listeners.forEach(listener -> listener.process(event));
            }
        }

        /** Maps Apollo's DELETED to DELETED; every other change type is reported as MODIFIED. */
        private ConfigChangeType getChangeType(ConfigChange change) {
            if (change.getChangeType() == PropertyChangeType.DELETED) {
                return ConfigChangeType.DELETED;
            }
            return ConfigChangeType.MODIFIED;
        }

        void addListener(ConfigurationListener configurationListener) {
            this.listeners.add(configurationListener);
        }

        void removeListener(ConfigurationListener configurationListener) {
            this.listeners.remove(configurationListener);
        }

        /** @return {@code true} while at least one listener is registered */
        boolean hasInternalListener() {
            return !listeners.isEmpty();
        }
    }

}
           

Nacos配置中心

public class NacosDynamicConfiguration implements DynamicConfiguration {

    /**
     * Path passed to the HTTP agent when listing config keys in {@code getConfigKeys}.
     */
    private static final String GET_CONFIG_KEYS_PATH = "/v1/cs/configs";

    // Per-instance logger named after the runtime class.
    private final Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * The default timeout in milliseconds used to get config from Nacos.
     */
    private static final long DEFAULT_TIMEOUT = 5000L;

    // Client properties derived from the URL; used to create the config service
    // and looked up again through getProperty(name, defaultValue).
    private Properties nacosProperties;

    /**
     * The Nacos config service (wrapped) that every read/write/listen call is delegated to.
     */
    private final NacosConfigServiceWrapper configService;

    // HTTP agent obtained reflectively from the underlying ConfigService; used by getConfigKeys.
    private HttpAgent httpAgent;

    /**
     * Maps a listener key (see {@code buildListenerKey}) to its {@link NacosConfigListener}.
     */
    private final Map<String, NacosConfigListener> watchListenerMap;

    // Computes the MD5 of config content for getConfigItem.
    private MD5Utils md5Utils = new MD5Utils();

    /**
     * Builds the Nacos client properties from {@code url}, creates the config service from
     * them, and extracts the service's HTTP agent.
     *
     * @param url the config-center URL carrying host, port and Nacos parameters
     * @throws IllegalStateException if the config service or its HTTP agent cannot be obtained
     */
    NacosDynamicConfiguration(URL url) {
        this.watchListenerMap = new ConcurrentHashMap<>();
        // Order matters below: buildConfigService reads nacosProperties.
        this.nacosProperties = buildNacosProperties(url);
        this.configService = buildConfigService(url);
        this.httpAgent = getHttpAgent(this.configService.getConfigService());
    }

    /**
     * Creates the Nacos {@code ConfigService} from {@code nacosProperties} and wraps it.
     *
     * @param url the config-center URL (not read by this method)
     * @return the wrapped config service
     * @throws IllegalStateException if Nacos fails to create the service
     */
    private NacosConfigServiceWrapper buildConfigService(URL url) {
        final ConfigService delegate;
        try {
            delegate = NacosFactory.createConfigService(nacosProperties);
        } catch (NacosException cause) {
            if (logger.isErrorEnabled()) {
                logger.error(cause.getErrMsg(), cause);
            }
            throw new IllegalStateException(cause);
        }
        return new NacosConfigServiceWrapper(delegate);
    }

    /**
     * Reads the private {@code agent} field of the given config service via reflection.
     *
     * @param configService the Nacos config service to inspect
     * @return the value of its {@code agent} field, cast to {@code HttpAgent}
     * @throws IllegalStateException if the field is missing, inaccessible or of another type
     */
    private HttpAgent getHttpAgent(ConfigService configService) {
        try {
            Field agentField = configService.getClass().getDeclaredField("agent");
            agentField.setAccessible(true);
            return (HttpAgent) agentField.get(configService);
        } catch (Exception cause) {
            throw new IllegalStateException(cause);
        }
    }

    /**
     * Assembles the Nacos client properties: first the server address, then the
     * parameters copied from the URL.
     *
     * @param url the config-center URL
     * @return a new {@link Properties} instance populated from {@code url}
     */
    private Properties buildNacosProperties(URL url) {
        Properties clientProperties = new Properties();
        setServerAddr(url, clientProperties);
        setProperties(url, clientProperties);
        return clientProperties;
    }

    /**
     * Stores the server address list under {@code SERVER_ADDR}.
     *
     * <p>The value is {@code host:port}, followed by a comma and the {@code BACKUP_KEY}
     * URL parameter when that parameter is present.
     *
     * @param url        source of host, port and backup addresses
     * @param properties target properties to write the address into
     */
    private void setServerAddr(URL url, Properties properties) {
        StringBuilder addresses = new StringBuilder(url.getHost());
        addresses.append(':').append(url.getPort());

        // Any backup servers are appended verbatim after the primary address.
        String backupServers = url.getParameter(BACKUP_KEY);
        if (backupServers != null) {
            addresses.append(',').append(backupServers);
        }

        properties.put(SERVER_ADDR, addresses.toString());
    }

    /**
     * Copies into {@code properties} the URL parameters selected by
     * {@code of(PropertyKeyConst.class)}.
     *
     * @param url        source of the parameters
     * @param properties target properties
     */
    private static void setProperties(URL url, Properties properties) {
        properties.putAll(url.getParameters(of(PropertyKeyConst.class)));
    }

    /**
     * Copies the URL parameter {@code propertyName} into {@code properties} when the
     * parameter has a non-empty value; otherwise leaves {@code properties} untouched.
     *
     * @param url          source of the parameter
     * @param properties   target properties
     * @param propertyName name of both the URL parameter and the property
     */
    private static void putPropertyIfAbsent(URL url, Properties properties, String propertyName) {
        String configured = url.getParameter(propertyName);
        if (StringUtils.isEmpty(configured)) {
            return;
        }
        properties.setProperty(propertyName, configured);
    }

    /**
     * Sets {@code propertyName} to the matching URL parameter when that parameter is
     * non-empty, and to {@code defaultValue} otherwise.
     *
     * @param url          source of the parameter
     * @param properties   target properties
     * @param propertyName name of both the URL parameter and the property
     * @param defaultValue value used when the URL parameter is missing or empty
     */
    private static void putPropertyIfAbsent(URL url, Properties properties, String propertyName, String defaultValue) {
        String configured = url.getParameter(propertyName);
        String effective = StringUtils.isNotEmpty(configured) ? configured : defaultValue;
        properties.setProperty(propertyName, effective);
    }

    /**
     * Creates a {@link NacosConfigListener} and fills its context with the given key and group.
     *
     * @param key   property key the native listener will listen on
     * @param group to distinguish different set of properties
     * @return the new listener, with no {@link ConfigurationListener} registered yet
     */
    private NacosConfigListener createTargetListener(String key, String group) {
        NacosConfigListener targetListener = new NacosConfigListener();
        targetListener.fillContext(key, group);
        return targetListener;
    }

    /**
     * Shuts down the underlying Nacos config service.
     *
     * @throws Exception whatever the config service's shutdown throws
     */
    @Override
    public void close() throws Exception {
        this.configService.shutdown();
    }

    /**
     * Registers {@code listener} for changes of {@code key} in {@code group}.
     *
     * <p>One {@link NacosConfigListener} is shared per key/group pair; it is created on
     * first use and then registered with the Nacos config service. A registration failure
     * is logged (with its stack trace) and not rethrown.
     *
     * @param key      the config key (Nacos data ID)
     * @param group    the config group
     * @param listener the Dubbo listener to notify
     */
    @Override
    public void addListener(String key, String group, ConfigurationListener listener) {
        String listenerKey = buildListenerKey(key, group);
        NacosConfigListener nacosConfigListener =
                watchListenerMap.computeIfAbsent(listenerKey, k -> createTargetListener(key, group));
        nacosConfigListener.addListener(listener);
        try {
            configService.addListener(key, group, nacosConfigListener);
        } catch (NacosException e) {
            // Pass the exception itself so the cause and stack trace are not lost.
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Unregisters {@code listener} from the shared listener of the given key/group pair.
     * Does nothing when no shared listener exists for that pair.
     *
     * @param key      the config key
     * @param group    the config group
     * @param listener the Dubbo listener to remove
     */
    @Override
    public void removeListener(String key, String group, ConfigurationListener listener) {
        NacosConfigListener shared = watchListenerMap.get(buildListenerKey(key, group));
        if (shared == null) {
            return;
        }
        shared.removeListener(listener);
    }

    /**
     * Fetches the content stored under {@code key} in {@code group}.
     *
     * @param key     the config key (Nacos data ID)
     * @param group   the config group; {@code DEFAULT_GROUP} is used when empty
     * @param timeout timeout in milliseconds; a negative value selects {@link #getDefaultTimeout()}
     * @return the config content, or {@code null} when the Nacos call fails (the failure is logged)
     */
    @Override
    public String getConfig(String key, String group, long timeout) throws IllegalStateException {
        long nacosTimeout = timeout < 0 ? getDefaultTimeout() : timeout;
        String effectiveGroup = StringUtils.isEmpty(group) ? DEFAULT_GROUP : group;
        try {
            return configService.getConfig(key, effectiveGroup, nacosTimeout);
        } catch (NacosException e) {
            // Pass the exception itself so the cause and stack trace are not lost.
            logger.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Fetches the config content together with its MD5, which serves as the CAS ticket.
     *
     * @param key   the config key
     * @param group the config group
     * @return an item holding the content and its MD5; the MD5 is {@code ""} when the
     *         content is empty
     */
    @Override
    public ConfigItem getConfigItem(String key, String group) {
        String content = getConfig(key, group);
        String ticket = StringUtils.isNotEmpty(content) ? md5Utils.getMd5(content) : "";
        return new ConfigItem(content, ticket);
    }

    /**
     * Looks up {@code key} in {@code DEFAULT_GROUP} using the default timeout.
     *
     * @param key the config key
     * @return the config content, or {@code null} when the Nacos call fails (the failure is logged)
     */
    @Override
    public Object getInternalProperty(String key) {
        try {
            return configService.getConfig(key, DEFAULT_GROUP, getDefaultTimeout());
        } catch (NacosException e) {
            // Pass the exception itself so the cause and stack trace are not lost.
            logger.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Publishes {@code content} under {@code key} in {@code group}.
     *
     * @param key     the config key
     * @param group   the config group
     * @param content the content to store
     * @return the result reported by Nacos, or {@code false} when the call fails (logged)
     */
    @Override
    public boolean publishConfig(String key, String group, String content) {
        try {
            return configService.publishConfig(key, group, content);
        } catch (NacosException failure) {
            logger.error(failure.getErrMsg(), failure);
            return false;
        }
    }

    /**
     * Publishes {@code content} with a compare-and-swap ticket.
     *
     * @param key     the config key
     * @param group   the config group
     * @param content the content to store
     * @param ticket  the CAS ticket; must be a {@link String}
     * @return the result reported by Nacos, or {@code false} when the call fails (logged)
     * @throws IllegalArgumentException if {@code ticket} is not a {@link String}
     */
    @Override
    public boolean publishConfigCas(String key, String group, String content, Object ticket) {
        // Validated before the Nacos call; this exception is never caught below.
        if (!(ticket instanceof String)) {
            throw new IllegalArgumentException("nacos publishConfigCas requires string type ticket");
        }
        String casTicket = (String) ticket;
        try {
            return configService.publishConfigCas(key, group, content, casTicket);
        } catch (NacosException failure) {
            logger.warn("nacos publishConfigCas failed.", failure);
            return false;
        }
    }

    /**
     * @return the default timeout in milliseconds, i.e. {@code DEFAULT_TIMEOUT}
     */
    @Override
    public long getDefaultTimeout() {
        final long timeoutMillis = DEFAULT_TIMEOUT;
        return timeoutMillis;
    }

    /**
     * Lists the config keys (the {@code dataId} of each returned page item) of a group by
     * querying {@code GET_CONFIG_KEYS_PATH} through the HTTP agent.
     *
     * <p>Slashes in {@code group} are replaced with hyphens before the query. Any failure
     * is logged, and the keys collected up to that point are returned.
     *
     * <p>TODO Nacos does not support atomic update of the value mapped to a key.
     *
     * @param group the specified group
     * @return a sorted set of keys; empty when nothing is found or the query fails
     */
    @Override
    public SortedSet<String> getConfigKeys(String group) {
        // TODO use Nacos Client API to replace HTTP Open API
        SortedSet<String> configKeys = new TreeSet<>();
        try {
            Map<String, String> query = new HashMap<>();
            query.put("search", "accurate");
            query.put("dataId", "");
            query.put("group", group.replace(SLASH_CHAR, HYPHEN_CHAR));
            query.put("pageNo", "1");
            // A single page sized to hold every entry.
            query.put("pageSize", String.valueOf(Integer.MAX_VALUE));

            String charset = getProperty(ENCODE, "UTF-8");
            HttpRestResult<String> response =
                    httpAgent.httpGet(GET_CONFIG_KEYS_PATH, emptyMap(), query, charset, 5 * 1000);

            Stream<String> dataIds = toKeysStream(response.getData());
            if (dataIds != null) {
                dataIds.forEach(dataId -> configKeys.add(dataId));
            }
        } catch (Exception failure) {
            if (logger.isErrorEnabled()) {
                logger.error(failure.getMessage(), failure);
            }
        }
        return configKeys;
    }


    /**
     * Removes the config stored under {@code key} in {@code group}.
     *
     * @param key   the config key
     * @param group the config group
     * @return the result reported by Nacos, or {@code false} when the call fails (logged)
     */
    @Override
    public boolean removeConfig(String key, String group) {
        try {
            return configService.removeConfig(key, group);
        } catch (NacosException failure) {
            if (logger.isErrorEnabled()) {
                logger.error(failure.getMessage(), failure);
            }
            return false;
        }
    }

    /**
     * Extracts the {@code dataId} of every entry in the {@code pageItems} array of a JSON
     * response body.
     *
     * @param content the JSON text to parse
     * @return a lazy stream of data IDs, or {@code null} when the content parses to
     *         {@code null} or has no {@code pageItems} array
     */
    private Stream<String> toKeysStream(String content) {
        JSONObject page = JSON.parseObject(content);
        JSONArray items = page == null ? null : page.getJSONArray("pageItems");
        if (items == null) {
            return null;
        }
        return items.stream()
                .map(item -> ((JSONObject) item).getString("dataId"));
    }

    /**
     * Reads a Nacos client property.
     *
     * @param name         the property name
     * @param defaultValue value returned when the property is not set
     * @return the property value, or {@code defaultValue}
     */
    private String getProperty(String name, String defaultValue) {
        return this.nacosProperties.getProperty(name, defaultValue);
    }

    /**
     * Shared Nacos listener that converts received config content into
     * {@link ConfigChangedEvent}s and forwards them to the registered
     * {@link ConfigurationListener}s.
     */
    public class NacosConfigListener extends AbstractSharedListener {

        /** Dubbo listeners notified on every received change. */
        private Set<ConfigurationListener> listeners = new CopyOnWriteArraySet<>();

        /**
         * Last content received per data ID; used to tell an addition from a modification.
         */
        private Map<String, String> cacheData = new ConcurrentHashMap<>();

        /**
         * @return always {@code null}; this listener supplies no executor of its own
         */
        @Override
        public Executor getExecutor() {
            return null;
        }

        /**
         * Handles content pushed by Nacos: derives the change type from the cached
         * previous value, updates the cache, then notifies every listener.
         *
         * @param dataId     data ID
         * @param group      group
         * @param configInfo content; {@code null} clears the cached value
         */
        @Override
        public void innerReceive(String dataId, String group, String configInfo) {
            String previous = cacheData.get(dataId);
            ConfigChangeType changeType = getChangeType(configInfo, previous);
            ConfigChangedEvent event = new ConfigChangedEvent(dataId, group, configInfo, changeType);

            // ConcurrentHashMap rejects null values, so a null content removes the entry.
            if (configInfo != null) {
                cacheData.put(dataId, configInfo);
            } else {
                cacheData.remove(dataId);
            }

            for (ConfigurationListener listener : listeners) {
                listener.process(event);
            }
        }

        /** Registers a listener for subsequent change events. */
        void addListener(ConfigurationListener configurationListener) {
            listeners.add(configurationListener);
        }

        /** Unregisters a listener; a no-op if it was never added. */
        void removeListener(ConfigurationListener configurationListener) {
            listeners.remove(configurationListener);
        }

        /**
         * Classifies a change from the new and the previous content.
         *
         * @param configInfo the new content
         * @param oldValue   the previously cached content
         * @return {@code DELETED} when the new content is blank, {@code ADDED} when only the
         *         previous content is blank, {@code MODIFIED} otherwise
         */
        private ConfigChangeType getChangeType(String configInfo, String oldValue) {
            if (StringUtils.isBlank(configInfo)) {
                return ConfigChangeType.DELETED;
            }
            return StringUtils.isBlank(oldValue) ? ConfigChangeType.ADDED : ConfigChangeType.MODIFIED;
        }
    }

    /**
     * Builds the {@code watchListenerMap} key for a key/group pair by joining the two
     * with {@code HYPHEN_CHAR}.
     *
     * @param key   the config key
     * @param group the config group
     * @return {@code key}, {@code HYPHEN_CHAR} and {@code group} concatenated
     */
    protected String buildListenerKey(String key, String group) {
        return new StringBuilder()
                .append(key)
                .append(HYPHEN_CHAR)
                .append(group)
                .toString();
    }
}
           

繼續閱讀