文章目錄
- SpringCloud元件之Ribbon使用及源碼分析
- 一、什麼是Ribbon?
- 二、Ribbon的基本使用
-
- 1.測試代碼編寫
- 2.結果測試
- 三、Ribbon源碼分析
-
- [email protected]注解
- 2.LoadBalancerAutoConfiguration自動裝配
- 3. LoadBalancerInterceptor攔截器
- 4.ILoadBalancer負載均衡器
- 5.服務清單更新與擷取
- 6.負載均衡擷取服務節點資訊
- 7.重構請求Url
- 四、總結
本文僅是學習記錄,如有錯誤,還請大佬們不吝指出.
SpringCloud元件之Ribbon使用及源碼分析
一、什麼是Ribbon?
Ribbon 是一個具有内置軟體負載平衡器的程序間通信(遠端過程調用)庫。簡單來說Ribbon就是一個用戶端負載均衡器,其作用就是解析目标服務的可調用的服務清單,基于負載均衡算法來實作請求的分發。
二、Ribbon的基本使用
1.測試代碼編寫
編寫一個簡單的user服務和一個order服務來測試,通過user服務來遠端調用order服務擷取使用者訂單資訊
order服務
@RestController
public class OrderService {
@Value("${server.port}")
private String port;
@GetMapping("/orders")
public String getAllOrder(){
System.err.println("order service port : " + port);
return "return all order..";
}
}
order服務配置檔案
server:
port: 9000
spring:
application:
name: order-service
user服務
@RestController
public class UserController{
@Autowired
private RestTemplate restTemplate;
@Bean
@LoadBalanced
public RestTemplate getRestTemplate(RestTemplateBuilder restTemplateBuilder){
return restTemplateBuilder.build();
}
@GetMapping("/user/{userId}")
public String getAllOrder(@PathVariable("userId") String userId){
//調用訂單服務查詢使用者訂單資訊
String result = restTemplate.getForObject("http://order-service/orders", String.class);
return "userId: " + userId + " order info : " + result;
}
}
user服務配置檔案
server:
port: 8000
spring:
application:
name: user-service
# 配置order服務的提供者的位址清單,這裡就相當于是注冊中心中的生産者的資訊
order-service:
ribbon:
listOfServers: localhost:9000,localhost:9100
2.結果測試
通路使用者服務接口測試
這裡我調用了兩次,可以看出在user服務調用order服務時,是負載均衡調用的
三、Ribbon源碼分析
[email protected]注解
為什麼僅僅加了一個LoadBalanced注解就實作了負載均衡,來看一下LoadBalanced注解
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
從LoadBalanced中可以看出,除了元注解資訊,就隻有一個Qualifier注解,這個Qualifier就相當于是一個标記,用來辨別目前的RestTemplate,在LoadBalancerAutoConfiguration自動裝配類中會去解析LoadBalanced注解,在這裡面會擷取到所有加了LoadBalanced注解的RestTemplate的Bean對象
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
//擷取所有加了LoadBalanced注解的RestTemplate對象
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
2.LoadBalancerAutoConfiguration自動裝配
那麼擷取到這些加了**@LoadBalanced**注解的RestTemplate有什麼作用?在LoadBalancerAutoConfiguration自動裝配類中,擷取到restTemplates後,對每個restTemplate做了一個包裝,增加了一個攔截器
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
//周遊restTemplates,對每個restTemplate進行包裝,其實就是對每個restTemplate都增加一個攔截器
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
//先擷取預設的攔截器,再将loadBalancerInterceptor添加進攔截器鍊
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
現在已經将所有添加了LoadBalanced注解的RestTemplate添加了一個攔截器,所有當user服務在調用order服務的時候一定會進入攔截器
3. LoadBalancerInterceptor攔截器
那麼進入攔截器之後,攔截器又做了什麼? 我們進到LoadBalancerInterceptor,
這裡LoadBalancerInterceptor将請求攔截到之後,就委托給了LoadBalancerClient去執行,LoadBalancerClient是一個接口,最終由RibbonLoadBalancerClient實作類執行
private LoadBalancerClient loadBalancer;
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,"Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName,this.requestFactory.createRequest(request, body, execution));
}
4.ILoadBalancer負載均衡器
我們接着看RibbonLoadBalancerClient.execute()方法,這裡面做了兩個操作,一個就是根據服務名稱擷取負載均衡器,第二個就是根據負載均衡器獲得某個Server執行個體
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
//擷取負載均衡器
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
//根據負載均衡器獲得一個Server執行個體,這裡的Server是從叢集中獲得的某個Server執行個體
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
我們先看擷取負載均衡器是如何操作的,跟進方法可以看到在getInstance()執行個體化過程中,基于工廠模式通過服務名稱,獲得一個上下文環境,然後再通過type(ILoadBalancer.class)從上下文環境中得到一個ILoadBalancer 的執行個體,這裡getInstance()的時候還做了一個操作就是基于SpringClientFactory初始化了RibbonClientConfiguration類
public SpringClientFactory() {
super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
}
public <C> C getInstance(String name, Class<C> type) {
C instance = super.getInstance(name, type);
if (instance != null) {
return instance;
}
IClientConfig config = getInstance(name, IClientConfig.class);
return instantiateWithConfig(getContext(name), type, config);
}
public <T> T getInstance(String name, Class<T> type) {
AnnotationConfigApplicationContext context = this.getContext(name);
return BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0 ? context.getBean(type) : null;
}
ILoadBalancer 的類繼承實作關系圖
這裡BEBUG可以看出,就構造出了一個執行個體ZoneAwareLoadBalancer
5.服務清單更新與擷取
拿到了負載均衡器後,緊接着看第二個操作,getServer(loadBalancer, hint),其中chooseServer()就是基于負載均衡算法選擇叢集中的某個服務節點
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// 這裡 default 就是采用預設的負載均衡算法,
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
//截取ZoneAwareLoadBalancer類chooseServer方法中部分代碼
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
//這裡調用了父類的chooseServer方法
return super.chooseServer(key);
}
protected IRule rule = DEFAULT_RULE;
//根據不同key選擇相應的負載均衡規則,再選擇出服務節點
public Server chooseServer(Object key) {
if (this.counter == null) {
this.counter = this.createCounter();
}
this.counter.increment();
if (this.rule == null) {
return null;
} else {
try {
//調用PredicateBasedRule中的choose(key)方法
return this.rule.choose(key);
} catch (Exception var3) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", new Object[]{this.name, key, var3});
return null;
}
}
}
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
//這裡lb.getAllServers()擷取了所有服務清單資訊,再對叢集中的order-servier服務進行輪訓篩選
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
從上面choose方法中可以看到調用了lb.getAllServers() 方法來擷取所有的服務清單資訊,那麼這裡到底是如何獲得服務清單資訊的嘞? 我們進入到getAllServers()方法中
/**
* @return All known servers, both reachable and unreachable.
*/
// ILoadBalancer 接口類中的一個方法
public List<Server> getAllServers();
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
@Override
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
可以看出getAllServers()方法操作了一個allServerList變量,并傳回一個不可修改的服務清單,這裡的allServerList就是用戶端用來緩存服務清單資訊的,那具體allServerList是什麼時候被指派得嘞?這時候我們就要回到ILoadBalancer執行個體初始化的時候了,我們回到RibbonClientConfiguration中ZoneAwareLoadBalancer被初始化的地方
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,serverListFilter, serverListUpdater);
}
當子類ZoneAwareLoadBalancer被初始化的時候會去調用父類的構造方法,進入父類DynamicServerListLoadBalancer
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
從這裡我們可以看出在父類初始化的時候調用了一個restOfInit()方法,在這個方法中有兩個操作方法enableAndInitLearnNewServersFeature()和updateListOfServers()
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
我們先看enableAndInitLearnNewServersFeature()方法,這個方法的作用就是啟動一個任務定時去更新服務清單資訊
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
//這裡開啟一個updateAction的任務
serverListUpdater.start(updateAction);
}
//updateAction任務
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
//由于服務清單資訊是配置在本地的我們進入到PollingServerListUpdater實作類
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
//使用scheduled來做定時任務調用
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
//定時更新用戶端緩存中的服務清單資訊的任務
wrapperRunnable,
//任務延遲1s執行
initialDelayMs,
//會先讀取配置檔案中的ServerListRefreshInterval的資訊,沒有配置就預設30s執行一次
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
//擷取服務清單重新整理間隔時間
private static long getRefreshIntervalMs(IClientConfig clientConfig) {
return clientConfig.get(CommonClientConfigKey.ServerListRefreshInterval, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}
到這裡其實可以看到,最終執行的任務也是updateListOfServers() 目的就是不斷更新用戶端緩存中的服務清單資訊
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
//讀取配置檔案中配置的服務清單資訊
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
//将讀取到的服務清單資訊設定到用戶端緩存中,也就是将值指派給BaseLoadBalancer中的 allServerList
updateAllServerList(servers);
}
//讀取配置檔案中配置的服務清單資訊
@Override
public List<Server> getUpdatedListOfServers() {
String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
return derive(listOfServers);
}
protected void updateAllServerList(List<T> ls) {
//···
setServersList(ls);
//···
}
//DynamicServerListLoadBalancer
public void setServersList(List lsrv) {
//這裡調用父類進行指派
super.setServersList(lsrv);
//···
}
//BaseLoadBalancer中進行指派操作
public void setServersList(List lsrv) {
//···
allServerList = allServers;
//···
}
6.負載均衡擷取服務節點資訊
到這裡我們已經擷取到了服務清單資訊,接下來就是基于負載均衡算法,擷取某個服務節點
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
//擷取服務節點資訊
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
//這裡是輪訓算法實作,将服務節點數量傳進去,這裡incrementAndGetModulo(eligible.size())就是算出下一個服務節點的下标,eligible.get(index),傳回目标服務節點
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
//輪訓算法實作
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
List<Server> eligible
擷取的服務節點資訊,以及通過輪訓計算後獲得的order-service服務節點資訊
7.重構請求Url
拿到服務節點資訊之後,最後一個操作就是重構請求 Url ,将http://order-service/orders請求位址重構為服務節點資訊對應的位址http://localhost:9100/orders
@Override
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request, final byte[] body, final AsyncClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
return this.loadBalancer.execute(serviceName, new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
@Override
public ListenableFuture<ClientHttpResponse> apply(final ServiceInstance instance) throws Exception {
//ServiceRequestWrapper中進行了請求位址的重構
HttpRequest serviceRequest = new ServiceRequestWrapper(request,instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
return execution.executeAsync(serviceRequest, body);
}
});
}
//請求重構方法
@Override
public URI getURI() {
URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
return uri;
}
四、總結
從整個流程上來看,Ribbon 先擷取所有帶 @LoadBalanced 注解的RestTemplate對象,并對其增加一個攔截器對其增強,當用戶端發起請求的時候,會被攔截器攔截。并委托給了LoadBalancerClient去執行,之後由實作類SpringClientFactory去擷取負載均衡器ILoadBalancer(接口) ,在擷取負載均衡器ZoneAwareLoadBalancer(實作類)的時候會去調用父類DynamicServerListLoadBalancer的構造方法,其中的restOfInit()的方法會被執行,該方法會去建立一個schedule,該任務會定時去拉取最新的服務端的清單資訊同步至用戶端緩存,之後服務端的服務狀态資訊會通過IPing不同的實作政策來定時檢測服務端服務是否有效。如果失效會被剔除。在擷取到服務端的有效清單之後,會通過負載均衡規則IRule來對服務進行篩選進而從叢集中獲得某個具體的Server執行個體,獲得Server執行個體後會去重構請求Url,将Url中的服務名稱替換為具體的Server服務位址,最後向目标服務發送請求.