Nacos Source Code Analysis, Part 3: Config Center (2)

The NacosConfigService constructor contains this line:

this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
           

Starting from this line, let's see what the ClientWorker initialization does:

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    // HTTP agent
    this.agent = agent;
    // config filter chain
    this.configFilterChainManager = configFilterChainManager;
    
    // Initialize the timeout parameter and other client settings
    init(properties);

    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

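    // A pool with one thread per CPU core, used for long polling. On each check, if the number of
    // cached configs exceeds what the existing LongPollingRunnable tasks cover (3000 per task by
    // default), a new task is started to check the extra configs.
    // Runtime.getRuntime().availableProcessors() returns the number of CPU cores.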
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });

    // Runs every 10 ms: checks the config info and starts LongPollingRunnable tasks as needed
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}
           

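Two thread pools are defined here. executorService is only created at this point and has no tasks yet; it is used for long polling, and its core pool size equals the number of CPU cores. executor runs the checkConfigInfo method with a fixed delay of 10 ms between runs.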
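The constructor wraps checkConfigInfo in a try/catch for Throwable. A likely reason is that scheduleWithFixedDelay suppresses all later executions once one run throws. The sketch below is a minimal illustration of that pattern, not Nacos code; the class name, thread name, and simulated failure are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayDemo {

    /** Schedules a task that fails on every run and returns how many times it ran. */
    static int countRuns(int wantedRuns) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r);
            t.setName("demo.Worker"); // hypothetical thread name
            t.setDaemon(true);        // daemon threads do not block JVM exit
            return t;
        });
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(wantedRuns);
        executor.scheduleWithFixedDelay(() -> {
            try {
                runs.incrementAndGet();
                done.countDown();
                throw new IllegalStateException("simulated check failure");
            } catch (Throwable e) {
                // Swallowed on purpose: an exception escaping the task would
                // suppress every later execution.
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + countRuns(3));
    }
}
```

Even though every run fails, the task keeps being rescheduled because the failure never leaves the Runnable.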

Let's look at the checkConfigInfo method:

public void checkConfigInfo() {
    // Dispatch tasks.
    // number of cached configs being listened to
    int listenerSize = cacheMap.get().size();
    // Round up the longingTaskCount.
    // cache size / 3000, rounded up
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            taskIdSet.add(i);
            // One iteration per missing task (not per listener): start a new
            // LongPollingRunnable for each new task id.
            // By default each LongPollingRunnable polls up to 3000 cached configs.
            executorService.execute(new LongPollingRunnable(i));
        }
    } else if (longingTaskCount < currentLongingTaskCount) {
        for (int i = longingTaskCount; i < (int) currentLongingTaskCount; i++) {
            taskIdSet.remove(i);
        }
    }
    currentLongingTaskCount = longingTaskCount;
}
           

ParamUtil.getPerTaskConfigSize() defaults to 3000.

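The logic is as follows: long-polling tasks are created according to the number of monitored configs (the entries in cacheMap). One task is created for every 3000 entries and submitted to executorService.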
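The rounding can be checked with a few numbers. This is a minimal sketch, assuming the per-task size is the default 3000 and that it is a double (the `(int)` cast applied to `ParamUtil.getPerTaskConfigSize()` in addCacheDataIfAbsent suggests it is); the class and method names are made up for illustration.

```java
public class LongPollingTaskCount {

    // Assumed default; kept as a double so the division is not truncated before Math.ceil.
    static final double PER_TASK_CONFIG_SIZE = 3000;

    /** Number of long-polling tasks needed for the given number of cached configs. */
    static int taskCount(int listenerSize) {
        return (int) Math.ceil(listenerSize / PER_TASK_CONFIG_SIZE);
    }

    public static void main(String[] args) {
        for (int n : new int[] {0, 1, 3000, 3001, 7000}) {
            System.out.println(n + " cached configs -> " + taskCount(n) + " task(s)");
        }
    }
}
```

With an int divisor the division would truncate first and Math.ceil would have nothing left to round, so 3001 entries would map to 1 task instead of 2.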

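The monitored configs, each holding its listeners, live in cacheMap. We will see the write to this cache later, when we look at the addListener method.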

Now let's look at the run method of LongPollingRunnable:

@Override
public void run() {
    
    List<CacheData> cacheDatas = new ArrayList<CacheData>();

    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.get().values()) {
            // belongs to the current long-polling task
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    checkLocalConfig(cacheData);
                    // the local (failover) config is in use
                    if (cacheData.isUseLocalConfigInfo()) {
                        // notify listeners if the content changed
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }

        // Get the list of changed configs (dataId + group) from the server; the URL requested is /listener
        // check server config
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        if (!CollectionUtils.isEmpty(changedGroupKeys)) {
            LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
        }

        // iterate over the changed configs and fetch their content
        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                // fetch the config once for each changed key
                String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                // set the config content
                cache.setContent(ct[0]);
                if (null != ct[1]) {
                    // set the config type
                    cache.setType(ct[1]);
                }
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                        agent.getName(), dataId, group, tenant, cache.getMd5(),
                        ContentUtils.truncateContent(ct[0]), ct[1]);
            } catch (NacosException ioe) {
                String message = String
                        .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                agent.getName(), dataId, group, tenant);
                LOGGER.error(message, ioe);
            }
        }
        // entries that are not initializing, or that are in this round's initializing list
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                // check for changes and notify listeners if there are any
                cacheData.checkListenerMd5();
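                // After the first request, mark the entry as no longer initializing, so the
                // server holds later requests. If the config changes on the server, the held
                // request returns immediately. This gives dynamic config updates without
                // wasting many empty polls.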
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();
        
        if (taskIdSet.contains(taskId)) {
            executorService.execute(this);
        }
        
    } catch (Throwable e) {
        
        // If the long-polling task fails, delay its next execution as a penalty
        LOGGER.error("longPolling error : ", e);
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}
           
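  1. Collect every cacheData that belongs to the current taskId.
  2. Check the local config and, if needed, send listener notifications via checkListenerMd5.
  3. checkUpdateDataIds calls the server to find out which configs have changed.
  4. If any changed, getServerConfig fetches the new config, and checkListenerMd5 notifies listeners if needed.
  5. Put the current task back into the thread pool with executorService.execute(this);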

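Long polling, put simply, means that this LongPollingRunnable task keeps running. Each round it asks the server for the list of changed configs that belong to the current taskId, and when it finds changes it fetches the new content and updates the cache. Note that LongPollingRunnable is a task resubmitted to the pool after each round, not a dedicated thread.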
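The resubmission in step 5 can be sketched in isolation. This is a simplified illustration, not the Nacos implementation: the poll round is replaced by a counter, and the task removes its own id from the set after a fixed number of rounds, whereas in ClientWorker the removal is done by checkConfigInfo. All names other than taskIdSet are assumptions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ResubmittingTaskDemo {

    /** Runs a self-resubmitting task for the given number of rounds and returns the round count. */
    static int runRounds(int rounds) {
        Set<Integer> taskIdSet = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger executed = new AtomicInteger();
        CountDownLatch stopped = new CountDownLatch(1);
        int taskId = 0;
        taskIdSet.add(taskId);

        Runnable task = new Runnable() {
            @Override
            public void run() {
                // Stand-in for one poll round (local check, server check, notify).
                if (executed.incrementAndGet() >= rounds) {
                    taskIdSet.remove(taskId); // simulates the task being retired
                }
                if (taskIdSet.contains(taskId)) {
                    pool.execute(this);       // still active: queue the next round
                } else {
                    stopped.countDown();      // retired: do not resubmit
                }
            }
        };
        pool.execute(task);
        try {
            stopped.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("rounds executed = " + runRounds(5));
    }
}
```

Because each round ends by handing the Runnable back to the pool, no pool thread is tied to a task between rounds, and a task stops naturally once its id leaves the set.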

Let's look at the checkUpdateDataIds method:

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
    // concatenate all config entries so they go out in a single request
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) {
        // skip entries that use the local config
        if (!cacheData.isUseLocalConfigInfo()) {
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {
                // This cacheData appears in cacheMap for the first time,
                // so this is its first update check.
                inInitializingCacheList
                        .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    // flag: does this request include entries that are still initializing?
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
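
To make the probe string built by checkUpdateDataIds concrete, here is a small sketch that builds one entry the same way. The separator values are an assumption (control characters 2 and 1, which is how I recall the Nacos constants being defined); the class name, helper names, and sample values are invented for the example.

```java
public class ProbeStringDemo {

    // Assumed values of WORD_SEPARATOR and LINE_SEPARATOR.
    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    /** Builds one probe entry: dataId, group, md5 and, when present, tenant. */
    static String entry(String dataId, String group, String md5, String tenant) {
        StringBuilder sb = new StringBuilder();
        sb.append(dataId).append(WORD_SEPARATOR);
        sb.append(group).append(WORD_SEPARATOR);
        if (tenant == null || tenant.trim().isEmpty()) {
            sb.append(md5).append(LINE_SEPARATOR);
        } else {
            sb.append(md5).append(WORD_SEPARATOR);
            sb.append(tenant).append(LINE_SEPARATOR);
        }
        return sb.toString();
    }

    /** Replaces the control characters with visible markers for printing. */
    static String visible(String probe) {
        return probe.replace(WORD_SEPARATOR, "^2").replace(LINE_SEPARATOR, "^1");
    }

    public static void main(String[] args) {
        String probe = entry("app.yaml", "DEFAULT_GROUP", "abc123", null)
                + entry("db.yaml", "DEFAULT_GROUP", "def456", "dev");
        System.out.println(visible(probe));
    }
}
```

Sending the client's MD5 with each entry lets the server decide which configs differ without the client uploading any content.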
           

Next is the checkUpdateConfigStr method:

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    headers.put("Long-Pulling-Timeout", "" + timeout);
    
    // told server do not hang me up if new initializing cacheData added in
    // that is, when initializing entries are present, set a header so the server does not hold the request
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }
    
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }
    
    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.
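        // The read timeout is raised to 1.5x the long-polling timeout so the client does not time
        // out while the server holds the request. Only when isInitializingCacheList is true is the
        // request not held; presumably the server skips holding when it sees the
        // Long-Pulling-Timeout-No-Hangup header.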
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);
        
        if (result.ok()) {
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.getData());
        } else {
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                    result.getCode());
        }
    } catch (Exception e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}
           

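As you can see, this is the /v1/cs/configs/listener endpoint. The response is a set of group keys (dataId and group, plus tenant when present), that is, the coordinates of the configs that changed.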
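The read-timeout expression in checkUpdateConfigStr, `timeout + (timeout >> 1)`, adds half of the long-polling timeout on top of it. A quick worked example, assuming a long-polling timeout of 30000 ms (the value is an assumption for illustration, as are the class and method names):

```java
public class ReadTimeoutDemo {

    /** Same arithmetic as in checkUpdateConfigStr: timeout plus half of it. */
    static long readTimeoutMs(long timeout) {
        return timeout + (timeout >> 1);
    }

    public static void main(String[] args) {
        long timeout = 30000L; // assumed long-polling timeout in milliseconds
        System.out.println("long-polling timeout = " + timeout + " ms");
        System.out.println("HTTP read timeout    = " + readTimeoutMs(timeout) + " ms");
    }
}
```

The extra 50% gives the server room to hold the request for the full long-polling timeout and still answer before the client's socket read gives up.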

Back to the getServerConfig method:

public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)
        throws NacosException {
    String[] ct = new String[2];
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }
    
    HttpRestResult<String> result = null;
    try {
        Map<String, String> params = new HashMap<String, String>(3);
        if (StringUtils.isBlank(tenant)) {
            params.put("dataId", dataId);
            params.put("group", group);
        } else {
            params.put("dataId", dataId);
            params.put("group", group);
            params.put("tenant", tenant);
        }
        result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
    } catch (Exception ex) {
        String message = String
                .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s",
                        agent.getName(), dataId, group, tenant);
        LOGGER.error(message, ex);
        throw new NacosException(NacosException.SERVER_ERROR, ex);
    }
    
    switch (result.getCode()) {
        case HttpURLConnection.HTTP_OK:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData());
            ct[0] = result.getData();
            if (result.getHeader().getValue(CONFIG_TYPE) != null) {
                ct[1] = result.getHeader().getValue(CONFIG_TYPE);
            } else {
                ct[1] = ConfigType.TEXT.getType();
            }
            return ct;
        case HttpURLConnection.HTTP_NOT_FOUND:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
            return ct;
        case HttpURLConnection.HTTP_CONFLICT: {
            LOGGER.error(
                    "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
                            + "tenant={}", agent.getName(), dataId, group, tenant);
            throw new NacosException(NacosException.CONFLICT,
                    "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }
        case HttpURLConnection.HTTP_FORBIDDEN: {
            LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(),
                    dataId, group, tenant);
            throw new NacosException(result.getCode(), result.getMessage());
        }
        default: {
            LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(),
                    dataId, group, tenant, result.getCode());
            throw new NacosException(result.getCode(),
                    "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant="
                            + tenant);
        }
    }
}
           

This calls the /v1/cs/configs endpoint to fetch the latest data for the given config.

Finally, the checkListenerMd5 method:

void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        // notify when the MD5 differs from the one last delivered to this listener
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, wrap);
        }
    }
}
           

This is an MD5 check: a listener is notified when the current MD5 differs from the one it was last called with.
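The idea behind the MD5 comparison can be sketched as follows. This is an illustration under assumptions, not the Nacos code: the hashing helper uses the JDK's MessageDigest with UTF-8 bytes, and the class and method names are invented.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5ChangeDemo {

    /** Returns the MD5 of the content as a 32-character lowercase hex string. */
    static String md5Hex(String content) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(content.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** A listener needs a callback when the current MD5 differs from the last one delivered. */
    static boolean shouldNotify(String currentMd5, String lastCallMd5) {
        return !currentMd5.equals(lastCallMd5);
    }

    public static void main(String[] args) {
        String lastCallMd5 = md5Hex("timeout=10");
        System.out.println(shouldNotify(md5Hex("timeout=10"), lastCallMd5)); // unchanged content
        System.out.println(shouldNotify(md5Hex("timeout=20"), lastCallMd5)); // changed content
    }
}
```

Tracking the last delivered MD5 per listener means a listener added late still gets a callback for content that other listeners have already seen.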

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
        final String md5, final ManagerListenerWrap listenerWrap) {

    /**
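     * Creates a job that packages the data and calls the listener's receiveConfigInfo method to
     * handle it, then updates the stored content and MD5.
     * It also sets the context class loader to match the listener's class loader, which is
     * probably related to SPI calls made through reflection.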
     */
    final Listener listener = listenerWrap.listener;
    
    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
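                // Before the callback, set the thread's context class loader to that of the specific
                // webapp, so that SPI calls inside the callback do not fail or pick the wrong
                // implementation (only an issue with multi-application deployments).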
                Thread.currentThread().setContextClassLoader(appClassLoader);
                
                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listener.receiveConfigInfo(contentTmp);
                
                // compare lastContent and content
                if (listener instanceof AbstractConfigChangeListener) {
                    Map data = ConfigChangeHandler.getInstance()
                            .parseChangeData(listenerWrap.lastContent, content, type);
                    ConfigChangeEvent event = new ConfigChangeEvent(data);
                    ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                    listenerWrap.lastContent = content;
                }
                
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                        listener);
            } catch (NacosException ex) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                        name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                        group, md5, listener, t.getCause());
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };
    
    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                group, md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
            name, (finishNotify - startNotify), dataId, group, md5, listener);
}
           

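If the listener provides an executor, the notification job is handed to it; otherwise job.run() is called directly on the current thread.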

Inside the job, listener.receiveConfigInfo(contentTmp); calls the listener's receiveConfigInfo method, that is, the implementation we supplied when calling addListener.
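The dispatch rule can be sketched with a stand-in interface. The nested Listener below is hypothetical: it only mirrors the two methods used in the code above (getExecutor and receiveConfigInfo) so the example compiles without the Nacos client on the classpath. The executor used here runs jobs inline to keep the output deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class ListenerDispatchDemo {

    /** Hypothetical stand-in mirroring the two Listener methods used above. */
    interface Listener {
        Executor getExecutor();

        void receiveConfigInfo(String configInfo);
    }

    /** Hands the job to the listener's executor if it has one, else runs it inline. */
    static void notifyListener(Listener listener, String content) {
        Runnable job = () -> listener.receiveConfigInfo(content);
        if (listener.getExecutor() != null) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    }

    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Listener inline = new Listener() {
            @Override
            public Executor getExecutor() {
                return null; // no executor: callback runs on the caller's thread
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                received.add("inline:" + configInfo);
            }
        };
        Executor sameThread = Runnable::run; // trivial executor for the example
        Listener withExecutor = new Listener() {
            @Override
            public Executor getExecutor() {
                return sameThread;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                received.add("executor:" + configInfo);
            }
        };
        notifyListener(inline, "a=1");
        notifyListener(withExecutor, "a=1");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One consequence worth keeping in mind: a listener without an executor runs its callback on the thread that called checkListenerMd5, so a slow callback would delay that long-polling round.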

Back to the addListener method:

@Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
    worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}
           
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
        throws NacosException {
    group = null2defaultGroup(group);
    String tenant = agent.getTenant();
    CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    for (Listener listener : listeners) {
        cache.addListener(listener);
    }
}
           
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
    CacheData cache = getCache(dataId, group, tenant);
    if (null != cache) {
        return cache;
    }
    String key = GroupKey.getKeyTenant(dataId, group, tenant);
    synchronized (cacheMap) {
        CacheData cacheFromMap = getCache(dataId, group, tenant);
        // multiple listeners on the same dataid+group and race condition,so
        // double check again
        // other listener thread beat me to set to cacheMap
        if (null != cacheFromMap) {
            cache = cacheFromMap;
            // reset so that server not hang this check
            cache.setInitializing(true);
        } else {
            cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
            int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
            cache.setTaskId(taskId);
            // fix issue # 1317
            if (enableRemoteSyncConfig) {
                String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                cache.setContent(ct[0]);
            }
        }
        
        Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
        copy.put(key, cache);
        cacheMap.set(copy);
    }
    LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
    
    MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
    
    return cache;
}
           

This is how entries get added to cacheMap.
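Two details of addCacheDataIfAbsent are easy to miss: the map is replaced wholesale rather than mutated (the `cacheMap.get()` / `cacheMap.set(copy)` calls are consistent with an AtomicReference holding the map), and the task id is derived from the map size at insertion time. The sketch below reproduces both with a simplified map from key to task id; the class name, the small per-task size, and the keys are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class CacheMapDemo {

    private final int perTaskConfigSize;
    // Readers always see a complete map; writers swap in a modified copy.
    private final AtomicReference<Map<String, Integer>> cacheMap =
            new AtomicReference<>(new HashMap<>());

    CacheMapDemo(int perTaskConfigSize) {
        this.perTaskConfigSize = perTaskConfigSize;
    }

    /** Returns the task id for the key, registering the key first if it is absent. */
    int addIfAbsent(String key) {
        Integer existing = cacheMap.get().get(key);
        if (existing != null) {
            return existing;
        }
        synchronized (cacheMap) {
            // Double check: another thread may have added the key in the meantime.
            existing = cacheMap.get().get(key);
            if (existing != null) {
                return existing;
            }
            int taskId = cacheMap.get().size() / perTaskConfigSize;
            Map<String, Integer> copy = new HashMap<>(cacheMap.get());
            copy.put(key, taskId);
            cacheMap.set(copy);
            return taskId;
        }
    }

    public static void main(String[] args) {
        CacheMapDemo demo = new CacheMapDemo(3); // tiny group size to keep the output short
        for (int i = 0; i < 5; i++) {
            System.out.println("k" + i + " -> task " + demo.addIfAbsent("k" + i));
        }
    }
}
```

With a group size of 3, the first three keys land on task 0 and the fourth on task 1, which matches the way checkConfigInfo later decides that a second long-polling task is needed.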

public void addListener(Listener listener) {
    if (null == listener) {
        throw new IllegalArgumentException("listener is null");
    }
    /**
     * Call a different ManagerListenerWrap constructor depending on the listener type
     */
    ManagerListenerWrap wrap =
            (listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content)
                    : new ManagerListenerWrap(listener, md5);

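    // listeners is a CopyOnWriteArrayList.
    // Each write copies the underlying array and modifies the copy, so readers are never blocked
    // and can keep reading the old array. Readers may briefly see the old contents, which is fine
    // here: usually only one element is added and the existing elements are unchanged.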
    if (listeners.addIfAbsent(wrap)) {
        LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
                listeners.size());
    }
}
           

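Looking at how listeners is defined, it is a CopyOnWriteArrayList. This ensures that adding a listener does not interfere with code that is iterating over the list at the same time, such as checkListenerMd5 running inside a long-polling task. A write works on a fresh copy of the list, while readers keep using the original.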
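
A short demonstration of that snapshot behaviour, using only the JDK; the class name and element values are made up. Adding to a plain ArrayList inside a for-each loop over it would throw ConcurrentModificationException, while here the loop simply finishes over the old snapshot.

```java
import java.util.Arrays;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {

    /** Returns {elements visited by the loop, final list size}. */
    static int[] iterateWhileAdding() {
        CopyOnWriteArrayList<String> listeners =
                new CopyOnWriteArrayList<>(Arrays.asList("listener-1", "listener-2"));
        int visited = 0;
        for (String ignored : listeners) {
            // The iterator works on the array as it was when the loop started,
            // so this write neither fails nor becomes visible to the loop.
            listeners.addIfAbsent("listener-3");
            visited++;
        }
        return new int[] {visited, listeners.size()};
    }

    public static void main(String[] args) {
        int[] result = iterateWhileAdding();
        System.out.println("visited=" + result[0] + ", size=" + result[1]);
    }
}
```

The trade-off is that every write copies the whole array, which is acceptable when listeners are added rarely and iterated often.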

To summarize:

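  1. How ClientWorker is created: mainly by setting up the long-polling tasks.
  2. How long-polling tasks are created: one task is started per group of 3000 monitored configs. When a task finishes a round, it is resubmitted to the thread pool to run again.
  3. How config changes are delivered: first ask the server for the coordinates of the changed configs, then make a second call to fetch their content.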

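In addition, ConfigService's create, read, update and delete operations on configs simply call the corresponding server endpoints, so there is not much to analyze. One thing to watch is the local cache: it takes priority over the server, and it is stored as a file (the failover file):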

LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
           

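The next question is how a change notification received by the listener is propagated to the application layer. That is mostly Spring Cloud territory, which later articles in this series will cover.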