天天看点

高并发 服务雪崩、Hystrix熔断、Hystrix降级前言1 熔断、降级原理2 java 代码实现2.2 Hystrix 信号量方式

前言

听起来这几个词挺高大上的,以前都是盲区,现在我们好好梳理下这几个概念,熔断和降级是在一起使用的。先看个图

高并发 服务雪崩、Hystrix熔断、Hystrix降级前言1 熔断、降级原理2 java 代码实现2.2 Hystrix 信号量方式

订单服务和会员服务不在一个服务器,订单服务会调用会员服务,当会员服务返回阻滞时,订单服务的线程会阻滞,如果请求过多,服务器的线程都被占用了,这是我们调用 查询订单服务就会没有线程用来处理,该服务器所有的服务都将不可用,这就是服务雪崩。

  • 服务熔断

    为了解决上述问题,服务熔断的思想被提出来。类似现实世界中的“保险丝“,当某个异常条件被触发,直接熔断整个服务,而不是一直等到此服务超时。

    熔断的触发条件可以依据不同的场景有所不同,比如统计一个时间窗口内失败的调用次数。

  • 服务降级

    有了熔断,就得有降级。所谓降级,就是当某个服务熔断之后,服务器将不再被调用,此时客户端可以自己准备一个本地的fallback回调,返回一个缺省值。

1 熔断、降级原理

为了解决 上述问题,我们可以把 单个服务进行隔离,使其不能影响整个服务器的使用,常见的就是 线程隔离和信号量隔离。

1. 1 Hystrix实现服务隔离两种方案

Hystrix的资源隔离策略有两种,分别为:线程池和信号量

1.1.1 线程池方式

1、 使用线程池隔离可以完全隔离第三方应用,请求线程可以快速放回。

2、 请求线程可以继续接受新的请求,如果出现问题线程池隔离是独立的不会影响其他应用。

3、 当失败的应用再次变得可用时,线程池将清理并可立即恢复,而不需要一个长时间的恢复。

4、 独立的线程池提高了并发性

  • 缺点:

    线程池隔离的主要缺点是它们增加计算开销(CPU)。每个命令的执行涉及到排队、调度和上 下文切换都是在一个单独的线程上运行的。

1.1.2 信号量

使用一个原子计数器(或信号量)来记录当前有多少个线程在运行,当请求进来时先判断计数 器的数值,若超过设置的最大线程个数则拒绝该请求,若不超过则通行,这时候计数器+1,请求返 回成功后计数器-1。

与线程池隔离最大不同在于执行依赖代码的线程依然是请求线程

tips:信号量的大小可以动态调整, 线程池大小不可以

1.2 比较

线程池隔离:

1、 第三方应用或者接口

2、 并发量大

信号量隔离:

1、 内部应用或者中间件(redis)

2、 并发需求不大

2 java 代码实现

先增加maven 依赖

<!-- hystrix 服务熔断,降级用-->
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
		</dependency>
		<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.47</version>
		</dependency>
		<dependency>
			<groupId>com.netflix.hystrix</groupId>
			<artifactId>hystrix-metrics-event-stream</artifactId>
			<version>1.5.12</version>
		</dependency>
		<dependency>
			<groupId>com.netflix.hystrix</groupId>
			<artifactId>hystrix-javanica</artifactId>
			<version>1.5.12</version>
		</dependency>
           

会员服务代码

@RestController
@RequestMapping("/member")
public class MemberController {
    @RequestMapping("/memberIndex")
    public Object memberIndex() throws InterruptedException {
        Map<String, Object> hashMap = new HashMap<String, Object>();
        hashMap.put("code", 200);
        hashMap.put("msg", "memberIndex");
        Thread.sleep(1500);
        System.out.println("会员服务运行。。。。。");
        return hashMap;
    }

}
           

订单服务

@Autowired
    private MemberService memberService;

    @RequestMapping("/orderIndex")
    public Object orderIndex() throws InterruptedException {
        JSONObject member = memberService.getMember(); // 调用会员服务
        System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",订单服务调用会员服务:member:" + member);
        return member;
    }

           

server服务类

@Service
public class MemberService {
    public JSONObject getMember() {
        JSONObject result = HttpClientUtils.httpGet("http://127.0.0.1:8081/basic/member/memberIndex");
        return result;
    }

}
           

查询订单服务

@RequestMapping("/findOrderIndex")
    public Object findIndex() {
        System.out.println("当前线程:" + Thread.currentThread().getName() + ",findOrderIndex");
        return "findOrderIndex";
    }
           

这是简单搭建的,很容易实现调用,但是会出现雪崩。

2.1 Hystrix 线程池方式

我们改造 订单服务 接口为

// 线程池隔离
    @RequestMapping("/orderIndexHystrix")
    public Object orderIndexHystrix() throws InterruptedException {
        return new OrderHystrixCommand(memberService).execute();
    }
           

增加服务类

package com.xd.service.impl;

import com.alibaba.fastjson.JSONObject;
import com.netflix.hystrix.*;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Created by Administrator on 2019/10/24.
 * 线程池隔离
 */

public class OrderHystrixCommand extends HystrixCommand<JSONObject> {
    @Autowired
    private MemberService memberService;

    /**
     * @param
     */
    public OrderHystrixCommand(MemberService memberService) {
        super(setter());
        this.memberService = memberService;
    }

    protected JSONObject run() throws Exception {
        JSONObject member = memberService.getMember();
        System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",订单服务调用会员服务:member:" + member);
        return member;
    }

    private static Setter setter() {

        // 服务分组
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("members");
        // 服务标识
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("member");
        // 线程池名称
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("member-pool");
        // #####################################################
        // 线程池配置 线程池大小为10,线程存活时间15秒 队列等待的阈值为100,超过100执行拒绝策略
        HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(10)
                .withKeepAliveTimeMinutes(15).withQueueSizeRejectionThreshold(100);
        // ########################################################
        // 命令属性配置Hystrix 开启超时
        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
                // 采用线程池方式实现服务隔离
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                // 禁止
                .withExecutionTimeoutEnabled(false);
        return HystrixCommand.Setter.withGroupKey(groupKey).andCommandKey(commandKey).andThreadPoolKey(threadPoolKey)
                .andThreadPoolPropertiesDefaults(threadPoolProperties).andCommandPropertiesDefaults(commandProperties);

    }

    @Override
    protected JSONObject getFallback() {
        // 如果Hystrix发生熔断,当前服务不可用,直接执行Fallback方法,进行降级处理
        System.out.println("系统错误!");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", 500);
        jsonObject.put("msg", "系统错误!");
        return jsonObject;
    }


}

           

我设置了线程池大小为10,那么处理当前服务的线程已经10个,再有请求就要熔断降级了,用 jemeter 开启 15 个线程测试,我们看到有 5个已经熔断降级处理了。’

  • 结果

系统错误!

系统错误!

系统错误!

系统错误!

系统错误!

当前线程名称:hystrix-member-pool-1,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:hystrix-member-pool-2,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:hystrix-member-pool-3,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:hystrix-member-pool-4,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:hystrix-member-pool-5,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:hystrix-member-pool-6,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:hystrix-member-pool-7,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:hystrix-member-pool-8,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:hystrix-member-pool-9,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:hystrix-member-pool-10,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

2.2 Hystrix 信号量方式

和 上面的步骤一样,改造订单服务接口

// 信号量隔离
    @RequestMapping("/orderIndexHystrix2")
    public Object orderIndexHystrix2() throws InterruptedException {
        return new OrderHystrixCommand2(memberService).execute();
    }
           

增加服务类

package com.xd.service.impl;

import com.alibaba.fastjson.JSONObject;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Created by Administrator on 2019/10/24.
 */
public class OrderHystrixCommand2 extends HystrixCommand<JSONObject> {
    @Autowired
    private MemberService memberService;

    /**
     */
    public OrderHystrixCommand2(MemberService memberService) {
        super(setter());
        this.memberService = memberService;
    }

    protected JSONObject run() throws Exception {

        // Thread.sleep(500);
        // System.out.println("orderIndex线程名称" +
        // Thread.currentThread().getName());
        // System.out.println("success");
        JSONObject member = memberService.getMember();
        System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",订单服务调用会员服务:member:" + member);
        return member;
    }

    private static Setter setter() {
        // 服务分组
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("members");
        // 命令属性配置 采用信号量模式
        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                // 使用一个原子计数器(或信号量)来记录当前有多少个线程在运行,当请求进来时先判断计数
                // 器的数值,若超过设置的最大线程个数则拒绝该请求,若不超过则通行,这时候计数器+1,请求返 回成功后计数器-1。
                .withExecutionIsolationSemaphoreMaxConcurrentRequests(10)
                .withExecutionTimeoutInMilliseconds(10000);// 设置超时熔断时间为 10 秒;
        return HystrixCommand.Setter.withGroupKey(groupKey).andCommandPropertiesDefaults(commandProperties);
    }

    @Override
    protected JSONObject getFallback() {
        // 如果Hystrix发生熔断,当前服务不可用,直接执行Fallback方法
        System.out.println("系统错误!");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", 500);
        jsonObject.put("msg", "系统错误!");
        return jsonObject;
    }

}

           

设置了最大信号量是10,我们会员服务的等待时间是 1.5秒,同时发起 15 个线程,有10个是阻滞的,剩下的5个预计是熔断降级处理的,用jemeter 开启15 个线程验证

结果

系统错误!

系统错误!

系统错误!

系统错误!

系统错误!

当前线程名称:http-nio-8082-exec-2,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:http-nio-8082-exec-6,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:http-nio-8082-exec-4,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:http-nio-8082-exec-3,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:http-nio-8082-exec-7,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:http-nio-8082-exec-10,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:http-nio-8082-exec-11,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:http-nio-8082-exec-8,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:http-nio-8082-exec-1,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

当前线程名称:http-nio-8082-exec-9,订单服务调用会员服务:member:{“msg”:“memberIndex”,“code”:200}

跟我们的预期是一样的。

继续阅读