一、Ribbon简介
Spring Cloud Ribbon 是Spring Cloud Netflix 子项目的核心组件之一,主要给服务间调用及API网关转发提供负载均衡的功能。
Ribbon是负载均衡+RestTemplate调用,实现了RPC的远程调用。
二、项目搭建
pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
新版的eureka中已经集成了ribbon,因此不需要添加其他依赖。
- 使用@LoadBalanced注解赋予RestTemplate负载均衡的能力
@SpringBootApplication
@EnableEurekaClient
public class ConsumerOrderRibbonApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerOrderRibbonApplication.class, args);
}
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
}
- 使用RestTemplate调用HTTP接口
@GetMapping("/consumer/payment/get/{id}")
public CommonResult<Payment> getPaymentById(@PathVariable("id") Long id) {
return restTemplate.getForObject(PAYMENT_URL + "/payment/get/" + id, CommonResult.class);
}
三、修改Ribbon负载均衡策略
- 负载均衡策略 ribbon默认使用的是RoundRobinRule,现在将默认策略修改为RandomRule.
在Ribbon的官方文档中:
即自定义的策略类不能放在@ComponentScan所扫描的当前包及其子包中,不然的话,相当于整个项目都采用了我们定制的策略,就达不到特殊定制的要求了。
创建额外的包,放置定制的策略:
MySelfRule代码:
@Configuration
public class MySelfRule {
@Bean
public IRule myRule() {
return new RandomRule();
}
}
在主启动类中增加@RibbonClient注解
@SpringBootApplication
@EnableEurekaClient
@RibbonClient(name = "CLOUD-PAYMENT",configuration = MySelfRule.class)
public class ConsumerOrderRibbonApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerOrderRibbonApplication.class, args);
}
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
}
测试,8001和8002端口不再是交替提供服务,而是随机提供服务。
四、重写轮询算法
- 原轮询算法代码:
public class RoundRobinRule extends AbstractLoadBalancerRule {
private AtomicInteger nextServerCyclicCounter;
private static final boolean AVAILABLE_ONLY_SERVERS = true;
private static final boolean ALL_SERVERS = false;
private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
public RoundRobinRule() {
nextServerCyclicCounter = new AtomicInteger(0);
}
public RoundRobinRule(ILoadBalancer lb) {
this();
setLoadBalancer(lb);
}
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
/**
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
- 方案:第几次请求数 % 服务器集群总数量 = 实际调用服务器的位置下标
- 新建自定义负载均衡接口:
/**
* 自定义负载均衡接口
*
* @author Castle
* @Date 2021/3/31 11:17
*/
public interface LoadBalancer {
/**
* 获取服务实例
*
* @param serviceInstances
* @return
*/
ServiceInstance instances(List<ServiceInstance> serviceInstances);
}
- 实现类
@Component
@Slf4j
public class MyLoadBalancer implements LoadBalancer {
/**
* 记录当前请求的次数,默认为0,重启后重置为0
*/
private AtomicInteger currentRequestTimes = new AtomicInteger(0);
/**
* 获取当前的请求次数,并且请求次数加1
*
* 原代码是一个典型的Atomically increments by one the current value
*
* @return
*/
private final int getCurrentTimesAndIncrement() {
int current;
int next;
do {
current = this.currentRequestTimes.get();
log.info("=====AtomicInteger currentTimes的数值:" + current);
next = current >= Integer.MAX_VALUE ? 0 : current + 1;
} while (!this.currentRequestTimes.compareAndSet(current, next));
//如果当前值与期望值一致,则代表没有其他用户请求,可以修改,请求次数+1;
//如果不一致,则代表有其他用户操作该值,自旋等待直到当前值与期望值一致。
log.info("=====当前访问次数=====:" + next);
return next;
}
/**
* 实际调用服务器位置下标=第几次请求数%集群总数量
*
* @param serviceInstances
* @return
*/
@Override
public ServiceInstance instances(List<ServiceInstance> serviceInstances) {
int index = getCurrentTimesAndIncrement() % serviceInstances.size();
return serviceInstances.get(index);
}
}
- 在controller中引入并调用
@RestController
@Slf4j
public class ConsumerOrderController {
@Resource
private RestTemplate restTemplate;
/**
* 引入自实现的轮询接口
*/
@Resource
private LoadBalancer loadBalancer;
/**
* 用于获取服务信息
*/
@Resource
private DiscoveryClient discoveryClient;
private static final String PAYMENT_URL = "http://CLOUD-PAYMENT";
@GetMapping("/consumer/payment/get/{id}")
public CommonResult<Payment> getPaymentById(@PathVariable("id") Long id) {
return restTemplate.getForObject(PAYMENT_URL + "/payment/get/" + id, CommonResult.class);
}
@GetMapping("/consumer/payment/lb")
public String getPaymentLB() {
//返回服务器列表
List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT");
log.info(instances.toString());
if (instances == null || instances.size() <= 0) {
log.info("无服务实例");
return null;
}
ServiceInstance serviceInstance = loadBalancer.instances(instances);
URI uri = serviceInstance.getUri();
log.info("==========URI :" + uri);
return restTemplate.getForObject(uri + "/payment/lb", String.class);
}
}
- Resttemplate取消@LoadBalancer注解
@SpringBootApplication
@EnableEurekaClient
//@RibbonClient(name = "CLOUD-PAYMENT",configuration = MySelfRule.class) 测试自实现轮询接口,需注销
public class ConsumerOrderRibbonApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerOrderRibbonApplication.class, args);
}
@Bean
//@LoadBalanced 测试自实现轮询接口,需注销
RestTemplate restTemplate() {
return new RestTemplate();
}
}
- PaymentController中的方法
@GetMapping("/payment/lb")
public String getLb() {
return "当前服务端口:" + serverPort;
}
测试:8001和8002交替提供服务。
完整源代码:github