本文基于nacos-2.0.3版本
當用戶端設定
autoRefreshed = true
時,比如:
@NacosValue(value = "${XXX:XX}", autoRefreshed = true)
或者
@NacosPropertySource(dataId = "XXX", autoRefreshed = true)
服務端配置值發生變化,用戶端的屬性值也會跟着發生變化。這是如何做到的?本文将首先介紹@NacosPropertySource的原理,之後介紹@NacosValue。
本文目錄
- 1、@NacosPropertySource自動重新整理原理
- 2、@NacosValue自動重新整理原理
- 3、總結
1、@NacosPropertySource自動重新整理原理
在@NacosPropertySource的自動重新整理中,ClientWorker類起着非常關鍵的作用,其作用如下:
- 當設定
,ClientWorker提供了cacheMap在本地緩存這些需要自動重新整理的配置資料;autoRefreshed = true
- 提供了從服務端和本地擷取配置資料的方法,并提供一個定時任務,定時從服務端拉取配置資料。
先來看一下ClientWorker的構造方法:
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
//代碼删減
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
ClientWorker的構造方法建立了一個定時任務(每10ms運作一次),定時任務每次調用checkConfigInfo()方法:
public void checkConfigInfo() {
// Dispatch taskes.
//cacheMap裡面存放的是CacheData對象,
//當配置需要自動重新整理時,會在cacheMap裡面增加一條記錄
//cacheMap的key由groupId和dataId組成,value是CacheData
int listenerSize = cacheMap.size();
// Round up the longingTaskCount.
//将需要重新整理的資料分組,每3000個為一組,一組由一個線程處理
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// The task list is no order.So it maybe has issues when changing.
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
需要重新整理的資料分好組之後,交給LongPollingRunnable類處理。
在介紹LongPollingRunnable之前,我們先回過頭看一下NacosPropertySourcePostProcessor後處理器,該處理器專門處理注解@NacosPropertySource,該類提供了一個處理autoRefreshed的方法:
public static void addListenerIfAutoRefreshed(
final NacosPropertySource nacosPropertySource, final Properties properties,
final ConfigurableEnvironment environment) {
//如果設定不自動重新整理,直接傳回
if (!nacosPropertySource.isAutoRefreshed()) { // Disable Auto-Refreshed
return;
}
//代碼删減
try {
ConfigService configService = nacosServiceFactory
.createConfigService(properties);
//建立監聽器
Listener listener = new AbstractListener() {
@Override
public void receiveConfigInfo(String config) {
String name = nacosPropertySource.getName();
NacosPropertySource newNacosPropertySource = new NacosPropertySource(
dataId, groupId, name, config, type);
newNacosPropertySource.copy(nacosPropertySource);
MutablePropertySources propertySources = environment
.getPropertySources();
propertySources.replace(name, newNacosPropertySource);
}
};
//添加監聽器
if (configService instanceof EventPublishingConfigService) {
((EventPublishingConfigService) configService).addListener(dataId,
groupId, type, listener);
}
else {
configService.addListener(dataId, groupId, listener);
}
}
//代碼删減
}
addListenerIfAutoRefreshed()方法的最後調用了configService.addListener()方法,而configService.addListener()方法最終又會調用ClientWorker.addTenantListeners():
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
throws NacosException {
group = null2defaultGroup(group);
String tenant = agent.getTenant();
//建立CacheData
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
for (Listener listener : listeners) {
//将監聽器添加到CacheData中,當資料發生變化時,CacheData會通知這些監聽器
cache.addListener(listener);
}
}
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
String key = GroupKey.getKeyTenant(dataId, group, tenant);
CacheData cacheData = cacheMap.get(key);
if (cacheData != null) {
return cacheData;
}
cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
//cacheMap的key是由dataId, group, tenant三個參數組成的
CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
if (lastCacheData == null) {
if (enableRemoteSyncConfig) {
//通路服務端,從服務端拉取dataId, group對應的配置資料
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
cacheData.setContent(ct[0]);//配置内容儲存到CacheData中
}
//對目前的CacheData對象分組
int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
cacheData.setTaskId(taskId);
lastCacheData = cacheData;
}
//代碼删除
return lastCacheData;
}
通過addCacheDataIfAbsent()方法可以清晰的看到CacheData如何被建立以及儲存了哪些資料,而且CacheData的分組規則與LongPollingRunnable的分組規則一樣。
下面繼續分析LongPollingRunnable類。LongPollingRunnable實作了Runnable接口,下面我們重點分析其run()方法。
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
//代碼删減
//從伺服器上批量拉取本組内的配置發生變化的groupId和dataId
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
//周遊發生變化的配置
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
//從伺服器拉取發生變化的配置資料
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);//更新
if (null != ct[1]) {
cache.setType(ct[1]);
}
} catch (NacosException ioe) {
//代碼删減
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
//通知CacheData的監聽器
//監聽器的作用有:
//1、更改對象的屬性值
//2、替換spring容器的Environment裡面的PropertySource
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);//直接開始運作下次任務
} catch (Throwable e) {
//代碼删減
}
}
從上面的介紹可以看到,nacos配置自動更新依靠的是兩個定時任務,第一個定時任務是檢查是否有新的需要自動重新整理的配置;第二個定時任務是不斷通路服務端檢查是否有新的配置更新。
2、@NacosValue自動重新整理原理
在NacosValue中也有一個
autoRefreshed = true
的配置,這個配置起什麼作用,它和NacosPropertySource之間是什麼關系?要回答這些問題,先看一下nacos如何處理該注解。
nacos提供了NacosValueAnnotationBeanPostProcessor後處理器處理注解NacosValue,并且提供了doWithAnnotation()方法處理autoRefreshed ,下面看一下該方法源碼:
private void doWithAnnotation(String beanName, Object bean, NacosValue annotation,
int modifiers, Method method, Field field) {
if (annotation != null) {
if (Modifier.isStatic(modifiers)) {
return;
}
//判斷是否是自動重新整理
if (annotation.autoRefreshed()) {
String placeholder = resolvePlaceholder(annotation.value());
if (placeholder == null) {
return;
}
NacosValueTarget nacosValueTarget = new NacosValueTarget(bean, beanName,
method, field, annotation.value());
//如果屬性需自動重新整理,那麼将配置名字和nacosValueTarget放到placeholderNacosValueTargetMap中,
//placeholderNacosValueTargetMap是Map類型,其key為String,value為List<NacosValueTarget>
put2ListMap(placeholderNacosValueTargetMap, placeholder,
nacosValueTarget);
}
}
}
doWithAnnotation()将需要重新整理的對象和屬性放到了一個Map中。
如果我們看一下NacosValueAnnotationBeanPostProcessor處理器的定義,會發現該類實作了ApplicationListener<NacosConfigReceivedEvent>,這說明該處理器還監聽了NacosConfigReceivedEvent事件,而從伺服器拉取了更新的配置資料後,通知CacheData的監聽器時也會釋出NacosConfigReceivedEvent,是以當伺服器有更新的配置時,就會通知NacosValueAnnotationBeanPostProcessor。下面在來看一下該類的onApplicationEvent方法:
public void onApplicationEvent(NacosConfigReceivedEvent event) {
//周遊需要自動重新整理的屬性
for (Map.Entry<String, List<NacosValueTarget>> entry : placeholderNacosValueTargetMap
.entrySet()) {
String key = environment.resolvePlaceholders(entry.getKey());
String newValue = environment.getProperty(key);
if (newValue == null) {
continue;
}
List<NacosValueTarget> beanPropertyList = entry.getValue();
for (NacosValueTarget target : beanPropertyList) {
String md5String = MD5Utils.md5Hex(newValue, "UTF-8");
boolean isUpdate = !target.lastMD5.equals(md5String);
if (isUpdate) {
target.updateLastMD5(md5String);
Object evaluatedValue = resolveNotifyValue(target.nacosValueExpr, key, newValue);
if (target.method == null) {
setField(target, evaluatedValue);//更新屬性值
}
else {
setMethod(target, evaluatedValue);//調用方法更新
}
}
}
}
}
在onApplicationEvent()方法中,可以清晰的看到重新整理配置的邏輯。
3、總結
通過上面的介紹,可以發現自動重新整理是NacosValue和NacosPropertySource兩個共同作用的結果,配置NacosPropertySource自動重新整理,可以定時從伺服器拉取groupId和dataId對應的配置内容,配置NacosValue自動重新整理,可以更新指定對象的屬性。