圖解+源碼講解 Ribbon 服務清單更新
構成天才的決定因素就應是勤奮 —— 郭沫若
動态服務負載均衡器 DynamicServerListLoadBalancer
在這個負載均衡器裡面有一個 restOfInit 這個方法,這個方法涉及到注冊中心的服務清單的拉取,以及拉取或者監聽更新清單的變化,之後設定到負載均衡器裡面,提供給請求執行的時候,進行服務選擇
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).
setLoadBalancerStats(getLoadBalancerStats());
}
// 核心方法,其他的方法先不細看
restOfInit(clientConfig);
}
這個方法裡面 enableAndInitLearnNewServersFeature 這個是一個核心的方法,用來進行指定拉取或者監聽服務變化清單
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
服務清單變更操作 enableAndInitLearnNewServersFeature
開啟服務清單變化更新通知操作
public void enableAndInitLearnNewServersFeature() {
// 服務清單更新器開啟 updateAction 更新操作
serverListUpdater.start(updateAction);
}
protected final ServerListUpdater.UpdateAction updateAction =
new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
// 這個就是上面講解的服務執行個體拉取操作
updateListOfServers();
}
};
服務清單變更政策
定時拉取政策
serverListUpdater.start 方法有兩種實作政策,一個是 eureka 服務通知操作,一個是主動的去拉取服務變更操作,預設的是 PollingServerListUpdater 更新服務政策【在 RibbonClientConfiguration 裡面已經進行建立了PollingServerListUpdater 該政策】如下圖
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
定時拉取更新的服務清單 PollingServerListUpdater
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
// 30s 去拉取一次
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
public PollingServerListUpdater() {
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}
public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
this.initialDelayMs = initialDelayMs;
this.refreshIntervalMs = refreshIntervalMs;
}
PollingServerListUpdater#start 方法
@Override
public synchronized void start(final UpdateAction updateAction) {
// 建立一個線程
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
// 線程排程器排程 wrapperRunnable 任務
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
}
updateAction.doUpdate() 定時更新服務清單
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
服務監聽政策
Eureka通知服務清單更新器 EurekaNotificationServerListUpdater
// eureka事件監聽器
private volatile EurekaEventListener updateListener;
// eureka 用戶端
private volatile EurekaClient eurekaClient;
public EurekaNotificationServerListUpdater() {
this(new LegacyEurekaClientProvider());
}
public EurekaNotificationServerListUpdater(final Provider<EurekaClient>
eurekaClientProvider) {
this(eurekaClientProvider, getDefaultRefreshExecutor());
}
// 構造 eureka通知服務清單監聽器
public EurekaNotificationServerListUpdater(final Provider<EurekaClient>
eurekaClientProvider, ExecutorService refreshExecutor) {
this.eurekaClientProvider = eurekaClientProvider;
this.refreshExecutor = refreshExecutor;
}
EurekaNotificationServerListUpdater#start 方法
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// 建立了一個eureka事件監聽器
this.updateListener = new EurekaEventListener() {
@Override
public void onEvent(EurekaEvent event) {
// 緩存重新整理事件
if (event instanceof CacheRefreshedEvent) {
if (!refreshExecutor.isShutdown()) {
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
// 進行服務清單更新
updateAction.doUpdate();
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
} finally {
updateQueued.set(false);
}
}
});
}
else {
stop();
}
}
}
};
if (eurekaClient == null) {
// 擷取 eureka 用戶端
eurekaClient = eurekaClientProvider.get();
}
if (eurekaClient != null) {
// 注冊監聽器
eurekaClient.registerEventListener(updateListener);
}
}
updateAction.doUpdate() 這個方法和上面的後續一樣
小結
- 兩種方式進行服務清單更新
- 一種是定時拉取更新清單操作
- 一種是 eureka 緩存重新整理通知事件進行服務清單更新