天天看點

SpringCloud(3)負載均衡——Ribbon

一、Ribbon簡介

Spring Cloud Ribbon 是Spring Cloud Netflix 子項目的核心元件之一,主要給服務間調用及API網關轉發提供負載均衡的功能。

Ribbon是負載均衡+RestTemplate調用,實作了RPC的遠端調用。

二、項目搭建

pom.xml

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
           

新版的eureka中已經內建了ribbon,是以不需要添加其他依賴。

SpringCloud(3)負載均衡——Ribbon
  • 使用@LoadBalanced注解賦予RestTemplate負載均衡的能力
@SpringBootApplication
@EnableEurekaClient
public class ConsumerOrderRibbonApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerOrderRibbonApplication.class, args);
    }

    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
           
  • 使用RestTemplate調用HTTP接口
@GetMapping("/consumer/payment/get/{id}")
    public CommonResult<Payment> getPaymentById(@PathVariable("id") Long id) {
        return restTemplate.getForObject(PAYMENT_URL + "/payment/get/" + id, CommonResult.class);
    }
           

三、修改Ribbon負載均衡政策

  • 負載均衡政策
    SpringCloud(3)負載均衡——Ribbon
    ribbon預設使用的是RoundRobinRule,現在将預設政策修改為RandomRule.

在Ribbon的官方文檔中:

SpringCloud(3)負載均衡——Ribbon

即自定義的政策類不能放在@ComponentScan所掃描的目前包及其子包中,不然的話,相當于整個項目都采用了我們定制的政策,就達不到特殊定制的要求了。

建立額外的包,放置定制的政策:

SpringCloud(3)負載均衡——Ribbon

MySelfRule代碼:

@Configuration
public class MySelfRule {

    @Bean
    public IRule myRule() {
        return new RandomRule();
    }
}
           

在主啟動類中增加@RibbonClient注解

@SpringBootApplication
@EnableEurekaClient
@RibbonClient(name = "CLOUD-PAYMENT",configuration = MySelfRule.class) 
public class ConsumerOrderRibbonApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerOrderRibbonApplication.class, args);
    }

    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
           

測試,8001和8002端口不再是交替提供服務,而是随機提供服務。

四、重寫輪詢算法

  • 原輪詢算法代碼:
public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;

    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    public RoundRobinRule(ILoadBalancer lb) {
        this();
        setLoadBalancer(lb);
    }

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

    /**
     * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
     *
     * @param modulo The modulo to bound the value of the counter.
     * @return The next value.
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
           
  • 方案:第幾次請求數 % 伺服器叢集總數量 = 實際調用伺服器的位置下标
  1. 建立自定義負載均衡接口:
/**
 * 自定義負載均衡接口
 *
 * @author Castle
 * @Date 2021/3/31 11:17
 */
public interface LoadBalancer {
    /**
     * 擷取服務執行個體
     *
     * @param serviceInstances
     * @return
     */
    ServiceInstance instances(List<ServiceInstance> serviceInstances);
}
           
  1. 實作類
@Component
@Slf4j
public class MyLoadBalancer implements LoadBalancer {

    /**
     * 記錄目前請求的次數,預設為0,重新開機後重置為0
     */
    private AtomicInteger currentRequestTimes = new AtomicInteger(0);

    /**
     * 擷取目前的請求次數,并且請求次數加1
     * 
     * 原代碼是一個典型的Atomically increments by one the current value
     *
     * @return
     */
    private final int getCurrentTimesAndIncrement() {
        int current;
        int next;
        do {
            current = this.currentRequestTimes.get();
            log.info("=====AtomicInteger currentTimes的數值:" + current);
            next = current >= Integer.MAX_VALUE ? 0 : current + 1;
        } while (!this.currentRequestTimes.compareAndSet(current, next));
        //如果目前值與期望值一緻,則代表沒有其他使用者請求,可以修改,請求次數+1;
        //如果不一緻,則代表有其他使用者操作該值,自旋等待直到目前值與期望值一緻。
        log.info("=====目前通路次數=====:" + next);
        return next;
    }

    /**
     * 實際調用伺服器位置下标=第幾次請求數%叢集總數量
     *
     * @param serviceInstances
     * @return
     */
    @Override
    public ServiceInstance instances(List<ServiceInstance> serviceInstances) {
        int index = getCurrentTimesAndIncrement() % serviceInstances.size();
        return serviceInstances.get(index);
    }
}

           
  1. 在controller中引入并調用
@RestController
@Slf4j
public class ConsumerOrderController {

    @Resource
    private RestTemplate restTemplate;

    /**
     * 引入自實作的輪詢接口
     */
    @Resource
    private LoadBalancer loadBalancer;

    /**
     * 用于擷取服務資訊
     */
    @Resource
    private DiscoveryClient discoveryClient;

    private static final String PAYMENT_URL = "http://CLOUD-PAYMENT";

    @GetMapping("/consumer/payment/get/{id}")
    public CommonResult<Payment> getPaymentById(@PathVariable("id") Long id) {
        return restTemplate.getForObject(PAYMENT_URL + "/payment/get/" + id, CommonResult.class);
    }


    @GetMapping("/consumer/payment/lb")
    public String getPaymentLB() {
        //傳回伺服器清單
        List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT");
        log.info(instances.toString());
        if (instances == null || instances.size() <= 0) {
            log.info("無服務執行個體");
            return null;
        }
        ServiceInstance serviceInstance = loadBalancer.instances(instances);
        URI uri = serviceInstance.getUri();
        log.info("==========URI :" + uri);
        return restTemplate.getForObject(uri + "/payment/lb", String.class);
    }
}
           
  1. Resttemplate取消@LoadBalancer注解
@SpringBootApplication
@EnableEurekaClient
//@RibbonClient(name = "CLOUD-PAYMENT",configuration = MySelfRule.class) 測試自實作輪詢接口,需登出
public class ConsumerOrderRibbonApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerOrderRibbonApplication.class, args);
    }

    @Bean
        //@LoadBalanced 測試自實作輪詢接口,需登出
    RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
           
  1. PaymentController中的方法
@GetMapping("/payment/lb")
    public String getLb() {
        return "目前服務端口:" + serverPort;
    }
           

測試:8001和8002交替提供服務。

完整源代碼:github