引入
本期文章暂时以
nacos-client 1.4.x
版本进行介绍,后期有时间再写一篇关于
2.x
加入
Grpc
版本的,文章较长,谢谢你的观看。
使用案例
public class ConfigExample {
public static void main(String[] args) throws NacosException, InterruptedException {
String serverAddr = "localhost";
String dataId = "test";
String group = "DEFAULT_GROUP";
Properties properties = new Properties();
// 指定server地址
properties.put("serverAddr", serverAddr);
ConfigService configService = NacosFactory.createConfigService(properties);
// 查询配置
String content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
// 监听配置
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("receive:" + configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
// 发布配置
boolean isPublishOk = configService.publishConfig(dataId, group, "content");
System.out.println(isPublishOk);
Thread.sleep(3000);
// 查询配置
content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
// 删除配置
boolean isRemoveOk = configService.removeConfig(dataId, group);
System.out.println(isRemoveOk);
Thread.sleep(3000);
// 查询配置
content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
Thread.sleep(300000);
}
}
Nacos
配置模型
Nacos
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLwUGZjhTZmdDZyYmN3ADN4QDNxQzN5gzN4EWYxUTZyAzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
-
(Namspace
),命名空间(租户),默认命名空间是Tenant
,一个命名空间可以包含多个public
,在Group
源码里有些变量是Nacos
租户;tenant
-
:组,默认分组是Group
,一个组可以包含多个DEFAULT_GROUP
;dataId
-
:译为数据dataId
,在id
中nacos
代表一整个配置文件,是配置的最小单位;DataId
以上值共同构成一个配置或者一个服务的唯一标识。
ConfigService
ConfigService
-
是ConfigService
暴露给客户端的配置服务接口,一个Nacos
配置中心 + 一个Nacos
= 一个Namespace
实例;ConfigService
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
properties.put("namespace", namespace);
ConfigService configService = NacosFactory.createConfigService(properties);
-
通过ConfigService
获取,每次ConfigFactory
出来的都是不同的实例,内部没有做任何缓存;create
- 主要是为了将
层单独拆分出来;nacos-api
- 主要是为了将
public class NacosFactory {
/**
* Create config service.
*/
public static ConfigService createConfigService(Properties properties) throws NacosException {
return ConfigFactory.createConfigService(properties);
}
}
public class ConfigFactory {
/**
* Create Config.
*/
public static ConfigService createConfigService(Properties properties) throws NacosException {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
}
}
-
包括定义了配置的增删查改,以及监听操作;ConfigService
public interface ConfigService {
String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener) throws NacosException;
boolean publishConfig(String dataId, String group, String content) throws NacosException;
boolean publishConfig(String dataId, String group, String content, String type) throws NacosException;
boolean removeConfig(String dataId, String group) throws NacosException;
void addListener(String dataId, String group, Listener listener) throws NacosException;
void removeListener(String dataId, String group, Listener listener);
String getServerStatus();
void shutDown() throws NacosException;
}
配置查询
Nacos
配置来源
- 首先
会本地getConfig()
文件查询,在Failover
文件存在的情况下,Failover
不会向远程发起查询;Nacos-client
- 在
文件不存在时,就会通过Failover
向远程发起查询获取配置,并将查询结果保存在ClientWorker
文件中;snapshot
- 在远程服务器不可达时,就会直接采用本地
文件;snapshot
Failover
文件在
Nacos
里是优先级最高的,如果
Failover
文件存在则不会使用
nacos
服务端的配置,永远会使用
Failover
文件,即使服务端的配置发生了变化.
Nacos
的
Failover
文件内容没有更新的入口,也就是说这个文件只能在文件系统中修改生效,生效时机在长轮询过程中。
源码定义
Nacos
客户端获取配置的入口方法是
NacosConfigService#getConfigInner
。
private final ClientWorker worker;
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
// group默认设置为DEFAULT_GROUP
group = null2defaultGroup(group);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// LEVEL1 : 使用本地文件系统的failover配置
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
cr.setContent(content);
content = cr.getContent();
return content;
}
// LEVEL2 : 读取config-server实时配置,并将snapshot保存到本地文件系统
try {
String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(ct[0]);
content = cr.getContent();
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
// 非403错误进入LEVEL3
LOGGER.warn(...);
}
// LEVEL3 : 如果读取config-server发生非403Forbidden错误,使用本地snapshot
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
content = cr.getContent();
return content;
}
和
Failover
文件存储位置
Snapshot
${user.home)
下,以
_nacos
结尾的目录下,以
config-data-{namespace}
的就是
Failover
文件目录,以
snapshoy-{namespace}
的就是快照文件目录。
配置监听
组件调用图
-
内维护着一张ClientWorker
。一个唯一的配置Map<GroupKey, CacheData>
,GroupKey
由groupKey
、namespace
、group
共同组成;dataId
-
是一个配置缓存的抽象,它可能来源于CacheData
文件或Failover
文件,在创建的时候,只需要指定组和数据Snapshot
,它会默认去加载id
文件,它维护着一个Snapshot
列表,在配置发生变更时,可以对Listener
进行通知;Listener
-
内的长轮询任务由两个线程池共同完成;ClientWorker
-
- 第一个线程池只有一个线程,
触发一次,负责去监听10ms
的数量变化,判断需要的长轮询任务数,默认情况下认为一个长轮询任务负责Map<GroupKey, CacheData>
个3000
的监听;CacheData
- 第一个线程池只有一个线程,
-
- 第二个线程池拥有和机器相同核数的线程数,由第一个线程池触发任务,真正进行轮询校验和通知的线程;
- 外部程序调用
层添加对某个配置的监听器,本质上也是给API
增加一个ClientWorker
,在长轮询任务被检测而已;CacheData
- 长轮询任务中去检测服务端的配置是否更新,采用的是
的方式,长轮询任务不是死循环的,但是在任务提交后,就会马上提交下一次轮询,所以本质上也类似死循环;Open API
- 如果长轮询任务出现异常,那么会延迟
才会提交下一次轮询,而不会马上提交;2s
- 如果长轮询任务出现异常,那么会延迟
- 每个长轮询任务
都有一个唯一的LongPollingRunnable
,taskId
也会有一个CacheData
,用来对整体的taskId
进行分组,一个长轮询任务负责一组(一组最多允许CacheData
个);3000
ClientWorker
的属性
public class ClientWorker implements Closeable {
// 检测是否需要提交longPolling任务到executorService,如果需要则提交
// 这个线程池只有一个线程
final ScheduledExecutorService executor;
// 执行长轮询,一般情况下执行listener回调也是在这个线程里
// 这个线程池有和机器核数一样多的线程
final ScheduledExecutorService executorService;
// groupKey -> cacheData
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<String, CacheData>();
// httpClient, 用来向Server发起请求的
private final HttpAgent agent;
// 钩子管理器, 内部含有多个Filter,用来对ConfigResponse进行处理加工的
private final ConfigFilterChainManager configFilterChainManager;
// nacos服务端是否健康
private boolean isHealthServer = true;
// 长轮询超时时间 默认30s
private long timeout;
// 当前长轮询任务数量
private double currentLongingTaskCount = 0;
// 长轮询发生异常,默认延迟2s进行下次长轮询
private int taskPenaltyTime;
// 是否在添加监听器时,主动获取最新配置
private boolean enableRemoteSyncConfig = false;
}
CacheData
的属性
public class CacheData {
// agentName
private final String name;
// dataId
public final String dataId;
// group
public final String group;
// namespace
public final String tenant;
// 注册在这个配置上的监听器
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
// 配置的md5
private volatile String md5;
// 是否使用failover配置文件
private volatile boolean isUseLocalConfig = false;
// failover配置文件的上次更新时间戳
private volatile long localConfigLastModified;
// 配置
private volatile String content;
// 所属长轮询任务id
private int taskId;
// 是否正在初始化
private volatile boolean isInitializing = true;
// 配置文件类型 如:TEXT、JSON、YAML
private String type;
// 对查询配置的请求和响应提供钩子处理
private final ConfigFilterChainManager configFilterChainManager;
public CacheData(ConfigFilterChainManager configFilterChainManager,
String name, String dataId, String group,
String tenant) {
if (null == dataId || null == group) {
throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group);
}
this.name = name;
this.configFilterChainManager = configFilterChainManager;
this.dataId = dataId;
this.group = group;
this.tenant = tenant;
listeners = new CopyOnWriteArrayList<ManagerListenerWrap>();
this.isInitializing = true;
// 这里会从本地文件系统加载配置内容,failover > snapshot
this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
this.md5 = getMd5String(content);
}
}
ClientWorker
长轮询线程池任务
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
// 以下是第一个线程池, 只有一个线程, 负责去监听Map<GroupKey, CacheData>
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
public void checkConfigInfo() {
// cacheMap大小
int listenerSize = cacheMap.size();
// (cacheMap大小 / 3000)向上取整, 一个长轮询任务负责监听3000个配置
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
// 如果算出来任务数大于当前已经创建的任务数, 那么就加到标准为止
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 开启新的长轮询任务
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
LongPollingRunnable
长轮询任务
class LongPollingRunnable implements Runnable {
private final int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
// 当前长轮询任务负责的CacheData集合
List<CacheData> cacheDatas = new ArrayList<CacheData>();
// 正在初始化的CacheData 即刚构建的CacheData,内部的content仍然是snapshot版本
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// 1. 对于failover配置文件的处理
for (CacheData cacheData : cacheMap.values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
// 判断cacheData是否需要使用failover配置,设置isUseLocalConfigInfo
// 如果需要则更新内存中的配置
checkLocalConfig(cacheData);
// 使用failover配置则检测content内容是否发生变化,如果变化则通知监听器
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// 2. 对于所有非failover配置,执行长轮询,返回发生改变的groupKey
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
// 3. 对于发生改变的配置,查询实时配置并保存snapshot
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
// 4. 更新内存中的配置
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
} catch (NacosException ioe) {
LOGGER.error(message, ioe);
}
}
// 5. 对于非failover配置,触发监听器
for (CacheData cacheData : cacheDatas) {
// 排除failover文件
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
// 校验md5是否发生变化,如果发生变化通知listener
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
// 6-1. 都执行完成以后,再次提交长轮询任务
executorService.execute(this);
} catch (Throwable e) {
// 6-2. 如果长轮询执行发生异常,延迟2s执行下一次长轮询
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
结尾
综上,就是
Nacos SDK
在
1.4.X
对配置管理的高可用实现,但是实际使用过程中,因为长轮询导致性能问题。
其实
Server
端有配置变更推送机制,那能不能为何还需要客户端长轮询呢?是因为
Server
只能采用
UDP
方式,不可靠传输导致部分通知无法到达客户端,所以才客户端主动轮询进行补偿。
而在
2.X
(目前是
Nacos
正在进行
3.0
版本建设)版本,使用
grpc
的方式,摒弃了长轮询,在性能上远远超过
1.4.x
。
虽然,通知的方式变化了,但是
1.4.x+
的客户端是完全兼容
2.x
的服务端的,但是为了更好地发挥
Grpc
的性能优势,建议还是使用
2.x
的客户端和服务端,
Nacos
支持从
1.4.x
升级到
2.1.x
,但是需要进行一定的配置,比如
2.1.x
默认关闭了双写,需要打开等,具体可参考官方网站的手册:Nacos手册。
最后,文章较长,新手文手,谢谢你的耐心观看!