天天看點

圖解+源碼講解 Ribbon 服務清單更新

圖解+源碼講解 Ribbon 服務清單更新

構成天才的決定因素就應是勤奮 —— 郭沫若

動态服務負載均衡器 DynamicServerListLoadBalancer

    在這個負載均衡器裡面有一個 restOfInit 這個方法,這個方法涉及到注冊中心的服務清單的拉取,以及拉取或者監聽更新清單的變化,之後設定到負載均衡器裡面,提供給請求執行的時候,進行服務選擇

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule,
    IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
    ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).
            setLoadBalancerStats(getLoadBalancerStats());
    }
    // 核心方法,其他的方法先不細看
    restOfInit(clientConfig);
}      

    這個方法裡面 enableAndInitLearnNewServersFeature 這個是一個核心的方法,用來進行指定拉取或者監聽服務變化清單

void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }      

服務清單變更操作 enableAndInitLearnNewServersFeature

    開啟服務清單變化更新通知操作

public void enableAndInitLearnNewServersFeature() {
    // 服務清單更新器開啟 updateAction 更新操作
    serverListUpdater.start(updateAction);
}      
protected final ServerListUpdater.UpdateAction updateAction =
        new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
        // 這個就是上面講解的服務執行個體拉取操作
        updateListOfServers();
    }
};      

服務清單變更政策

定時拉取政策

    serverListUpdater.start 方法有兩種實作政策,一個是 eureka 服務通知操作,一個是主動的去拉取服務變更操作,預設的是 PollingServerListUpdater 更新服務政策【在 RibbonClientConfiguration 裡面已經進行建立了PollingServerListUpdater 該政策】如下圖

圖解+源碼講解 Ribbon 服務清單更新
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
    return new PollingServerListUpdater(config);
}      
圖解+源碼講解 Ribbon 服務清單更新

定時拉取更新的服務清單 PollingServerListUpdater

private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
// 30s 去拉取一次
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;


public PollingServerListUpdater() {
    this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}
public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
    this.initialDelayMs = initialDelayMs;
    this.refreshIntervalMs = refreshIntervalMs;
}      

PollingServerListUpdater#start 方法

@Override
public synchronized void start(final UpdateAction updateAction) {
    // 建立一個線程
    final Runnable wrapperRunnable = new Runnable() {
        @Override
        public void run() {
            if (!isActive.get()) {
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
                return;
            }
            try {
                updateAction.doUpdate();
                lastUpdated = System.currentTimeMillis();
            } catch (Exception e) {
                logger.warn("Failed one update cycle", e);
            }
        }
    };
    // 線程排程器排程 wrapperRunnable 任務
    scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            wrapperRunnable,
            initialDelayMs,
            refreshIntervalMs,
            TimeUnit.MILLISECONDS
    );

}      

updateAction.doUpdate() 定時更新服務清單

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
        updateListOfServers();
    }
};      

服務監聽政策

Eureka通知服務清單更新器 EurekaNotificationServerListUpdater

// eureka事件監聽器
private volatile EurekaEventListener updateListener;
// eureka 用戶端
private volatile EurekaClient eurekaClient;

public EurekaNotificationServerListUpdater() {
    this(new LegacyEurekaClientProvider());
}

public EurekaNotificationServerListUpdater(final Provider<EurekaClient>
                                eurekaClientProvider) {
    this(eurekaClientProvider, getDefaultRefreshExecutor());
}
// 構造 eureka通知服務清單監聽器
public EurekaNotificationServerListUpdater(final Provider<EurekaClient>
            eurekaClientProvider, ExecutorService refreshExecutor) {
    this.eurekaClientProvider = eurekaClientProvider;
    this.refreshExecutor = refreshExecutor;
}      

EurekaNotificationServerListUpdater#start 方法

public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
    // 建立了一個eureka事件監聽器
    this.updateListener = new EurekaEventListener() {
        @Override
    public void onEvent(EurekaEvent event) {
        // 緩存重新整理事件
        if (event instanceof CacheRefreshedEvent) {
            if (!refreshExecutor.isShutdown()) {
                refreshExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 進行服務清單更新
                            updateAction.doUpdate();
                            lastUpdated.set(System.currentTimeMillis());
                        } catch (Exception e) {
                            logger.warn("Failed to update serverList", e);
                        } finally {
                            updateQueued.set(false);
                        }
                    }
                });
            }
            else {
                stop();
            }
        }
    }
    };
    if (eurekaClient == null) {
        // 擷取 eureka 用戶端 
        eurekaClient = eurekaClientProvider.get();
    }
    if (eurekaClient != null) {
        // 注冊監聽器
        eurekaClient.registerEventListener(updateListener);
    }
}      

updateAction.doUpdate() 這個方法和上面的後續一樣

小結

  1. 兩種方式進行服務清單更新
  2. 一種是定時拉取更新清單操作
  3. 一種是 eureka 緩存重新整理通知事件進行服務清單更新