天天看點

Nacos源碼分析十五、服務端處理配置監聽

還記得NacosConfigService裡的那個長輪詢監聽配置變更麼?ClientWorker類裡的LongPollingRunnable内部類。我們看一下它的run方法裡的這句:

這個方法就是調用服務端的/v1/cs/configs/listener接口,以監聽可能出現變更的配置資料。往裡跟一下代碼:

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    headers.put("Long-Pulling-Timeout", "" + timeout);
    
    // told server do not hang me up if new initializing cacheData added in
    //是初始化的會設定一個請求頭标記
    // 初始化時不挂起
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }
    
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }
    ... 
           

這裡關注往header裡寫了兩個标記:

  1. Long-Pulling-Timeout 挂起的最大時長
  2. Long-Pulling-Timeout-No-Hangup 如果是初始化的請求,則不挂起。

猜一下幹嘛用的?我們簡單分析一下:

首先我們知道用戶端的長輪詢任務執行完成後立馬又丢回了線程池

if (taskIdSet.contains(taskId)) {
    executorService.execute(this);
}
           

也就是說服務端會不停的收到監聽請求。而實際上大部分時間上并沒有配置變更,如果不停的發送消息,大部分的請求是無效的。而如果我們降低用戶端發送請求的頻率,這樣就會導緻配置變更時用戶端會有一定的延遲。nacos的做法是,服務端通過異步servlet技術,挂起了沒有配置變更的請求,當有配置變更時會通知這些挂起任務立即傳回給用戶端,而始終沒有變更的話則按設定的挂起逾時時間傳回。這樣既保證了用戶端能夠實時收到配置變更響應,又能夠有效的降低無效請求。

接下來我們看看服務端是怎麼做的。

先看一下ConfigController的listener接口:

@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    ...
    // do long-polling
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
           

忽略不重要的代碼,就是調用ConfigServletInner的doPollingConfig方法:

public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
        Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
    
    // Long polling.
    // 長輪詢
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        // 傳回成功
        return HttpServletResponse.SC_OK + "";
    }
    
    // Compatible with short polling logic.
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
    
    // Compatible with short polling result.
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);
    
    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);
    
    // Befor 2.0.4 version, return value is put into header.
    if (versionNum < START_LONG_POLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }
    
    Loggers.AUTH.info("new content:" + newResult);
    
    // Disable cache.
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}
           

LongPollingService.isSupportLongPolling(request) 就是判斷header中是否包含Long-Pulling-Timeout,如果沒有,說明不需要挂起,走下半段代碼,直接檢查配置變更後傳回,就是同步傳回。

如果header中有Long-Pulling-Timeout,進入

longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
           

這行代碼,進行異步挂起處理:

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
        int probeRequestSize) {
    
    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    // 首先會看是否有不挂起的标志,也是用戶端傳來的,緩存配置初始化的時候就不挂起,否則會挂起。
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    
    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
    // 預設挂起時間29.5秒:
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    // 如果支援固定輪詢的話
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // Do nothing but set fix polling timeout.
    } else {
        long start = System.currentTimeMillis();
        // 比對用戶端發來的MD5,是否有改變,有改變的話就立即生成響應,
        // 否則就判斷是否有不挂起标記,有的話就直接傳回,因為沒有改變,也不挂起,就傳回了
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            // 如果設定了不挂起,則直接傳回。
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        }
    }
    // 如果沒有改變,另外沒有設定不挂起,則進行異步任務

    String ip = RequestUtil.getRemoteIp(req);

    // 建立一個異步的上下文,然後建立ClientLongPolling任務,将上下文,逾時等資訊封裝進去,然後排程ClientLongPolling任務
    // Must be called by http thread, or send response.
    // servlet3.0的異步消息
    final AsyncContext asyncContext = req.startAsync();
    
    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L);

    ConfigExecutor.executeLongPolling(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
           

我們畫個簡單的流程圖:

Nacos源碼分析十五、服務端處理配置監聽

也就是說通過用戶端給過來的辨別和服務端自身配置結合判斷是否需要建立長輪詢任務,如果不需要則直接傳回使用者。如果此時已經發現有配置變更了也不需要挂起而直接傳回。

下面我們看看ClientLongPolling任務:

@Override
public void run() {
    // 又起了一個異步任務,延遲時間是傳過來的timeout,即挂起時間
    // 延遲時間過後執行
    asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
        @Override
        public void run() {
            ...
        }
        
    }, timeoutTime, TimeUnit.MILLISECONDS);

    // 放入監聽隊列裡
    allSubs.add(this);
}
           

這裡比較有意思的,又起了一個任務,延遲時間就是之前設定的逾時時間。實際上這個任務能執行到的情況就是這段時間沒有配置變更到最後的兜底操作。而這段時間如果有配置變更,會通過調用asyncTimeoutFuture.cancel來取消這個任務。

另外一個allSubs是一個隊列,這裡把自身添加進隊列。

我們看一下逾時後的任務操作:

@Override
public void run() {
    try {
        getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
        
        // Delete subsciber's relations.
        // 移除訂閱
        allSubs.remove(ClientLongPolling.this);
        
        if (isFixedPolling()) {
            LogUtil.CLIENT_LOG
                    .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                            RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                            "polling", clientMd5Map.size(), probeRequestSize);
            // 如果是固定的,生成響應的時候會去比對MD5
            List<String> changedGroups = MD5Util
                    .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                            (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
            if (changedGroups.size() > 0) {
                sendResponse(changedGroups);
            } else {
                sendResponse(null);
            }
        } else {
            // 不是固定輪詢,到點就移除。asyncContext.complete發送http響應
            // 如果不是固定輪詢的,就直接傳回了,因為在這個任務之前已經判斷過沒有改變,才會挂起,為了有改變的時候直接響應
            // 挂起到點說明這段時間沒有改變,則直接發送http響應。 如果這段時間有改變,則由監聽器調用sendResponse直接傳回消息。調用時會删除目前任務。
            LogUtil.CLIENT_LOG
                    .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                            RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                            "polling", clientMd5Map.size(), probeRequestSize);
            sendResponse(null);
        }
    } catch (Throwable t) {
        LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
    }
    
}
           

首先既然已經逾時了,也就不需要訂閱變更事件了,直接移除。

如果是固定輪詢事件的,也就是說之前即使有變更事件也不會處理,這裡做最終處理。 就是看看有沒有變更的groups,然後發送響應消息。

如果不是固定輪詢的,說明到最後都沒有變更事件,那麼直接傳回空。

看一下sendResponse方法:

void sendResponse(List<String> changedGroups) {
    
    // Cancel time out task.
    // 移除輪詢任務
    if (null != asyncTimeoutFuture) {
        asyncTimeoutFuture.cancel(false);
    }
    generateResponse(changedGroups);
}
           

既然要傳回了,那麼如果還有future就取消掉,然後是generateResponse:

void generateResponse(List<String> changedGroups) {
    if (null == changedGroups) {
        
        // Tell web container to send http response.
        asyncContext.complete();
        return;
    }
    
    HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
    
    try {
        final String respString = MD5Util.compareMd5ResultString(changedGroups);
        
        // Disable cache.
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        response.getWriter().println(respString);
        asyncContext.complete();
    } catch (Exception ex) {
        PULL_LOG.error(ex.toString(), ex);
        asyncContext.complete();
    }
}
           

就是異步上下文的complete。

那麼如果在逾時時間到達之前有配置變更會怎麼樣呢?我們首先看一下這個LongPollingService的初始化方法:

public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
    
    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
    
    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
    
    // Register A Subscriber to subscribe LocalDataChangeEvent.

    // 這裡 釋出-訂閱模式
    NotifyCenter.registerSubscriber(new Subscriber() {

        // 這裡監聽配置資訊改變事件
        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) {
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });
    
}
           

首先allSubs隊列的初始化,這是一個LinkedQueue。

然後往通知中心注冊了一個訂閱LocalDataChangeEvent的監聽器,當有配置變更時,通知中心會發出LocalDataChangeEvent事件,這裡接收事件。

如果是固定輪詢模式,則啥也不做,交由兜底任務asyncTimeoutFuture處理。

如果不是,則送出一個DataChangeTask任務:

@Override
public void run() {
    try {
        ConfigCacheService.getContentBetaMd5(groupKey);
        //周遊所有的訂閱ClientLongPolling
        for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
            ClientLongPolling clientSub = iter.next();
            // 确定是相同group的用戶端
            if (clientSub.clientMd5Map.containsKey(groupKey)) {
                // If published tag is not in the beta list, then it skipped.
                // 如果beta釋出且不在beta清單直接跳過
                if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
                    continue;
                }
                
                // If published tag is not in the tag list, then it skipped.
                // 如果tag釋出且不在tag清單直接跳過
                if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                    continue;
                }
                
                getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                // 删除訂閱關系
                iter.remove(); // Delete subscribers' relationships.
                LogUtil.CLIENT_LOG
                        .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                                RequestUtil
                                        .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                // 發送響應消息,立即傳回響應
                clientSub.sendResponse(Arrays.asList(groupKey));
            }
        }
    } catch (Throwable t) {
        LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
    }
}
           

周遊隊列,判斷是否是相同group的用戶端,如果是,從隊列中移除,然後發送響應消息。

sendResponse時會取消掉對應的兜底任務asyncTimeoutFuture。

畫個圖:

Nacos源碼分析十五、服務端處理配置監聽

總結一下

  1. 首先根據用戶端給的header和自身的配置來确定是否需要進行挂起
  2. 所謂挂起就是起了個ClientLongPolling任務。 這個任務又起了個asyncTimeoutFuture來做逾時兜底。然後将自身加入到監聽隊列中來監聽過程變化。
  3. 服務端啟動時注冊了一個監聽LocalDataChangeEvent事件的監聽器,如果有配置變更則會啟動DataChangeTask任務
  4. DataChangeTask任務周遊監聽隊列,從中找到對應的用戶端,取消對應的asyncTimeoutFuture任務同時直接傳回response響應。