天天看點

圖解+源碼講解 Ribbon 如何擷取注冊中心的執行個體

圖解+源碼講解 Ribbon 如何擷取注冊中心的執行個體

行動是成功的階梯,行動越多,登得越高

想像一下在哪裡開始

    其實我們之前都說過就是負載均衡器從擷取的系統資料庫中選取一個可以使用的服務,那我用了Eureka 注冊中心的話,應該也是通過注冊中心的用戶端擷取的系統資料庫才對,那我們就看看在進行負載均衡器初始化的地方有沒有線索

擷取配置流程圖

圖解+源碼講解 Ribbon 如何擷取注冊中心的執行個體

Ribbon 用戶端配置類

    RibbonClientConfiguration 用戶端配置類,看看這裡面都初始化什麼東西了,看看主要的邏輯部分,裡面建立了路由規則、ping規則、服務清單、服務清單更新器、負載均衡器建立、過濾器以及上下文等等

public class RibbonClientConfiguration {
    // 連接配接逾時時間
    public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
    // 讀取逾時時間
    public static final int DEFAULT_READ_TIMEOUT = 1000;
    @RibbonClientName
    private String name = "client";
    // ribbon 用戶端配置
    @Bean
    @ConditionalOnMissingBean
    public IClientConfig ribbonClientConfig() {
        ....
    }
    // ribbon 路由規則
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        ....
    }
    // 路由 ping 操作
    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
         ....
    }
    // ribbon 服務清單
    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerList<Server> ribbonServerList(IClientConfig config) {
         ....
    }
    // ribbon 服務清單更新器
    @Bean
    @ConditionalOnMissingBean
    public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
        ....
    }
    // 負載均衡器
    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }
    // 服務清單過濾器
    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
        ....
    }
    // ribbon 負載均衡上下文
    @Bean
    @ConditionalOnMissingBean
    public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
            IClientConfig config, RetryHandler retryHandler) {
         ....
    }
    // 重試處理器
    @Bean
    @ConditionalOnMissingBean
    public RetryHandler retryHandler(IClientConfig config) {
        ....
    }

}      

建立負載均衡器 ribbonLoadBalancer

    根據其他方法初始化好的參數進行 ribbonLoadBalancer 構造,這裡面有Rule 規則、Ping機制、以及你要通路的服務位址等參數,實際建立的負載均衡器預設建立的是 ZoneAwareLoadBalancer 負載均衡器

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
            serverListFilter, serverListUpdater);
}      

預設負載均衡器 ZoneAwareLoadBalancer

    這個負載均衡器繼承了 DynamicServerListLoadBalancer動态服務清單負載均衡器,一看就是我們需要的負載均衡器

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                             IPing ping, ServerList<T> serverList,
                             ServerListFilter<T> filter,
                             ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}      

    這裡面的 super 方法就是 **DynamicServerListLoadBalancer **這裡面的方法

動态服務清單負載均衡器 DynamicServerListLoadBalancer

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule,
    IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
    ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).
            setLoadBalancerStats(getLoadBalancerStats());
    }
    // 核心方法,其他的方法先不細看
    restOfInit(clientConfig);
}      

restOfInit 方法

    這個方法裡面是真正的擷取服務執行個體清單操作,和開啟後續如注冊中心推送新的執行個體變更資訊或者拉取執行個體變更操作

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // 開啟并且初始化加載新的服務執行個體,比如注冊中心推送或者拉取操作,後面講解
    enableAndInitLearnNewServersFeature();
    // 擷取服務清單操作
    updateListOfServers();
    ......
}      

更新服務清單 updateListOfServers()

@VisibleForTesting
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        //擷取服務清單資訊
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), servers);
    }
    // 更新LoadBalancer中的所有服務清單
    updateAllServerList(servers);
}      

擷取更新服務清單 getUpdatedListOfServers()

    走的是 DiscoveryEnabledNIWSServerList 這個類下面的 getUpdatedListOfServers() 操作,走的是注冊中心的服務清單

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
    // 獲得服務方法
    return obtainServersViaDiscovery();
}      

擷取服務發現清單 obtainServersViaDiscovery

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
    // 如果用戶端為提供者為空的話那麼就傳回一個空的清單
    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet,
                    returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();
    }
    // 擷取 eureka 用戶端操作
    EurekaClient eurekaClient = eurekaClientProvider.get();
    // 服務位址不為空的情況下
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // 通過 eureka 用戶端擷取服務執行個體操作
        List<InstanceInfo> listOfInstanceInfo =
            eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
        for (InstanceInfo ii : listOfInstanceInfo) {
                // 服務執行個體必須是啟動狀态的或者正常狀态的
            if (ii.getStatus().equals(InstanceStatus.UP)) {
                // 建立服務操作,加入到服務清單中
                DiscoveryEnabledServer des =
                    createServer(ii, isSecure, shouldUseIpAddr);
                serverList.add(des);
                }
            }
        }
    }
    // 傳回服務清單
    return serverList;
}      

更新LoadBalancer中的所有服務清單 updateAllServerList(servers)

protected void updateAllServerList(List<T> ls) {
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true);
            }
            setServersList(ls);
            // ping 服務後期講解
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}      

設定服務清單 setServersList

@Override
public void setServersList(List lsrv) {
    // 将服務用戶端擷取的服務清單執行個體放入到 BaseLoadBalancer 的 allServerList 中
    super.setServersList(lsrv);
    // 下面是将用戶端擷取的服務清單執行個體放入到 serversInZones 這個haspMap中
    List<T> serverList = (List<T>) lsrv;
    Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
    for (Server server : serverList) {
        getLoadBalancerStats().getSingleServerStat(server);
        String zone = server.getZone();
        if (zone != null) {
            zone = zone.toLowerCase();
            List<Server> servers = serversInZones.get(zone);
            if (servers == null) {
                servers = new ArrayList<Server>();
                serversInZones.put(zone, servers);
            }
            servers.add(server);
        }
    }
    setServerListForZones(serversInZones);
}      

小結

  1. 建立一些必要的前提元件,比如 ZoneAwareLoadBalancer、PollingServerListUpdater 等操作
  2. 通過eureka client 用戶端元件擷取服務清單
  3. 根據擷取的服務清單資訊進行負載均衡器内部内部的服務清單進行更新

繼續閱讀