天天看点

Nacos-Client如何实现高可用

引入

本期文章暂时以

nacos-client 1.4.x

版本进行介绍,后期有时间再写一篇关于

2.x

加入

Grpc

版本的,文章较长,谢谢你的观看。

使用案例

public class ConfigExample {

    public static void main(String[] args) throws NacosException, InterruptedException {
        String serverAddr = "localhost";
        String dataId = "test";
        String group = "DEFAULT_GROUP";
        Properties properties = new Properties();
        // 指定server地址
        properties.put("serverAddr", serverAddr);
        ConfigService configService = NacosFactory.createConfigService(properties);
        // 查询配置
        String content = configService.getConfig(dataId, group, 5000);
        System.out.println(content);
        // 监听配置
        configService.addListener(dataId, group, new Listener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("receive:" + configInfo);
            }

            @Override
            public Executor getExecutor() {
                return null;
            }
        });
		// 发布配置
        boolean isPublishOk = configService.publishConfig(dataId, group, "content");
        System.out.println(isPublishOk);
		
        Thread.sleep(3000);
        // 查询配置
        content = configService.getConfig(dataId, group, 5000);
        System.out.println(content);
		
        // 删除配置
        boolean isRemoveOk = configService.removeConfig(dataId, group);
        System.out.println(isRemoveOk);
        Thread.sleep(3000);
		
        // 查询配置
        content = configService.getConfig(dataId, group, 5000);
        System.out.println(content);
        Thread.sleep(300000);

    }
}
           

Nacos

配置模型

Nacos-Client如何实现高可用
  • Namspace

    Tenant

    ),命名空间(租户),默认命名空间是

    public

    ,一个命名空间可以包含多个

    Group

    ,在

    Nacos

    源码里有些变量是

    tenant

    租户;
  • Group

    :组,默认分组是

    DEFAULT_GROUP

    ,一个组可以包含多个

    dataId

  • dataId

    :译为数据

    id

    ,在

    nacos

    DataId

    代表一整个配置文件,是配置的最小单位;

以上值共同构成一个配置或者一个服务的唯一标识。

ConfigService

  • ConfigService

    Nacos

    暴露给客户端的配置服务接口,一个

    Nacos

    配置中心 + 一个

    Namespace

    = 一个

    ConfigService

    实例;
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
properties.put("namespace", namespace);
ConfigService configService = NacosFactory.createConfigService(properties);
           
  • ConfigService

    通过

    ConfigFactory

    获取,每次

    create

    出来的都是不同的实例,内部没有做任何缓存;
    • 主要是为了将

      nacos-api

      层单独拆分出来;
public class NacosFactory {
    /**
     * Create config service.
     */
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        return ConfigFactory.createConfigService(properties);
    }
}
public class ConfigFactory {
    /**
     * Create Config.
     */
    public static ConfigService createConfigService(Properties properties) throws NacosException {
      Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
      Constructor constructor = driverImplClass.getConstructor(Properties.class);
      ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
      return vendorImpl;
    }
}
           
  • ConfigService

    包括定义了配置的增删查改,以及监听操作;
public interface ConfigService {
    String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
    String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener) throws NacosException;
    boolean publishConfig(String dataId, String group, String content) throws NacosException;
    boolean publishConfig(String dataId, String group, String content, String type) throws NacosException;
    boolean removeConfig(String dataId, String group) throws NacosException;
    void addListener(String dataId, String group, Listener listener) throws NacosException;
    void removeListener(String dataId, String group, Listener listener);
    String getServerStatus();
    void shutDown() throws NacosException;
}
           

配置查询

Nacos

配置来源
  • 首先

    getConfig()

    会本地

    Failover

    文件查询,在

    Failover

    文件存在的情况下,

    Nacos-client

    不会向远程发起查询;
  • Failover

    文件不存在时,就会通过

    ClientWorker

    向远程发起查询获取配置,并将查询结果保存在

    snapshot

    文件中;
  • 在远程服务器不可达时,就会直接采用本地

    snapshot

    文件;

Failover

文件在

Nacos

里是优先级最高的,如果

Failover

文件存在则不会使用

nacos

服务端的配置,永远会使用

Failover

文件,即使服务端的配置发生了变化.

Nacos

Failover

文件内容没有更新的入口,也就是说这个文件只能在文件系统中修改生效,生效时机在长轮询过程中。

Nacos-Client如何实现高可用
源码定义

Nacos

客户端获取配置的入口方法是

NacosConfigService#getConfigInner

private final ClientWorker worker;
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
    // group默认设置为DEFAULT_GROUP
    group = null2defaultGroup(group); 
    ConfigResponse cr = new ConfigResponse();
    cr.setDataId(dataId);
    cr.setTenant(tenant);
    cr.setGroup(group);

    // LEVEL1 : 使用本地文件系统的failover配置
    String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
    if (content != null) {
        cr.setContent(content);
        content = cr.getContent();
        return content;
    }

    // LEVEL2 : 读取config-server实时配置,并将snapshot保存到本地文件系统
    try {
        String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
        cr.setContent(ct[0]);
        content = cr.getContent();
        return content;
    } catch (NacosException ioe) {
        if (NacosException.NO_RIGHT == ioe.getErrCode()) {
            throw ioe;
        }
        // 非403错误进入LEVEL3
        LOGGER.warn(...);
    }

    // LEVEL3 : 如果读取config-server发生非403Forbidden错误,使用本地snapshot
    content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
    cr.setContent(content);
    content = cr.getContent();
    return content;
}
           

Failover

Snapshot

文件存储位置

${user.home)

下,以

_nacos

结尾的目录下,以

config-data-{namespace}

的就是

Failover

文件目录,以

snapshoy-{namespace}

的就是快照文件目录。

配置监听

组件调用图
Nacos-Client如何实现高可用
  • ClientWorker

    内维护着一张

    Map<GroupKey, CacheData>

    。一个唯一的配置

    GroupKey

    groupKey

    namespace

    group

    dataId

    共同组成;
  • CacheData

    是一个配置缓存的抽象,它可能来源于

    Failover

    文件或

    Snapshot

    文件,在创建的时候,只需要指定组和数据

    id

    ,它会默认去加载

    Snapshot

    文件,它维护着一个

    Listener

    列表,在配置发生变更时,可以对

    Listener

    进行通知;
  • ClientWorker

    内的长轮询任务由两个线程池共同完成;
    • 第一个线程池只有一个线程,

      10ms

      触发一次,负责去监听

      Map<GroupKey, CacheData>

      的数量变化,判断需要的长轮询任务数,默认情况下认为一个长轮询任务负责

      3000

      CacheData

      的监听;
    • 第二个线程池拥有和机器相同核数的线程数,由第一个线程池触发任务,真正进行轮询校验和通知的线程;
  • 外部程序调用

    API

    层添加对某个配置的监听器,本质上也是给

    ClientWorker

    增加一个

    CacheData

    ,在长轮询任务被检测而已;
  • 长轮询任务中去检测服务端的配置是否更新,采用的是

    Open API

    的方式,长轮询任务不是死循环的,但是在任务提交后,就会马上提交下一次轮询,所以本质上也类似死循环;
    • 如果长轮询任务出现异常,那么会延迟

      2s

      才会提交下一次轮询,而不会马上提交;
  • 每个长轮询任务

    LongPollingRunnable

    都有一个唯一的

    taskId

    CacheData

    也会有一个

    taskId

    ,用来对整体的

    CacheData

    进行分组,一个长轮询任务负责一组(一组最多允许

    3000

    个);

ClientWorker

的属性
public class ClientWorker implements Closeable {
	// 检测是否需要提交longPolling任务到executorService,如果需要则提交
    // 这个线程池只有一个线程
    final ScheduledExecutorService executor;
    
    // 执行长轮询,一般情况下执行listener回调也是在这个线程里
    // 这个线程池有和机器核数一样多的线程
    final ScheduledExecutorService executorService;
    
    // groupKey -> cacheData
    private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<String, CacheData>();
    
    // httpClient, 用来向Server发起请求的
    private final HttpAgent agent;
    
    // 钩子管理器, 内部含有多个Filter,用来对ConfigResponse进行处理加工的
    private final ConfigFilterChainManager configFilterChainManager;
    
    // nacos服务端是否健康
    private boolean isHealthServer = true;
    
    // 长轮询超时时间 默认30s
    private long timeout;
    
    // 当前长轮询任务数量
    private double currentLongingTaskCount = 0;
    
    // 长轮询发生异常,默认延迟2s进行下次长轮询
    private int taskPenaltyTime;
    
    // 是否在添加监听器时,主动获取最新配置
    private boolean enableRemoteSyncConfig = false;
}
           

CacheData

的属性
public class CacheData {
    // agentName
    private final String name;
    
    // dataId
    public final String dataId;
    
    // group
    public final String group;
    
    // namespace
    public final String tenant;
    
    // 注册在这个配置上的监听器
    private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
    
    // 配置的md5
    private volatile String md5;
    
    // 是否使用failover配置文件
    private volatile boolean isUseLocalConfig = false;
    
    // failover配置文件的上次更新时间戳
    private volatile long localConfigLastModified;
    
    // 配置
    private volatile String content;
    
    // 所属长轮询任务id
    private int taskId;
    
    // 是否正在初始化
    private volatile boolean isInitializing = true;
    
    // 配置文件类型 如:TEXT、JSON、YAML
    private String type;
    
    // 对查询配置的请求和响应提供钩子处理
    private final ConfigFilterChainManager configFilterChainManager;
    
    public CacheData(ConfigFilterChainManager configFilterChainManager, 
                     String name, String dataId, String group,
        String tenant) {
        if (null == dataId || null == group) {
            throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group);
        }
        this.name = name;
        this.configFilterChainManager = configFilterChainManager;
        this.dataId = dataId;
        this.group = group;
        this.tenant = tenant;
        listeners = new CopyOnWriteArrayList<ManagerListenerWrap>();
        this.isInitializing = true;
        // 这里会从本地文件系统加载配置内容,failover > snapshot
        this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
        this.md5 = getMd5String(content);
    }

}
           

ClientWorker

长轮询线程池任务
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
     // 以下是第一个线程池, 只有一个线程, 负责去监听Map<GroupKey, CacheData>
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

public void checkConfigInfo() {
    // cacheMap大小
    int listenerSize = cacheMap.size();
    // (cacheMap大小 / 3000)向上取整, 一个长轮询任务负责监听3000个配置
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    // 如果算出来任务数大于当前已经创建的任务数, 那么就加到标准为止
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // 开启新的长轮询任务
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
           

LongPollingRunnable

长轮询任务
class LongPollingRunnable implements Runnable {
    private final int taskId;
    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }
    @Override
    public void run() {
        // 当前长轮询任务负责的CacheData集合
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        // 正在初始化的CacheData 即刚构建的CacheData,内部的content仍然是snapshot版本
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // 1. 对于failover配置文件的处理
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        // 判断cacheData是否需要使用failover配置,设置isUseLocalConfigInfo
                        // 如果需要则更新内存中的配置
                        checkLocalConfig(cacheData);
                        // 使用failover配置则检测content内容是否发生变化,如果变化则通知监听器
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }

            // 2. 对于所有非failover配置,执行长轮询,返回发生改变的groupKey
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    // 3. 对于发生改变的配置,查询实时配置并保存snapshot
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    // 4. 更新内存中的配置
                    CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                } catch (NacosException ioe) {
                    LOGGER.error(message, ioe);
                }
            }
            // 5. 对于非failover配置,触发监听器
            for (CacheData cacheData : cacheDatas) {
                // 排除failover文件
                if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    // 校验md5是否发生变化,如果发生变化通知listener
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();
            // 6-1. 都执行完成以后,再次提交长轮询任务
            executorService.execute(this);
        } catch (Throwable e) {
            // 6-2. 如果长轮询执行发生异常,延迟2s执行下一次长轮询
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}
           

结尾

综上,就是

Nacos SDK

1.4.X

对配置管理的高可用实现,但是实际使用过程中,因为长轮询导致性能问题。

其实

Server

端有配置变更推送机制,那能不能为何还需要客户端长轮询呢?是因为

Server

只能采用

UDP

方式,不可靠传输导致部分通知无法到达客户端,所以才客户端主动轮询进行补偿。

而在

2.X

(目前是

Nacos

正在进行

3.0

版本建设)版本,使用

grpc

的方式,摒弃了长轮询,在性能上远远超过

1.4.x

虽然,通知的方式变化了,但是

1.4.x+

的客户端是完全兼容

2.x

的服务端的,但是为了更好地发挥

Grpc

的性能优势,建议还是使用

2.x

的客户端和服务端,

Nacos

支持从

1.4.x

升级到

2.1.x

,但是需要进行一定的配置,比如

2.1.x

默认关闭了双写,需要打开等,具体可参考官方网站的手册:Nacos手册。

最后,文章较长,新手文手,谢谢你的耐心观看!