天天看點

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

Spring Cloud LoadBalancer是Spring Cloud官方自己提供的用戶端負載均衡器,抽象和實作,用來替代Ribbon.本篇從常見使用示例開始,一步步分析源碼實作原理,最後還通過一個自定義負載均衡算法加深對源碼的了解

Spring Cloud LoadBalancer

概述

Spring Cloud LoadBalancer目前Spring官方是放在spring-cloud-commons裡,Spring Cloud最新版本為2021.0.2

Spring Cloud LoadBalancer 官網文檔位址 https://docs.spring.io/spring-cloud-commons/docs/3.1.2/reference/html/#spring-cloud-loadbalancer

Spring Cloud官網文檔位址 https://docs.spring.io/spring-cloud/docs/current/reference/html/

一方面Netflix Ribbon停止更新,Spring Cloud LoadBalancer是Spring Cloud官方自己提供的用戶端負載均衡器,抽象和實作,用來替代Ribbon。

  • 常見負載均衡器分為服務端負載均衡器(如網關層均衡負載)和用戶端層均衡負載。
    • 網關層如硬體層面的F5或軟體層面的LVS、或者nginx等。
    • 用戶端層就如Spring Cloud LoadBalancer,作為一個用戶端去發現更新維護服務清單,自定義服務的均衡負載政策(随機、輪詢、小流量的金絲雀等等)。

Spring Cloud提供了自己的用戶端負載平衡器抽象和實作。對于負載均衡機制,增加了ReactiveLoadBalancer接口,并提供了基于round-robin輪詢和Random随機的實作。為了從響應式ServiceInstanceListSupplier中選擇執行個體,需要使用ServiceInstanceListSupplier。目前支援ServiceInstanceListSupplier的基于服務發現的實作,該實作使用類路徑中的發現用戶端從Service Discovery中檢索可用的執行個體。

可以通過如下配置來禁用Spring Cloud LoadBalance

spring:
  cloud:
    loadbalancer:
      enabled: false
           

入門示例

前面simple-ecommerce項目建立已在父Pom引入三大父依賴,詳細可以看下前面的文章<<SpringCloudAlibaba注冊中心與配置中心之利器Nacos實戰與源碼分析>>,其中Spring Cloud的版本為2021.0.1,前面文章也已說過,Spring Cloud Alibaba整合在spring-cloud-starter-alibaba-nacos-discovery本身就依賴spring-cloud-loadbalancer。

注意如果是Hoxton之前的版本,預設負載均衡器為Ribbon,需要移除Ribbon引用和增加配置spring.cloud.loadbalancer.ribbon.enabled: false。

如果是在Spring Boot項目中添加下面的啟動器依賴,該starter也包含了Spring Boot Caching and Evictor.

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>
           

我們使用Spring官方提供了負載均衡的用戶端之一RestTemplate,RestTemplate是Spring提供的用于通路Rest服務的用戶端,RestTemplate提供了多種便捷通路遠端Http服務的方法,能夠大大提高用戶端的編寫效率。預設情況下,RestTemplate預設依賴jdk的HTTP連接配接工具。建立RestTemplateConfig配置類,标注 @LoadBalanced注解,預設使用的ReactiveLoadBalancer實作是RoundRobinLoadBalancer。

package cn.itxs.ecom.order.config;

import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestTemplateConfig {
    @LoadBalanced
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate() ;
    }
}
           

訂單微服務中訂單控制器增加deductRest方法

package cn.itxs.ecom.order.controller;

import cn.itxs.ecom.commons.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

/**
 * @Name :OrderController
 * @Description :訂單控制器
 * @Author :itxs
 * @Date :2022/4/10 20:15
 * @Version :1.0
 * @History :
 */
@RestController
public class OrderController {

    @Autowired
    OrderService orderService;

    @Autowired
    private RestTemplate restTemplate;

    @RequestMapping("/create/{userId}/{commodityCode}/{count}")
    public String create(@PathVariable("userId") String userId,@PathVariable("commodityCode") String commodityCode, @PathVariable("count") int count){
        return orderService.create(userId,commodityCode,count).toString();
    }

    @RequestMapping("/deductRest/{commodityCode}/{count}")
    public String deductRest(@PathVariable("commodityCode") String commodityCode, @PathVariable("count") int count){
        String url = "http://ecom-storage-service/deduct/"+commodityCode+"/"+count;
        return restTemplate.getForObject(url, String.class);
    }
}
           

前面server.port我們是放在Nacos配置中心裡,這裡我們注釋Nacos配置中心的配置放在本地配置檔案bootstrap.yml裡,分别配置為4080、4081、4082啟動3個庫存服務執行個體,并啟動訂單微服務

server:
  port: 4080
           
微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

檢視nacos服務管理-服務清單裡服務詳情,可以看到3個健康的庫存執行個體和1個訂單微服務執行個體

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

通路6次訂單dedect接口:http://localhost:4070/deductRest/1001/1 ,從測試的結果也驗證了LoadBalancer預設是輪詢負載均衡政策。

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

負載均衡算法切換

建立自定義負載均衡配置類CustomLoadBalancerConfiguration

package cn.itxs.ecom.order.config;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.loadbalancer.core.RandomLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;

public class CustomLoadBalancerConfiguration {

    @Bean
    ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment,
                                                            LoadBalancerClientFactory loadBalancerClientFactory) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RandomLoadBalancer(loadBalancerClientFactory
                .getLazyProvider(name, ServiceInstanceListSupplier.class),
                name);
    }
}
           

RestTemplateConfig配置類LoadBalancerClient指定随機的配置類,value的值為提供者也即是庫存微服務名稱。

package cn.itxs.ecom.order.config;

import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
@LoadBalancerClient(value = "ecom-storage-service", configuration = CustomLoadBalancerConfiguration.class)
public class RestTemplateConfig {

    @LoadBalanced
    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build() ;
    }
}
           

多次通路訂單dedect接口測試确認已切換為随機負載均衡政策。

內建方式

官網提供3中內建Spring Cloud LoadBalancer的方式,除了第一種上面已使用過,還支援Spring Web Flux響應式程式設計,WebClient是從Spring WebFlux 5.0版本開始提供的一個非阻塞的基于響應式程式設計的進行Http請求的用戶端工具。它的響應式程式設計的基于Reactor的。WebClient中提供了标準Http請求方式對應的get、post、put、delete等方法,可以用來發起相應的請求。

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

在訂單微服務中引入spring-boot-starter-webflux依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
           

訂單微服務中增加WebClientConfig配置類

package cn.itxs.ecom.order.config;

import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class WebClientConfig {
    @LoadBalanced
    @Bean
    WebClient.Builder webClientBuilder() {
        return WebClient.builder();
    }

    @Bean
    WebClient webClient() {
        return webClientBuilder().build();
    }
}

           

訂單微服務訂單控制器中添加WebClient接口實作,代碼如下

@Autowired
    private WebClient webClient;   

	@RequestMapping(value = "/deductWebClient/{commodityCode}/{count}")
    public Mono<String> deductWebClient(@PathVariable("commodityCode") String commodityCode, @PathVariable("count") int count) {
        String url = "http://ecom-storage-service/deduct/"+commodityCode+"/"+count;
        // 基于WebClient
        Mono<String> result = webClient.get().uri(url)
                .retrieve().bodyToMono(String.class);
        return result;
    }
           

重新啟動訂單微服務

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

通路訂單控制器中的減庫存WebClient接口,http://localhost:4070/deductWebClient/1001/1 ,結果傳回成功

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

我們還可以配置基于過濾器的方式,通過WebClient使用ReactiveLoadBalancer。如果項目中添加了Spring Cloud LoadBalancer starter,并且Spring -webflux在類路徑中,ReactorLoadBalancerExchangeFilterFunction則是自動配置的。

訂單微服務訂單控制器中添加WebClient使用ReactiveLoadBalancer接口實作,代碼如下

@Autowired
    private ReactorLoadBalancerExchangeFilterFunction lbFunction;   

    @RequestMapping(value = "/deductWebFluxReactor/{commodityCode}/{count}")
    public Mono<String> deductWebFluxReactor(@PathVariable("commodityCode") String commodityCode, @PathVariable("count") int count) {
        String url = "/deduct/"+commodityCode+"/"+count;
        Mono<String> result = WebClient.builder().baseUrl("http://ecom-storage-service")
                .filter(lbFunction)
                .build()
                .get()
                .uri(url)
                .retrieve()
                .bodyToMono(String.class);
        return result;
    }
           

重新啟動訂單微服務

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

通路訂單控制器中的減庫存WebClient接口,http://localhost:4070/deductWebFluxReactor/1001/1 ,結果傳回成功

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

關于LoadBalancer官網還提供很多其他功能,有興趣可自行詳細查閱和動手實驗

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

原理

RestTemplate

Spring Cloud LoadBalancer源碼分析我們先從RestTemplate負載均衡的簡單實作來分析入手,除此之外其支援Spring Web Flux響應式程式設計的實作原理思想也是相同,都是通過用戶端添加攔截器,在攔截器中實作負載均衡。從RestTemplate的源碼中可以知道其繼承自InterceptingHttpAccessor抽象類

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

而InterceptingHttpAccessor抽象類則提供了一個方法setInterceptors,用于設定攔截器,攔截器需要實作ClientHttpRequestInterceptor接口即可,在實際遠端請求服務端接口之前會先調用攔截器的intercept方法。這裡的攔截器相當于Servlet技術中的Filter功能

// 代碼實作在抽象父類InterceptingHttpAccessor裡
	// RestTemplate.InterceptingHttpAccessor#setInterceptors
	public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
		Assert.noNullElements(interceptors, "'interceptors' must not contain null elements");
		// Take getInterceptors() List as-is when passed in here
		if (this.interceptors != interceptors) {
			this.interceptors.clear();
			this.interceptors.addAll(interceptors);
			AnnotationAwareOrderComparator.sort(this.interceptors);
		}
	}
           
微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

LoadBalancerAutoConfiguration

從官網可以知道Spring Cloud LoadBalancer放在spring-cloud-commons,是以也作為其核心的@LoadBalanced注解也就是由spring-cloud-commons來實作,依據SpringBoot自動裝配的原理先檢視依賴包的實作邏輯,不難發現spring-cloud-commons引入了自動配置類LoadBalancerAutoConfiguration和ReactorLoadBalancerClientAutoConfiguration。

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

當滿足上述的條件時(@Conditional為條件注解),将自動建立LoadBalancerInterceptor并注入到RestTemplate中。

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

LoadBalancerLnterceptor

LoadBalancerInterceptor實作了ClientHttpRequestInterceptor接口,是以也實作intercept方法,用于實作負載均衡的攔截處理。

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

LoadBalancerClient

LoadBalancerClient用于進行負載均衡邏輯,繼承自ServiceInstanceChooser接口,從服務清單中選擇出一個服務位址進行調用。在LoadBalancerClient種存在兩個execute()方法,均是用來執行請求的,reconstructURI()是用來重構URL。

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

對于LoadBalancerClient接口Spring Cloud LoadBalancer的提供預設實作為BlockingLoadBalancerClient

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析
@SuppressWarnings({ "unchecked", "rawtypes" })
public class BlockingLoadBalancerClient implements LoadBalancerClient {

	private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;

	/**
	 * @deprecated in favour of
	 * {@link BlockingLoadBalancerClient#BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory)}
	 */
	@Deprecated
	public BlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory,
			LoadBalancerProperties properties) {
		this.loadBalancerClientFactory = loadBalancerClientFactory;
	}

	public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
		this.loadBalancerClientFactory = loadBalancerClientFactory;
	}

	@Override
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
		String hint = getHint(serviceId);
		LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request,
				new DefaultRequestContext(request, hint));
		Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
		supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
		ServiceInstance serviceInstance = choose(serviceId, lbRequest);
        // 選擇服務
		if (serviceInstance == null) {
			supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
					new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		return execute(serviceId, serviceInstance, lbRequest);
	}

	@Override
	public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
			throws IOException {
		DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
		Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
		Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();
		supportedLifecycleProcessors
				.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));
		try {
			T response = request.apply(serviceInstance);
			Object clientResponse = getClientResponse(response);
			supportedLifecycleProcessors
					.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
							lbRequest, defaultResponse, clientResponse)));
			return response;
		}
		catch (IOException iOException) {
			supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
					new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));
			throw iOException;
		}
		catch (Exception exception) {
			supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
					new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));
			ReflectionUtils.rethrowRuntimeException(exception);
		}
		return null;
	}

	private <T> Object getClientResponse(T response) {
		ClientHttpResponse clientHttpResponse = null;
		if (response instanceof ClientHttpResponse) {
			clientHttpResponse = (ClientHttpResponse) response;
		}
		if (clientHttpResponse != null) {
			try {
				return new ResponseData(clientHttpResponse, null);
			}
			catch (IOException ignored) {
			}
		}
		return response;
	}

	private Set<LoadBalancerLifecycle> getSupportedLifecycleProcessors(String serviceId) {
		return LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(
				loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
				DefaultRequestContext.class, Object.class, ServiceInstance.class);
	}

	@Override
	public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
		return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
	}

	@Override
	public ServiceInstance choose(String serviceId) {
		return choose(serviceId, REQUEST);
	}

    // 通過不同的負載均衡用戶端實作選擇不同的服務
	@Override
	public <T> ServiceInstance choose(String serviceId, Request<T> request) {
		ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
		if (loadBalancer == null) {
			return null;
		}
		Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
		if (loadBalancerResponse == null) {
			return null;
		}
		return loadBalancerResponse.getServer();
	}

	private String getHint(String serviceId) {
		LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
		String defaultHint = properties.getHint().getOrDefault("default", "default");
		String hintPropertyValue = properties.getHint().get(serviceId);
		return hintPropertyValue != null ? hintPropertyValue : defaultHint;
	}

}

           

LoadBalancerClientFactory

BlockingLoadBalancerClient中持有LoadBalancerClientFactory通過調用其getInstance方法擷取具體的負載均衡用戶端。通過工廠類LoadBalancerClientFactory擷取具體的負載均衡器執行個體,後面的loadBalancer.choose(request)調用其接口choose()方法實作根據負載均衡算法選擇下一個伺服器完成負載均衡,而ReactiveLoadBalancer getInstance(String serviceId) 有預設實作LoadBalancerClientFactory

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

LoadBalancerClientFactory用戶端實作了不同的負載均衡算法,比如輪詢、随機等。LoadBalancerClientFactory繼承自NamedContextFactory,NamedContextFactory繼承ApplicationContextAware,實作Spring ApplicationContext容器操作。

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

ReactiveLoadBalancer

ReactiveLoadBalancer負載均衡器實作服務選擇,Spring Cloud Balancer中實作了輪詢RoundRobinLoadBalancer、随機RandomLoadBalancer、NacosLoadBalancer算法。

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

LoadBalancerClientConfiguration

如果沒有顯式指定負載均衡算法,預設預設值為RoundRobinLoadBalancer

@Bean
	@ConditionalOnMissingBean
	public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
			LoadBalancerClientFactory loadBalancerClientFactory) {
		String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
		return new RoundRobinLoadBalancer(
				loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
	}
           
微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

LoadBalancerRequestFactory

LoadBalancerRequest工廠類調用createRequest方法用于建立LoadBalancerRequest。其内部持有LoadBalancerClient對象也即持有BlockingLoadBalancerClient。

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

在日常項目中,一般負載均衡都是結合Feign使用,後續我們有時間再來分析Feign整合LoadBalancer的自動配置類FeignLoadBalancerAutoConfiguration的實作

ReactorLoadBalancerClientAutoConfiguration

我們也抛一下基于WebClient的@Loadbalanced的流程的引入,首先聲明負載均衡過濾器ReactorLoadBalancerClientAutoConfiguration是一個自動裝配器類,在項目中引入了 WebClient 和 ReactiveLoadBalancer 類之後,自動裝配流程就開始運作,它會初始化一個實作了 ExchangeFilterFunction 的執行個體,在後面該執行個體将作為過濾器被注入到WebClient。後續流程有興趣再自行研究

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

自定義負載均衡器

從上面可以知道LoadBalancerClientFactory是建立客戶機、負載均衡器和客戶機配置執行個體的工廠。它根據用戶端名稱建立一個Spring ApplicationContext,并從中提取所需的bean。是以進入到LoadBalancerClientFactory類中,需要去實作它的子接口ReactorServiceInstanceLoadBalancer,因為去擷取負載均衡器執行個體的時候,是通過去容器中查找ReactorServiceInstanceLoadBalancer類型的bean來實作的,可以參照RandomLoadBalancer實作代碼

package org.springframework.cloud.loadbalancer.core;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;

public class RandomLoadBalancer implements ReactorServiceInstanceLoadBalancer {

	private static final Log log = LogFactory.getLog(RandomLoadBalancer.class);

	private final String serviceId;

	private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

	/**
	 * @param serviceInstanceListSupplierProvider a provider of
	 * {@link ServiceInstanceListSupplier} that will be used to get available instances
	 * @param serviceId id of the service for which to choose an instance
	 */
	public RandomLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
			String serviceId) {
		this.serviceId = serviceId;
		this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
	}

	@SuppressWarnings("rawtypes")
	@Override
	public Mono<Response<ServiceInstance>> choose(Request request) {
		ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
				.getIfAvailable(NoopServiceInstanceListSupplier::new);
		return supplier.get(request).next()
				.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
	}

	private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
			List<ServiceInstance> serviceInstances) {
		Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
		if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
			((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
		}
		return serviceInstanceResponse;
	}

	private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
		if (instances.isEmpty()) {
			if (log.isWarnEnabled()) {
				log.warn("No servers available for service: " + serviceId);
			}
			return new EmptyResponse();
		}
		int index = ThreadLocalRandom.current().nextInt(instances.size());

		ServiceInstance instance = instances.get(index);

		return new DefaultResponse(instance);
	}

}
           

保留核心實作進行簡單仿寫如下

package cn.itxs.ecom.order.config;

import java.util.List;
import java.util.Random;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import reactor.core.publisher.Mono;

public class ItxsRandomLoadBalancerClient implements ReactorServiceInstanceLoadBalancer {
    // 服務清單
    private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    public ItxsRandomLoadBalancerClient(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider) {
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable();
        return supplier.get().next().map(this::getInstanceResponse);
    }

    /**
     * 使用随機數擷取服務
     * @param instances
     * @return
     */
    private Response<ServiceInstance> getInstanceResponse(
            List<ServiceInstance> instances) {
        System.out.println("ItxsRandomLoadBalancerClient start");
        if (instances.isEmpty()) {
            return new EmptyResponse();
        }

        System.out.println("ItxsRandomLoadBalancerClient random");
        // 随機算法
        int size = instances.size();
        Random random = new Random();
        ServiceInstance instance = instances.get(random.nextInt(size));

        return new DefaultResponse(instance);
    }
}
           

将上面CustomLoadBalancerConfiguration替換為如下内容

package cn.itxs.ecom.order.config;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.context.annotation.Bean;

public class CustomLoadBalancerConfiguration {

    @Bean
    public ReactorServiceInstanceLoadBalancer customLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider) {
        return new ItxsRandomLoadBalancerClient(serviceInstanceListSupplierProvider);
    }
}
           

啟動庫存微服務和訂單微服務,通路http://localhost:4070/deductRest/1001/1 ,控制台已列印自定義ItxsRandomLoadBalancerClient中的日志和成功通路結果

微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析
微服務生态元件之Spring Cloud LoadBalancer詳解和源碼分析

**本人部落格網站 **IT小神 www.itxiaoshen.com