天天看點

nacos解析-詳解配置自動重新整理原理1、@NacosPropertySource自動重新整理原理2、@NacosValue自動重新整理原理3、總結

本文基于nacos-2.0.3版本

當用戶端設定

autoRefreshed = true

時,比如:

@NacosValue(value = "${XXX:XX}", autoRefreshed = true)
或者
@NacosPropertySource(dataId = "XXX", autoRefreshed = true)
           

服務端配置值發生變化,用戶端的屬性值也會跟着發生變化。這是如何做到的?本文将首先介紹@NacosPropertySource的原理,之後介紹@NacosValue。

本文目錄

  • 1、@NacosPropertySource自動重新整理原理
  • 2、@NacosValue自動重新整理原理
  • 3、總結

1、@NacosPropertySource自動重新整理原理

在@NacosPropertySource的自動重新整理中,ClientWorker類起着非常關鍵的作用,其作用如下:

  1. 當設定

    autoRefreshed = true

    ,ClientWorker提供了cacheMap在本地緩存這些需要自動重新整理的配置資料;
  2. 提供了從服務端和本地擷取配置資料的方法,并提供一個定時任務,定時從服務端拉取配置資料。

先來看一下ClientWorker的構造方法:

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
            final Properties properties) {
        //代碼删減
        
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }
           

ClientWorker的構造方法建立了一個定時任務(每10ms運作一次),定時任務每次調用checkConfigInfo()方法:

public void checkConfigInfo() {
        // Dispatch taskes.
        //cacheMap裡面存放的是CacheData對象,
        //當配置需要自動重新整理時,會在cacheMap裡面增加一條記錄
        //cacheMap的key由groupId和dataId組成,value是CacheData
        int listenerSize = cacheMap.size();
        // Round up the longingTaskCount.
        //将需要重新整理的資料分組,每3000個為一組,一組由一個線程處理
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // The task list is no order.So it maybe has issues when changing.
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
           

需要重新整理的資料分好組之後,交給LongPollingRunnable類處理。

在介紹LongPollingRunnable之前,我們先回過頭看一下NacosPropertySourcePostProcessor後處理器,該處理器專門處理注解@NacosPropertySource,該類提供了一個處理autoRefreshed的方法:

public static void addListenerIfAutoRefreshed(
			final NacosPropertySource nacosPropertySource, final Properties properties,
			final ConfigurableEnvironment environment) {
		//如果設定不自動重新整理,直接傳回
		if (!nacosPropertySource.isAutoRefreshed()) { // Disable Auto-Refreshed
			return;
		}

		//代碼删減
		
		try {

			ConfigService configService = nacosServiceFactory
					.createConfigService(properties);
			//建立監聽器
			Listener listener = new AbstractListener() {

				@Override
				public void receiveConfigInfo(String config) {
					String name = nacosPropertySource.getName();
					NacosPropertySource newNacosPropertySource = new NacosPropertySource(
							dataId, groupId, name, config, type);
					newNacosPropertySource.copy(nacosPropertySource);
					MutablePropertySources propertySources = environment
							.getPropertySources();
					propertySources.replace(name, newNacosPropertySource);
				}
			};
			//添加監聽器
			if (configService instanceof EventPublishingConfigService) {
				((EventPublishingConfigService) configService).addListener(dataId,
						groupId, type, listener);
			}
			else {
				configService.addListener(dataId, groupId, listener);
			}

		}
		//代碼删減
	}
           

addListenerIfAutoRefreshed()方法的最後調用了configService.addListener()方法,而configService.addListener()方法最終又會調用ClientWorker.addTenantListeners():

public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
            throws NacosException {
        group = null2defaultGroup(group);
        String tenant = agent.getTenant();
        //建立CacheData
        CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
        for (Listener listener : listeners) {
        	//将監聽器添加到CacheData中,當資料發生變化時,CacheData會通知這些監聽器
            cache.addListener(listener);
        }
    }
    public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
        String key = GroupKey.getKeyTenant(dataId, group, tenant);
        CacheData cacheData = cacheMap.get(key);
        if (cacheData != null) {
            return cacheData;
        }
    
        cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
        //cacheMap的key是由dataId, group, tenant三個參數組成的
        CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
        if (lastCacheData == null) {
            if (enableRemoteSyncConfig) {
            	//通路服務端,從服務端拉取dataId, group對應的配置資料
                String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                cacheData.setContent(ct[0]);//配置内容儲存到CacheData中
            }
            //對目前的CacheData對象分組
            int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
            cacheData.setTaskId(taskId);
            lastCacheData = cacheData;
        }
        //代碼删除
        return lastCacheData;
    }
           

通過addCacheDataIfAbsent()方法可以清晰的看到CacheData如何被建立以及儲存了哪些資料,而且CacheData的分組規則與LongPollingRunnable的分組規則一樣。

下面繼續分析LongPollingRunnable類。LongPollingRunnable實作了Runnable接口,下面我們重點分析其run()方法。

public void run() {
      List<CacheData> cacheDatas = new ArrayList<CacheData>();
      List<String> inInitializingCacheList = new ArrayList<String>();
      try {
          //代碼删減
          
          //從伺服器上批量拉取本組内的配置發生變化的groupId和dataId
          List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
         //周遊發生變化的配置
          for (String groupKey : changedGroupKeys) {
              String[] key = GroupKey.parseKey(groupKey);
              String dataId = key[0];
              String group = key[1];
              String tenant = null;
              if (key.length == 3) {
                  tenant = key[2];
              }
              try {
              		//從伺服器拉取發生變化的配置資料
                  String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                  CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                  cache.setContent(ct[0]);//更新
                  if (null != ct[1]) {
                      cache.setType(ct[1]);
                  }
                 
              } catch (NacosException ioe) {
                  //代碼删減
              }
          }
          for (CacheData cacheData : cacheDatas) {
              if (!cacheData.isInitializing() || inInitializingCacheList
                      .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                  //通知CacheData的監聽器
                  //監聽器的作用有:
                  //1、更改對象的屬性值
                  //2、替換spring容器的Environment裡面的PropertySource
                  cacheData.checkListenerMd5();
                  cacheData.setInitializing(false);
              }
          }
          inInitializingCacheList.clear();
          executorService.execute(this);//直接開始運作下次任務
      } catch (Throwable e) {
          //代碼删減
      }
  }
           

從上面的介紹可以看到,nacos配置自動更新依靠的是兩個定時任務,第一個定時任務是檢查是否有新的需要自動重新整理的配置;第二個定時任務是不斷通路服務端檢查是否有新的配置更新。

2、@NacosValue自動重新整理原理

在NacosValue中也有一個

autoRefreshed = true

的配置,這個配置起什麼作用,它和NacosPropertySource之間是什麼關系?要回答這些問題,先看一下nacos如何處理該注解。

nacos提供了NacosValueAnnotationBeanPostProcessor後處理器處理注解NacosValue,并且提供了doWithAnnotation()方法處理autoRefreshed ,下面看一下該方法源碼:

private void doWithAnnotation(String beanName, Object bean, NacosValue annotation,
			int modifiers, Method method, Field field) {
		if (annotation != null) {
			if (Modifier.isStatic(modifiers)) {
				return;
			}
			//判斷是否是自動重新整理
			if (annotation.autoRefreshed()) {
				String placeholder = resolvePlaceholder(annotation.value());

				if (placeholder == null) {
					return;
				}

				NacosValueTarget nacosValueTarget = new NacosValueTarget(bean, beanName,
						method, field, annotation.value());
				//如果屬性需自動重新整理,那麼将配置名字和nacosValueTarget放到placeholderNacosValueTargetMap中,
				//placeholderNacosValueTargetMap是Map類型,其key為String,value為List<NacosValueTarget>
				put2ListMap(placeholderNacosValueTargetMap, placeholder,
						nacosValueTarget);
			}
		}
	}
           

doWithAnnotation()将需要重新整理的對象和屬性放到了一個Map中。

如果我們看一下NacosValueAnnotationBeanPostProcessor處理器的定義,會發現該類實作了ApplicationListener<NacosConfigReceivedEvent>,這說明該處理器還監聽了NacosConfigReceivedEvent事件,而從伺服器拉取了更新的配置資料後,通知CacheData的監聽器時也會釋出NacosConfigReceivedEvent,是以當伺服器有更新的配置時,就會通知NacosValueAnnotationBeanPostProcessor。下面在來看一下該類的onApplicationEvent方法:

public void onApplicationEvent(NacosConfigReceivedEvent event) {
		//周遊需要自動重新整理的屬性
		for (Map.Entry<String, List<NacosValueTarget>> entry : placeholderNacosValueTargetMap
				.entrySet()) {
			String key = environment.resolvePlaceholders(entry.getKey());
			String newValue = environment.getProperty(key);

			if (newValue == null) {
				continue;
			}
			List<NacosValueTarget> beanPropertyList = entry.getValue();
			for (NacosValueTarget target : beanPropertyList) {
				String md5String = MD5Utils.md5Hex(newValue, "UTF-8");
				boolean isUpdate = !target.lastMD5.equals(md5String);
				if (isUpdate) {
					target.updateLastMD5(md5String);
					Object evaluatedValue = resolveNotifyValue(target.nacosValueExpr, key, newValue);
					if (target.method == null) {
						setField(target, evaluatedValue);//更新屬性值
					}
					else {
						setMethod(target, evaluatedValue);//調用方法更新
					}
				}
			}
		}
	}
           

在onApplicationEvent()方法中,可以清晰的看到重新整理配置的邏輯。

3、總結

通過上面的介紹,可以發現自動重新整理是NacosValue和NacosPropertySource兩個共同作用的結果,配置NacosPropertySource自動重新整理,可以定時從伺服器拉取groupId和dataId對應的配置内容,配置NacosValue自動重新整理,可以更新指定對象的屬性。