天天看點

Ribbon 原理初探

Ribbon 原理初探

決心是成功的開始

Ribbon 使用方法

    Ribbon 的使用方法是在建立 RestTemplate 的 @Bean 方法上加上一個 **@LoadBalanced** 注解,之後透過這個 RestTemplate 發出請求的時候就可以進行負載均衡了

/**
 * Spring configuration class that registers the application's {@link RestTemplate} bean.
 */
@Configuration
public class RestConfig {

    /**
     * Creates the {@code RestTemplate} bean. The {@code @LoadBalanced} annotation marks it
     * so that it is configured to use a load balancer client.
     *
     * @return a newly constructed {@code RestTemplate}
     */
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        final RestTemplate template = new RestTemplate();
        return template;
    }
}

負載均衡原理圖解

Ribbon 原理初探

RestTemplate 方法使用

    如果不用 Ribbon,進行 HTTP 請求時只要直接 new 一個 RestTemplate 對象,再通過相應的方法(例如 getForEntity())調用即可;這種情況下請求會直接發往目標位址,不會進行負載均衡。

Ribbon 原理初探
/**
 * Issues an HTTP GET against {@code url} and returns the response wrapped in a
 * {@code ResponseEntity}.
 *
 * @param url          the URL (template) to request
 * @param responseType the type the response body is converted to
 * @param uriVariables values forwarded to {@code execute} together with the URL
 * @return the response entity produced by the request
 * @throws RestClientException declared by the signature; raised by the underlying execution
 */
@Override
public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType,
        Object... uriVariables) throws RestClientException {
    final RequestCallback acceptCallback = acceptHeaderRequestCallback(responseType);
    final ResponseExtractor<ResponseEntity<T>> entityExtractor =
            responseEntityExtractor(responseType);
    // Run the request, then pass the outcome through nonNull(...)
    final ResponseEntity<T> entity = nonNull(execute(url, HttpMethod.GET,
            acceptCallback, entityExtractor, uriVariables));
    return entity;
}

@LoadBalanced 注解源碼

/**
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.
 *
 * <p>It may be placed on fields, parameters and methods, is retained at runtime, and is
 * itself meta-annotated with {@code @Qualifier}. The annotation declares no attributes.
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}      
/**
 * Client-side load balancer contract: executes requests against a service and rebuilds
 * URIs. It extends {@code ServiceInstanceChooser}, so it can also choose an instance.
 */
public interface LoadBalancerClient extends ServiceInstanceChooser {
    // Executes the request for the service identified by serviceId via the load balancer.
    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
    // Executes the request for the given service, using the supplied service instance.
    <T> T execute(String serviceId, ServiceInstance serviceInstance,
            LoadBalancerRequest<T> request) throws IOException;
    // Creates a proper URI with a real host and port for systems to use.
    // Some systems use a URI with the logical service name as the host,
    // e.g. http://myservice/path/to/service
    // This replaces the service name with the host:port from the ServiceInstance.
    URI reconstructURI(ServiceInstance instance, URI original);
}
/**
 * Chooses a service instance through the load balancer.
 */
public interface ServiceInstanceChooser {
    // Returns the instance chosen for the given service id.
    ServiceInstance choose(String serviceId);
}      

Ribbon用戶端配置類RibbonClientConfiguration

// Ribbon client configuration class (excerpt: the remaining members are elided below).
public class RibbonClientConfiguration {
// Default client connect timeout (presumably milliseconds; confirm against Ribbon docs)
public static final int DEFAULT_CONNECT_TIMEOUT = 1000;

// Default client read timeout (presumably milliseconds; confirm against Ribbon docs)
public static final int DEFAULT_READ_TIMEOUT = 1000;

// Name of the Ribbon client; initialised to "client" and annotated with @RibbonClientName
@RibbonClientName
private String name = "client";

// Injected factory consulted by the bean methods for per-client overrides
@Autowired
private PropertiesFactory propertiesFactory;
    ......
}      

1. RibbonClientConfig 用戶端配置類

/**
 * Ribbon client configuration bean.
 *
 * <p>Builds a {@code DefaultClientConfigImpl}, loads the properties for this client's
 * name, and then sets the connect-timeout, read-timeout and GZip-payload keys to the
 * class defaults.
 *
 * @return the populated client configuration
 */
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
    final DefaultClientConfigImpl clientConfig = new DefaultClientConfigImpl();

    // Properties for this client's name are loaded first ...
    clientConfig.loadProperties(this.name);

    // ... and the default values are set afterwards, in this order.
    clientConfig.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
    clientConfig.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
    clientConfig.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);

    return clientConfig;
}

2. RibbonRule 規則

/**
 * Ribbon load-balancing rule bean.
 *
 * <p>If the properties factory has an {@code IRule} set for this client, that rule is
 * returned; otherwise a {@code ZoneAvoidanceRule} initialised from {@code config} is used.
 *
 * @param config the client configuration used to initialise the rule
 * @return the rule to use for this client
 */
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
    final boolean ruleOverridden = this.propertiesFactory.isSet(IRule.class, name);
    if (!ruleOverridden) {
        // No override configured: fall back to the zone-avoidance rule
        final ZoneAvoidanceRule defaultRule = new ZoneAvoidanceRule();
        defaultRule.initWithNiwsConfig(config);
        return defaultRule;
    }
    return this.propertiesFactory.get(IRule.class, config, name);
}

3. RibbonPing Ping方式

/**
 * Ribbon ping bean.
 *
 * <p>Returns the {@code IPing} set in the properties factory for this client when there
 * is one, and a {@code DummyPing} otherwise.
 *
 * @param config the client configuration handed to the properties factory
 * @return the ping implementation to use for this client
 */
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
    final IPing ping;
    if (this.propertiesFactory.isSet(IPing.class, name)) {
        ping = this.propertiesFactory.get(IPing.class, config, name);
    } else {
        ping = new DummyPing();
    }
    return ping;
}

4. RibbonServerList 服務清單

/**
 * Ribbon server list bean.
 *
 * <p>Uses the {@code ServerList} set in the properties factory for this client when
 * present; otherwise creates a {@code ConfigurationBasedServerList} initialised from
 * {@code config}.
 *
 * @param config the client configuration
 * @return the server list for this client
 */
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
    final boolean listOverridden = this.propertiesFactory.isSet(ServerList.class, name);
    if (!listOverridden) {
        final ConfigurationBasedServerList configuredList = new ConfigurationBasedServerList();
        configuredList.initWithNiwsConfig(config);
        return configuredList;
    }
    return this.propertiesFactory.get(ServerList.class, config, name);
}

5. RibbonServerListUpdater Ribbon 服務清單更新器

/**
 * Ribbon server list updater bean; a {@code PollingServerListUpdater} built from the
 * client configuration is used.
 *
 * @param config the client configuration passed to the updater
 * @return the server list updater
 */
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
    final PollingServerListUpdater pollingUpdater = new PollingServerListUpdater(config);
    return pollingUpdater;
}

6. RibbonLoadBalancer 服務的負載均衡器

/**
 * Ribbon load balancer bean.
 *
 * <p>Returns the {@code ILoadBalancer} set in the properties factory for this client when
 * present; otherwise assembles a {@code ZoneAwareLoadBalancer} from the collaborators
 * passed in.
 *
 * @param config            the client configuration
 * @param serverList        the server list
 * @param serverListFilter  the server list filter
 * @param rule              the load-balancing rule
 * @param ping              the ping implementation
 * @param serverListUpdater the server list updater
 * @return the load balancer for this client
 */
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    final boolean balancerOverridden =
            this.propertiesFactory.isSet(ILoadBalancer.class, name);
    if (!balancerOverridden) {
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }
    return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}

7. RibbonServerListFilter 服務清單篩選器

/**
 * Ribbon server list filter bean.
 *
 * <p>Uses the {@code ServerListFilter} set in the properties factory for this client when
 * present; otherwise creates a {@code ZonePreferenceServerListFilter} initialised from
 * {@code config}.
 *
 * @param config the client configuration
 * @return the server list filter for this client
 */
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
    final boolean filterOverridden =
            this.propertiesFactory.isSet(ServerListFilter.class, name);
    if (!filterOverridden) {
        final ZonePreferenceServerListFilter zoneFilter = new ZonePreferenceServerListFilter();
        zoneFilter.initWithNiwsConfig(config);
        return zoneFilter;
    }
    return this.propertiesFactory.get(ServerListFilter.class, config, name);
}

負載均衡器初始化核心配置類

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

// RestTemplate beans injected with the @LoadBalanced annotation as qualifier; injection is
// optional (required = false) and the field starts as an empty list.
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();

// Optional request transformers; starts as an empty list and is later passed to the
// LoadBalancerRequestFactory.
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
/**
 * Initialisation hook for the load-balanced RestTemplates.
 *
 * <p>The returned {@code SmartInitializingSingleton} applies, when customizers are
 * available, every {@code RestTemplateCustomizer} to every RestTemplate held in
 * {@code restTemplates} (templates in the outer loop, customizers in the inner loop).
 *
 * @param restTemplateCustomizers provider of the customizers to apply
 * @return the initializer callback
 */
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
        final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
    return () -> restTemplateCustomizers.ifAvailable(available ->
            this.restTemplates.forEach(template ->
                    available.forEach(customizer -> customizer.customize(template))));
}

/**
 * Creates the {@code LoadBalancerRequestFactory} from the load balancer client and the
 * transformers held by this configuration.
 *
 * @param loadBalancerClient the client handed to the factory
 * @return the request factory
 */
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(
        LoadBalancerClient loadBalancerClient) {
    final LoadBalancerRequestFactory factory =
            new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    return factory;
}
// Load-balancer interceptor configuration; conditional on the RetryTemplate class
// (org.springframework.retry.support.RetryTemplate) being absent.
@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
    // Builds the LoadBalancerInterceptor from the client and the request factory.
    @Bean
    public LoadBalancerInterceptor ribbonInterceptor(
            LoadBalancerClient loadBalancerClient,
            LoadBalancerRequestFactory requestFactory) {
        return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
    }

    // Customizer that appends the load-balancer interceptor to a RestTemplate's interceptors.
    @Bean
    @ConditionalOnMissingBean
    public RestTemplateCustomizer restTemplateCustomizer(
            final LoadBalancerInterceptor loadBalancerInterceptor) {
        return restTemplate -> {
            // Copy the current interceptors, add ours at the end, then set the new list
            List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                    restTemplate.getInterceptors());
            list.add(loadBalancerInterceptor);
            restTemplate.setInterceptors(list);
        };
    }
                        ......
}      

負載均衡攔截器 LoadBalancerInterceptor

/**
 * Intercepts an outgoing request and routes it through the load balancer.
 *
 * <p>The host part of the request URI is used as the service name, and the request is
 * wrapped by the request factory before being handed to the load balancer.
 *
 * @param request   the outgoing request
 * @param body      the request body
 * @param execution the execution used to carry on with the request
 * @return the response produced by the load-balanced execution
 * @throws IOException           if the load-balanced execution fails with an I/O error
 * @throws IllegalStateException if the request URI has no host to use as service name
 */
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) throws IOException {
    // The request URI carries the service name in its host component
    final URI originalUri = request.getURI();
    final String serviceName = originalUri.getHost();
    // URI.getHost() returns null when the host is undefined; fail fast with a clear
    // message rather than passing a null service id to the load balancer.
    if (serviceName == null) {
        throw new IllegalStateException(
                "Request URI does not contain a valid hostname: " + originalUri);
    }
    // Hand the wrapped request to the load balancer for execution
    return this.loadBalancer.execute(serviceName,
            this.requestFactory.createRequest(request, body, execution));
}

負載均衡器執行請求 loadBalancer.execute

/**
 * Selects a server for {@code serviceId} and executes the request against it.
 *
 * @param serviceId the id of the service to call
 * @param request   the request to run
 * @param hint      passed to {@code getServer} together with the load balancer
 * @return the value returned by the delegated {@code execute} call
 * @throws IOException           declared by the signature; may come from the delegated call
 * @throws IllegalStateException if no server could be obtained for the service
 */
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
        throws IOException {
    // Look up the load balancer for this service and ask it for a server
    final ILoadBalancer balancer = getLoadBalancer(serviceId);
    final Server chosen = getServer(balancer, hint);
    if (chosen == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    // Wrap the chosen server as a RibbonServer and delegate to the instance-based overload
    return execute(serviceId,
            new RibbonServer(serviceId, chosen,
                    isSecure(chosen, serviceId),
                    serverIntrospector(serviceId).getMetadata(chosen)),
            request);
}

真正的執行請求

@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
        LoadBalancerRequest<T> request) throws IOException {
    Server server = null;
    if (serviceInstance instanceof RibbonServer) {
        server = ((RibbonServer) serviceInstance).getServer();
    }
    // 如果服務為空的情況下那麼就抛出異常沒有執行個體可以使用
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }

    RibbonLoadBalancerContext context = this.clientFactory
            .getLoadBalancerContext(serviceId);
    RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

    try {
        // 真正的請求方法
        T returnVal = request.apply(serviceInstance);
        statsRecorder.recordStats(returnVal);
        return returnVal;
    }
    return null;
}      

ServiceRequestWrapper 封裝請求進行服務請求

/**
 * Asynchronous interception: routes the outgoing request through the load balancer.
 *
 * <p>The host of the request URI is used as the service name. When the load balancer
 * applies the request to an instance, the original request is wrapped in a
 * {@code ServiceRequestWrapper} and executed asynchronously.
 *
 * @param request   the outgoing request
 * @param body      the request body
 * @param execution the asynchronous execution used to carry on with the request
 * @return the future returned by the load-balanced execution
 * @throws IOException declared by the signature; may come from the load balancer call
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request,
        final byte[] body, final AsyncClientHttpRequestExecution execution)
        throws IOException {
    // The service name is the host component of the request URI
    final URI originalUri = request.getURI();
    final String serviceName = originalUri.getHost();

    final LoadBalancerRequest<ListenableFuture<ClientHttpResponse>> balancedRequest =
            new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
                @Override
                public ListenableFuture<ClientHttpResponse> apply(
                        final ServiceInstance instance) throws Exception {
                    // Wrap the original request for the chosen instance ...
                    final HttpRequest wrapped = new ServiceRequestWrapper(request,
                            instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
                    // ... and carry on with the asynchronous execution
                    return execution.executeAsync(wrapped, body);
                }
            };

    return this.loadBalancer.execute(serviceName, balancedRequest);
}

小結