天天看點

SpringCloud元件之Ribbon使用及源碼分析SpringCloud元件之Ribbon使用及源碼分析一、什麼是Ribbon?二、Ribbon的基本使用三、Ribbon源碼分析四、總結

文章目錄

  • SpringCloud元件之Ribbon使用及源碼分析
  • 一、什麼是Ribbon?
  • 二、Ribbon的基本使用
    • 1.測試代碼編寫
    • 2.結果測試
  • 三、Ribbon源碼分析
    • [email protected]注解
    • 2.LoadBalancerAutoConfiguration自動裝配
    • 3. LoadBalancerInterceptor攔截器
    • 4.ILoadBalancer負載均衡器
    • 5.服務清單更新與擷取
    • 6.負載均衡擷取服務節點資訊
    • 7.重構請求Url
  • 四、總結
本文僅是學習記錄,如有錯誤,還請大佬們不吝指出.

SpringCloud元件之Ribbon使用及源碼分析

一、什麼是Ribbon?

Ribbon 是一個具有内置軟體負載平衡器的程序間通信(遠端過程調用)庫。簡單來說Ribbon就是一個用戶端負載均衡器,其作用就是解析目标服務的可調用的服務清單,基于負載均衡算法來實作請求的分發。

二、Ribbon的基本使用

1.測試代碼編寫

編寫一個簡單的user服務和一個order服務來測試,通過user服務來遠端調用order服務擷取使用者訂單資訊

order服務

@RestController
public class OrderService {

    @Value("${server.port}")
    private String port;
    
    @GetMapping("/orders")
    public String getAllOrder(){
        System.err.println("order service port : " + port);
        return "return all order..";
    }
}
           

order服務配置檔案

server:
  port: 9000
spring:
  application:
    name: order-service
           

user服務

@RestController
public class UserController{

    @Autowired
    private RestTemplate restTemplate;

    @Bean
    @LoadBalanced
    public RestTemplate getRestTemplate(RestTemplateBuilder restTemplateBuilder){
        return restTemplateBuilder.build();
    }

    @GetMapping("/user/{userId}")
    public String getAllOrder(@PathVariable("userId") String userId){
        //調用訂單服務查詢使用者訂單資訊
        String result = restTemplate.getForObject("http://order-service/orders", String.class);
        return "userId: " + userId + "  order info : "  + result;
    }

}
           

user服務配置檔案

server:
  port: 8000
spring:
  application:
    name: user-service
# 配置order服務的提供者的位址清單,這裡就相當于是注冊中心中的生産者的資訊
order-service:
  ribbon:
    listOfServers: localhost:9000,localhost:9100
           

2.結果測試

通路使用者服務接口測試

SpringCloud元件之Ribbon使用及源碼分析SpringCloud元件之Ribbon使用及源碼分析一、什麼是Ribbon?二、Ribbon的基本使用三、Ribbon源碼分析四、總結

這裡我調用了兩次,可以看出在user服務調用order服務時,是負載均衡調用的

SpringCloud元件之Ribbon使用及源碼分析SpringCloud元件之Ribbon使用及源碼分析一、什麼是Ribbon?二、Ribbon的基本使用三、Ribbon源碼分析四、總結

三、Ribbon源碼分析

[email protected]注解

為什麼僅僅加了一個LoadBalanced注解就實作了負載均衡,來看一下LoadBalanced注解

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
           

從LoadBalanced中可以看出,除了元注解資訊,就隻有一個Qualifier注解,這個Qualifier就相當于是一個标記,用來辨別目前的RestTemplate,在LoadBalancerAutoConfiguration自動裝配類中會去解析LoadBalanced注解,在這裡面會擷取到所有加了LoadBalanced注解的RestTemplate的Bean對象

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

	//擷取所有加了LoadBalanced注解的RestTemplate對象
	@LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();
           

2.LoadBalancerAutoConfiguration自動裝配

那麼擷取到這些加了**@LoadBalanced**注解的RestTemplate有什麼作用?在LoadBalancerAutoConfiguration自動裝配類中,擷取到restTemplates後,對每個restTemplate做了一個包裝,增加了一個攔截器

@Bean
	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
			//周遊restTemplates,對每個restTemplate進行包裝,其實就是對每個restTemplate都增加一個攔截器
			for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
				for (RestTemplateCustomizer customizer : customizers) {
					customizer.customize(restTemplate);
				}
			}
		});
	}
	
	@Bean
	@ConditionalOnMissingBean
	public RestTemplateCustomizer restTemplateCustomizer(final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
		//先擷取預設的攔截器,再将loadBalancerInterceptor添加進攔截器鍊
		return restTemplate -> {
			List<ClientHttpRequestInterceptor> list = new ArrayList<>(
					restTemplate.getInterceptors());
			list.add(loadBalancerInterceptor);
			restTemplate.setInterceptors(list);
		};
	}
           

現在已經将所有添加了LoadBalanced注解的RestTemplate添加了一個攔截器,所有當user服務在調用order服務的時候一定會進入攔截器

3. LoadBalancerInterceptor攔截器

那麼進入攔截器之後,攔截器又做了什麼? 我們進到LoadBalancerInterceptor,

這裡LoadBalancerInterceptor将請求攔截到之後,就委托給了LoadBalancerClient去執行,LoadBalancerClient是一個接口,最終由RibbonLoadBalancerClient實作類執行

private LoadBalancerClient loadBalancer;
	
	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null,"Request URI does not contain a valid hostname: " + originalUri);
		return this.loadBalancer.execute(serviceName,this.requestFactory.createRequest(request, body, execution));
	}
           

4.ILoadBalancer負載均衡器

我們接着看RibbonLoadBalancerClient.execute()方法,這裡面做了兩個操作,一個就是根據服務名稱擷取負載均衡器,第二個就是根據負載均衡器獲得某個Server執行個體

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
			throws IOException {
		//擷取負載均衡器
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		//根據負載均衡器獲得一個Server執行個體,這裡的Server是從叢集中獲得的某個Server執行個體
		Server server = getServer(loadBalancer, hint);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server,
				isSecure(server, serviceId),
				serverIntrospector(serviceId).getMetadata(server));
		return execute(serviceId, ribbonServer, request);
	}
           

我們先看擷取負載均衡器是如何操作的,跟進方法可以看到在getInstance()執行個體化過程中,基于工廠模式通過服務名稱,獲得一個上下文環境,然後再通過type(ILoadBalancer.class)從上下文環境中得到一個ILoadBalancer 的執行個體,這裡getInstance()的時候還做了一個操作就是基于SpringClientFactory初始化了RibbonClientConfiguration類

public SpringClientFactory() {
		super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
	}
	
	public <C> C getInstance(String name, Class<C> type) {
		C instance = super.getInstance(name, type);
		if (instance != null) {
			return instance;
		}
		IClientConfig config = getInstance(name, IClientConfig.class);
		return instantiateWithConfig(getContext(name), type, config);
	}

    public <T> T getInstance(String name, Class<T> type) {
        AnnotationConfigApplicationContext context = this.getContext(name);
        return BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0 ? context.getBean(type) : null;
    }
           

ILoadBalancer 的類繼承實作關系圖

SpringCloud元件之Ribbon使用及源碼分析SpringCloud元件之Ribbon使用及源碼分析一、什麼是Ribbon?二、Ribbon的基本使用三、Ribbon源碼分析四、總結

這裡BEBUG可以看出,就構造出了一個執行個體ZoneAwareLoadBalancer

SpringCloud元件之Ribbon使用及源碼分析SpringCloud元件之Ribbon使用及源碼分析一、什麼是Ribbon?二、Ribbon的基本使用三、Ribbon源碼分析四、總結

5.服務清單更新與擷取

拿到了負載均衡器後,緊接着看第二個操作,getServer(loadBalancer, hint),其中chooseServer()就是基于負載均衡算法選擇叢集中的某個服務節點

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
		if (loadBalancer == null) {
			return null;
		}
		// 這裡 default 就是采用預設的負載均衡算法,
		return loadBalancer.chooseServer(hint != null ? hint : "default");
	}
		
	//截取ZoneAwareLoadBalancer類chooseServer方法中部分代碼
	if (server != null) {
         return server;
     } else {
         logger.debug("Zone avoidance logic is not invoked.");
         //這裡調用了父類的chooseServer方法
         return super.chooseServer(key);
     }

    protected IRule rule = DEFAULT_RULE;
	//根據不同key選擇相應的負載均衡規則,再選擇出服務節點
    public Server chooseServer(Object key) {
        if (this.counter == null) {
            this.counter = this.createCounter();
        }
        this.counter.increment();
        if (this.rule == null) {
            return null;
        } else {
            try {
            	//調用PredicateBasedRule中的choose(key)方法
                return this.rule.choose(key);
            } catch (Exception var3) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", new Object[]{this.name, key, var3});
                return null;
            }
        }
    }
    
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        //這裡lb.getAllServers()擷取了所有服務清單資訊,再對叢集中的order-servier服務進行輪訓篩選
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }
           

從上面choose方法中可以看到調用了lb.getAllServers() 方法來擷取所有的服務清單資訊,那麼這裡到底是如何獲得服務清單資訊的嘞? 我們進入到getAllServers()方法中

/**
     * @return All known servers, both reachable and unreachable.
     */
     // ILoadBalancer 接口類中的一個方法
	public List<Server> getAllServers();
	
    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
    
    @Override
    public List<Server> getAllServers() {
        return Collections.unmodifiableList(allServerList);
    }
           

可以看出getAllServers()方法操作了一個allServerList變量,并傳回一個不可修改的服務清單,這裡的allServerList就是用戶端用來緩存服務清單資訊的,那具體allServerList是什麼時候被指派得嘞?這時候我們就要回到ILoadBalancer執行個體初始化的時候了,我們回到RibbonClientConfiguration中ZoneAwareLoadBalancer被初始化的地方

@Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
		}
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,serverListFilter, serverListUpdater);
	}
           

當子類ZoneAwareLoadBalancer被初始化的時候會去調用父類的構造方法,進入父類DynamicServerListLoadBalancer

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }
           

從這裡我們可以看出在父類初始化的時候調用了一個restOfInit()方法,在這個方法中有兩個操作方法enableAndInitLearnNewServersFeature()和updateListOfServers()

void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }
           

我們先看enableAndInitLearnNewServersFeature()方法,這個方法的作用就是啟動一個任務定時去更新服務清單資訊

public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        //這裡開啟一個updateAction的任務
        serverListUpdater.start(updateAction);
    }
    
    //updateAction任務
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
	    @Override
	    public void doUpdate() {
	        updateListOfServers();
	    }
    };
    
	//由于服務清單資訊是配置在本地的我們進入到PollingServerListUpdater實作類
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
			
			//使用scheduled來做定時任務調用
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            		//定時更新用戶端緩存中的服務清單資訊的任務
                    wrapperRunnable,
                    //任務延遲1s執行
                    initialDelayMs,
                    //會先讀取配置檔案中的ServerListRefreshInterval的資訊,沒有配置就預設30s執行一次
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
	
	//擷取服務清單重新整理間隔時間
    private static long getRefreshIntervalMs(IClientConfig clientConfig) {
       return clientConfig.get(CommonClientConfigKey.ServerListRefreshInterval, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
    }
    
           

到這裡其實可以看到,最終執行的任務也是updateListOfServers() 目的就是不斷更新用戶端緩存中的服務清單資訊

@VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
       		 //讀取配置檔案中配置的服務清單資訊
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        } 
        //将讀取到的服務清單資訊設定到用戶端緩存中,也就是将值指派給BaseLoadBalancer中的 allServerList
        updateAllServerList(servers);
    }
    
    //讀取配置檔案中配置的服務清單資訊
	@Override
	public List<Server> getUpdatedListOfServers() {
        String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
        return derive(listOfServers);
	}
	
    protected void updateAllServerList(List<T> ls) {
		//···
	             setServersList(ls);
		//···
    }
  
    //DynamicServerListLoadBalancer
    public void setServersList(List lsrv) {
        //這裡調用父類進行指派
        super.setServersList(lsrv);
        //···
    }

	//BaseLoadBalancer中進行指派操作
    public void setServersList(List lsrv) {
        //···
        allServerList = allServers;
        //···
	}
           

6.負載均衡擷取服務節點資訊

到這裡我們已經擷取到了服務清單資訊,接下來就是基于負載均衡算法,擷取某個服務節點

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        //擷取服務節點資訊
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        //這裡是輪訓算法實作,将服務節點數量傳進去,這裡incrementAndGetModulo(eligible.size())就是算出下一個服務節點的下标,eligible.get(index),傳回目标服務節點
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
    
	//輪訓算法實作
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo)
                return current;
        }
    }
           

List<Server> eligible

擷取的服務節點資訊,以及通過輪訓計算後獲得的order-service服務節點資訊

SpringCloud元件之Ribbon使用及源碼分析SpringCloud元件之Ribbon使用及源碼分析一、什麼是Ribbon?二、Ribbon的基本使用三、Ribbon源碼分析四、總結

7.重構請求Url

拿到服務節點資訊之後,最後一個操作就是重構請求 Url ,将http://order-service/orders請求位址重構為服務節點資訊對應的位址http://localhost:9100/orders

@Override
	public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request, final byte[] body, final AsyncClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		return this.loadBalancer.execute(serviceName, new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
			@Override
			public ListenableFuture<ClientHttpResponse> apply(final ServiceInstance instance) throws Exception {
				//ServiceRequestWrapper中進行了請求位址的重構
				HttpRequest serviceRequest = new ServiceRequestWrapper(request,instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
				return execution.executeAsync(serviceRequest, body);
			}
		});
	}
	
	//請求重構方法
	@Override
	public URI getURI() {
		URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
		return uri;
	}
           
SpringCloud元件之Ribbon使用及源碼分析SpringCloud元件之Ribbon使用及源碼分析一、什麼是Ribbon?二、Ribbon的基本使用三、Ribbon源碼分析四、總結

四、總結

從整個流程上來看,Ribbon 先擷取所有帶 @LoadBalanced 注解的RestTemplate對象,并對其增加一個攔截器對其增強,當用戶端發起請求的時候,會被攔截器攔截。并委托給了LoadBalancerClient去執行,之後由實作類SpringClientFactory去擷取負載均衡器ILoadBalancer(接口) ,在擷取負載均衡器ZoneAwareLoadBalancer(實作類)的時候會去調用父類DynamicServerListLoadBalancer的構造方法,其中的restOfInit()的方法會被執行,該方法會去建立一個schedule,該任務會定時去拉取最新的服務端的清單資訊同步至用戶端緩存,之後服務端的服務狀态資訊會通過IPing不同的實作政策來定時檢測服務端服務是否有效。如果失效會被剔除。在擷取到服務端的有效清單之後,會通過負載均衡規則IRule來對服務進行篩選進而從叢集中獲得某個具體的Server執行個體,獲得Server執行個體後會去重構請求Url,将Url中的服務名稱替換為具體的Server服務位址,最後向目标服務發送請求.