天天看点

不懂Ribbon原理的可以进来看看哦,分析RibbonClientConfiguration完成了哪些核心初始操作

不懂Ribbon原理的可以进来看看哦,分析RibbonClientConfiguration完成了哪些核心初始操作

 ​

RibbonClientConfiguration

  RibbonClientConfiguration是一个非常中的Ribbon配置类,在第一个发起Ribbon请求的时候会完成对应的初始化操作。会完成多个相关的默认设置。

接口 默认实现 描述
IClientConfig DefaultClientConfigImpl 管理配置接口
IRule ZoneAvoidanceRule 均衡策略接口
IPing DummyPing 检查服务可用性接口
ServerList<Server> ConfigurationBasedServerList 获取服务列表接口
ILoadBalancer ZoneAwareLoadBalancer 负载均衡接口
ServerListUpdater PollingServerListUpdater 定时更新服务列表接口
ServerIntrospector DefaultServerIntrospector 安全端口接口
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadProperties(this.name);
config.set(CommonClientConfigKey.ConnectTimeout, 1000);
config.set(CommonClientConfigKey.ReadTimeout, 1000);
config.set(CommonClientConfigKey.GZipPayload, true);
return config;
    }

@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, this.name)) {
return (IRule)this.propertiesFactory.get(IRule.class, config, this.name);
        } else {
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
        }
    }

@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
return (IPing)(this.propertiesFactory.isSet(IPing.class, this.name) ? (IPing)this.propertiesFactory.get(IPing.class, config, this.name) : new DummyPing());
    }

@Bean
@ConditionalOnMissingBean
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, this.name)) {
return (ServerList)this.propertiesFactory.get(ServerList.class, config, this.name);
        } else {
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
        }
    }

@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
    }

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
      

  在众多的默认实现中比较重要的是【ILoadBalancer】对象的实现。即【ZoneAwareLoadBalancer】的实现。实现的原理图为:

不懂Ribbon原理的可以进来看看哦,分析RibbonClientConfiguration完成了哪些核心初始操作

在【ZoneAwareLoadBalancer】里面完成了服务地址动态获取和服务地址更新定时任务的配置。首先会进入【ZoneAwareLoadBalancer】的构造方法中

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
    }
      

  通过源码能够发现会调用父类中的构造方法。

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
// 继续调用父类中的方法
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
// 完成相关的初始操作  服务地址获取和更新
restOfInit(clientConfig);
    }
      

  在上面的源码中我们先继续跟踪父类中的方法。

void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
this.config = clientConfig;
String clientName = clientConfig.getClientName();
this.name = clientName;
// 设置了定时任务的间隔时间为30秒。
int pingIntervalTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerPingInterval,
Integer.parseInt("30")));
int maxTotalPingTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
Integer.parseInt("2")));

setPingInterval(pingIntervalTime);
setMaxTotalPingTime(maxTotalPingTime);

// cross associate with each other
// i.e. Rule,Ping meet your container LB
// LB, these are your Ping and Rule guys ...
setRule(rule);
setPing(ping);

setLoadBalancerStats(stats);
rule.setLoadBalancer(this);
if (ping instanceof AbstractLoadBalancerPing) {
            ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
        }
logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
boolean enablePrimeConnections = clientConfig.get(
CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);

if (enablePrimeConnections) {
this.setEnablePrimingConnections(true);
PrimeConnections primeConnections = new PrimeConnections(
this.getName(), clientConfig);
this.setPrimeConnections(primeConnections);
        }
init();

    }
      

  在initWithConfig方法中比较中的就是设置了定时任务的间隔时间。然后我们再回到restOfInit方法中。(一起来进阶提升吧:463257262)

void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
// 设置定时任务
enableAndInitLearnNewServersFeature();
// 获取并更新服务地址
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }
      

  先看enableAndInitLearnNewServersFeature方法

public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
    }
      

  start方法的实现有多种,根据我们的服务选择对应的选择即可。比如本地就使用PollingServerListUpdater,如果是Eureka注册中心就选择EurekaNotificationServerListUpdater.

不懂Ribbon原理的可以进来看看哦,分析RibbonClientConfiguration完成了哪些核心初始操作

以本地为例:

@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// 定时任务的  任务体
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
                        }
return;
                    }
try {
// doUpdate()的方法体要注意
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
logger.warn("Failed one update cycle", e);
                    }
                }
            };
// 设置定时任务 10秒开始第一次检查,间隔时间是30秒
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
            );
        } else {
logger.info("Already active, no-op");
        }
    }
      

  此处要注意定时任务的具体内容,以本地为例。

不懂Ribbon原理的可以进来看看哦,分析RibbonClientConfiguration完成了哪些核心初始操作

 所以定时任务执行的方法也就是【updateListOfServers】方法,也就是:

不懂Ribbon原理的可以进来看看哦,分析RibbonClientConfiguration完成了哪些核心初始操作

emsp; 所以我们继续来看看【updateListOfServers】方法中的逻辑

@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
// 从本地或者Eureka或者Nacos等各个配置中心中获取对应的服务地址信息
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);

if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
            }
        }
// 更新服务地址信息
updateAllServerList(servers);
    }
      

  上面代码中重要的方法是【getUpdatedListOfServers】和【updateAllServerList】,先来看【getUpdatedListOfServers】方法

不懂Ribbon原理的可以进来看看哦,分析RibbonClientConfiguration完成了哪些核心初始操作

  查看本地的逻辑,Eureka的自行查看

@Override
public List<Server> getUpdatedListOfServers() {
// 从本地配置中获取
String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
return derive(listOfServers);
  }
      

  然后就是【updateAllServerList】方法

protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
// 通过CAS保证操作的原子性
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
                }
// 更新服务地址信息
setServersList(ls);
// 强制ping服务地址
super.forceQuickPing();
            } finally {
serverListUpdateInProgress.set(false);
            }
        }
    }