Getting Started with Hystrix

Introduction

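1. Where does the name come from?

The word "hystrix" means porcupine. A porcupine is covered in quills that protect it from predators, so it stands for a defense mechanism. That matches what Hystrix does, which is why the Netflix team named the framework Hystrix and adopted a cartoon porcupine as its logo.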

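2. What Is Hystrix?

In a distributed system, calls to many dependencies will inevitably fail, for example through timeouts or exceptions. Hystrix's job is to make sure that a problem in one dependency does not bring down the whole service. It provides circuit breaking, isolation, fallback, caching, and monitoring, so the system stays available even when one or several dependencies fail at the same time.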

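3. Hello Hystrix

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import java.util.concurrent.Future;
import rx.Observable;

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); // required
        this.name = name;
    }

    @Override
    protected String run() {
        // A network call or other business logic that may time out or throw
        return "Hello " + name + "!";
    }
}

// Four ways to execute a command (a command instance can be executed only once)
String s = new CommandHelloWorld("Bob").execute();                      // synchronous, blocks for the result
Future<String> future = new CommandHelloWorld("Bob").queue();           // asynchronous, returns a Future
Observable<String> hot = new CommandHelloWorld("Bob").observe();        // hot Observable, starts executing immediately
Observable<String> cold = new CommandHelloWorld("Bob").toObservable();  // cold Observable, executes on subscribe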

4. Flow Chart

The flow chart covers the following steps:

  1. Construct a HystrixCommand or HystrixObservableCommand Object
  2. Execute the Command
  3. Is the Response Cached?
  4. Is the Circuit Open?
  5. Is the Thread Pool/Queue/Semaphore Full?
  6. HystrixObservableCommand.construct() or HystrixCommand.run()
  7. Calculate Circuit Health
  8. Get the Fallback
  9. Return the Successful Response

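Common Features

Dependency Isolation

Serving a single user request successfully may depend on dozens or even hundreds of external services. Without isolation, the failure of one dependency can affect the normal execution of the others. As the original figure shows, each dependency is configured with its own thread pool.

In that figure, when Dep I has a problem, Dep A and Dep M can still execute normally.

Example of thread pool isolation: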

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import java.util.concurrent.TimeUnit;

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  // required
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  // optional; defaults to the group key
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));

        this.name = name;
    }

    @Override
    protected String run() throws InterruptedException {
        System.out.println("running");
        TimeUnit.MILLISECONDS.sleep(1000);  // simulate a slow dependency
        return "Hello " + name + "!";
    }

}

Common thread pool settings:

Implementing class: HystrixThreadPoolProperties

Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  // required
        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withExecutionTimeoutInMilliseconds(500))  // execution timeout
        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  // optional; defaults to the group key
        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                .withCoreSize(4)                       // number of threads in the pool
                .withMaxQueueSize(10)                  // capacity of the backing queue
                .withQueueSizeRejectionThreshold(7))   // reject new tasks once 7 are queued

Q: How should the thread pool be sized?

A: Peak QPS × TP99 latency (in seconds) + a few spare threads.
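The sizing rule above is plain arithmetic, so a small helper makes it concrete. This is only a sketch: the class ThreadPoolSizing and the method poolSize are hypothetical names, not part of Hystrix, and the inputs (30 requests per second, a 200 ms TP99, 4 spare threads) are illustrative assumptions.

```java
class ThreadPoolSizing {

    /**
     * Estimates a pool size as peak QPS x TP99 latency (in seconds) + spare threads.
     * The busy-thread estimate is rounded up so a fractional thread is never dropped.
     */
    static int poolSize(double peakQps, double tp99Millis, int spareThreads) {
        if (peakQps < 0 || tp99Millis < 0 || spareThreads < 0) {
            throw new IllegalArgumentException("inputs must be non-negative");
        }
        double busyThreads = peakQps * tp99Millis / 1000.0; // threads occupied at peak
        return (int) Math.ceil(busyThreads) + spareThreads;
    }

    public static void main(String[] args) {
        // 30 requests/s, each taking up to 0.2 s at the 99th percentile, plus 4 spare threads
        System.out.println(poolSize(30, 200, 4));
    }
}
```

The resulting number would then be passed to withCoreSize(...). Treat it as a starting point and adjust it against the rejection counts seen in monitoring.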

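Semaphore Isolation

With thread pool isolation, the thread that issues the request and the thread that actually executes the call are different threads. With semaphore isolation they are the same thread. The original figure compares the two strategies; the example below uses semaphore isolation: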

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {

    private final int id;
    private long start, end;

    public CommandUsingSemaphoreIsolation(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                // since we're doing an in-memory cache lookup we choose SEMAPHORE isolation
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) // use semaphore isolation
                        .withExecutionIsolationSemaphoreMaxConcurrentRequests(3)  // max concurrent run() calls
                        .withFallbackIsolationSemaphoreMaxConcurrentRequests(5)   // max concurrent fallback calls
                        .withExecutionTimeoutInMilliseconds(300)));               // execution timeout
        this.id = id;
        this.start = System.currentTimeMillis();
    }

    @Override
    protected String run() throws InterruptedException {
        // a real implementation would retrieve data from in memory data structure
        TimeUnit.MILLISECONDS.sleep(id * 30);
        System.out.println("running normal, id=" + id);
        return "ValueFromHashMap_" + id;
    }

    @Override
    protected String getFallback() {
        System.out.println(" fallback, id=" + id);
        return "fallback:" + id;
    }

}

@Test
public void maxConcurrentRequests() throws InterruptedException {
    int count = 10;
    while (count > 0) {
        int id = count--;
        // start 10 caller threads so the commands compete for the 3 execution permits
        new Thread(() -> {
            try {
                new CommandUsingSemaphoreIsolation(id).execute();
            } catch (Exception ex) {
                System.out.println("Exception:" + ex.getMessage() + " id=" + id);
            }
        }).start();
    }

    TimeUnit.SECONDS.sleep(100);
}
// Note: with semaphore isolation, even if new CommandUsingSemaphoreIsolation(id).queue() is called
// in a loop on a single thread, the run() methods still execute sequentially.
// Console output

fallback, id=10

fallback, id=9

fallback, id=5

fallback, id=8

fallback, id=1

Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=4

Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=7

running normal, id=2

running normal, id=3

running normal, id=6


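Q: When should you use thread pool isolation, and when semaphore isolation?

A: Thread pool isolation adds some overhead, but it does not block the request thread, which makes it a good fit for I/O-bound tasks.

Semaphore isolation runs on the user's request thread, so there is no extra thread-switching overhead. It suits local computation whose execution time and logic are both short, such as CPU-bound tasks.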
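To make the idea of semaphore isolation concrete, here is a minimal sketch built only on java.util.concurrent.Semaphore. It is not how Hystrix is implemented; the class SemaphoreIsolationSketch and its execute method are hypothetical names chosen for illustration. The work runs on the caller's own thread, and a caller that cannot get a permit is sent straight to the fallback instead of waiting.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

class SemaphoreIsolationSketch {

    private final Semaphore permits;

    SemaphoreIsolationSketch(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    /** Runs work on the calling thread if a permit is free, otherwise returns the fallback. */
    <T> T execute(Supplier<T> work, Supplier<T> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get();      // rejected: too many calls already in flight
        }
        try {
            return work.get();          // same thread as the caller, no hand-off
        } finally {
            permits.release();          // always give the permit back
        }
    }
}
```

Because tryAcquire() never blocks, the caller's latency is bounded by the work itself. The trade-off is that a slow call cannot be abandoned by the caller the way a call running on a separate pool thread can.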

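Fallback

Q1: Why do we need a fallback?

In short, when a call to a dependency fails we usually need a degraded alternative, and Hystrix has built-in support for this.

Degradation example: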

import static org.junit.Assert.assertEquals;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  // required
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds(500))  // execution timeout
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  // optional; defaults to the group key
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));

        this.name = name;
    }

    @Override
    protected String run() throws InterruptedException {
        System.out.println("running");
        TimeUnit.MILLISECONDS.sleep(1000);  // longer than the 500 ms timeout, so the command times out
        return "Hello " + name + "!";
    }

    @Override
    protected String getFallback() {
        return "Hello " + "Fallback";
    }
}

@Test
public void fallbackTest() {
    assertEquals("Hello Fallback", new CommandHelloWorld("World").execute());
}

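Q2: What triggers a fallback?

In short: run() throws an exception, the command times out, the thread pool or semaphore rejects the call, or the circuit breaker is open (the call is short-circuited).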

Failure Type           Exception class              Exception.cause                          Subject to fallback
FAILURE                HystrixRuntimeException      underlying exception (user-controlled)   YES
TIMEOUT                HystrixRuntimeException      j.u.c.TimeoutException                   YES
SHORT_CIRCUITED        HystrixRuntimeException      j.l.RuntimeException                     YES
THREAD_POOL_REJECTED   HystrixRuntimeException      j.u.c.RejectedExecutionException         YES
SEMAPHORE_REJECTED     HystrixRuntimeException      j.l.RuntimeException                     YES
BAD_REQUEST            HystrixBadRequestException   underlying exception (user-controlled)   NO

The command used by the tests below:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import java.util.concurrent.TimeUnit;

public class CommandHelloFailure extends HystrixCommand<String> {

    private final String name;

    public CommandHelloFailure(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  // required
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds(1000))  // execution timeout
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(3)));

        this.name = name;
    }

    @Override
    protected String run() throws InterruptedException {
        String threadPoolName = this.getThreadPoolKey().name();
        // logs the thread pool key a second time under "cmdKey"; the recorded console output reflects this
        String cmdKey = this.getThreadPoolKey().name();
        System.out.println("running begin , threadPool=" + threadPoolName + " cmdKey=" + cmdKey + " name=" + name);

        if ("Exception".equals(name)) {
            throw new RuntimeException("this command always fails");
        } else if ("Timeout".equals(name)) {
            TimeUnit.SECONDS.sleep(2);           // exceeds the 1000 ms timeout
        } else if ("Reject".equals(name)) {
            TimeUnit.MILLISECONDS.sleep(800);    // keeps a pool thread busy
        }
        System.out.println(" run end");

        return "Hello " + name + "!";
    }

    @Override
    protected String getFallback() {
        // report why the fallback was invoked
        StringBuilder sb = new StringBuilder("running fallback");
        boolean isRejected = isResponseRejected();
        boolean isException = isFailedExecution();
        boolean isTimeout = isResponseTimedOut();
        boolean isCircuitOpen = isCircuitBreakerOpen();

        sb.append(", isRejected:").append(isRejected);
        sb.append(", isException:").append(isException);
        if (isException) {
            sb.append(" msg=").append(getExecutionException().getMessage());
        }
        sb.append(",  isTimeout: ").append(isTimeout);
        sb.append(",  isCircut:").append(isCircuitOpen);

        sb.append(", group:").append(this.getCommandGroup().name());
        sb.append(", threadpool:").append(getThreadPoolKey().name());
        System.out.println(sb.toString());

        return "Hello Failure " + name + "!";
    }
}

FAILURE 

Test a fallback caused by an exception:

@Test
public void expTest() {
    assertEquals("Hello Failure Exception!", new CommandHelloFailure("Exception").execute());
}

// Console output

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Exception

running fallback, isRejected:false, isException:true msg=this command always fails, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

TIMEOUT

Test a fallback caused by a timeout:

@Test
public void timeOutTest() {
    assertEquals("Hello Failure Timeout!", new CommandHelloFailure("Timeout").execute());
}      

// Console output

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Timeout

running fallback, isRejected:false, isException:false, isTimeout: true, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

THREAD_POOL_REJECTED

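When the number of concurrently executing tasks exceeds the thread pool size plus the queue size, further tasks are rejected, which triggers the fallback.

@Test
public void rejectTest() throws InterruptedException {
    int count = 5;
    while (count-- > 0) {
        // the pool has 3 threads and each task takes 800 ms, so the 4th and 5th submissions are rejected
        new CommandHelloFailure("Reject").queue();
        TimeUnit.MILLISECONDS.sleep(100);
    }
}

// Console output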

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject

running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

SEMAPHORE_REJECTED is similar to THREAD_POOL_REJECTED, so it is not demonstrated here.

SHORT_CIRCUITED

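When more than a certain percentage of user requests fail (timeout, failure, reject) within a given time window, the circuit breaker opens. Once it is open, all requests go straight to the fallback.

Parameters

Name                                      Type      Meaning                                                                                     Default
circuitBreakerEnabled                     Boolean   Whether the circuit breaker is enabled                                                      true
circuitBreakerErrorThresholdPercentage    Integer   Error percentage at or above which the circuit breaker opens                                50
circuitBreakerForceClosed                 Boolean   Force the circuit breaker to stay closed                                                    false
circuitBreakerForceOpen                   Boolean   Force the circuit breaker to stay open                                                      false
circuitBreakerRequestVolumeThreshold      Integer   Minimum number of requests within the 10 s window before the circuit breaker can trip      20
circuitBreakerSleepWindowInMilliseconds   Integer   How long after opening the circuit breaker waits before trying to close again (half-open)   5000 ms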

A typical configuration:

Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  // required
        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withExecutionTimeoutInMilliseconds(50)  // execution timeout
                .withCircuitBreakerRequestVolumeThreshold(5)
                .withCircuitBreakerSleepWindowInMilliseconds(1000)
                .withCircuitBreakerErrorThresholdPercentage(50))
        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  // optional; defaults to the group key
        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4));

This configuration means: within a 10 s window, if there are at least 5 requests and at least 50% of them fail, the circuit breaker opens. 1000 ms after opening, it lets a trial request through to try to close again.

How the circuit breaker works, quoted from the official documentation:

The precise way that the circuit opening and closing occurs is as follows:

  1. Assuming the volume across a circuit meets a certain threshold (HystrixCommandProperties.circuitBreakerRequestVolumeThreshold())...
  2. And assuming that the error percentage exceeds the threshold error percentage (HystrixCommandProperties.circuitBreakerErrorThresholdPercentage())...
  3. Then the circuit-breaker transitions from CLOSED to OPEN.
  4. While it is open, it short-circuits all requests made against that circuit-breaker.
  5. After some amount of time (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()), the next single request is let through (this is the HALF-OPEN state). If the request fails, the circuit-breaker returns to the OPEN state for the duration of the sleep window. If the request succeeds, the circuit-breaker transitions to CLOSED and the logic in 1. takes over again.
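The CLOSED, OPEN, and HALF-OPEN transitions quoted above can be sketched as a small state machine in plain Java. This is an illustration under simplifying assumptions, not Hystrix's implementation: the class CircuitBreakerSketch and its methods are hypothetical names, the current time is passed in explicitly so the behaviour is deterministic, and failures are counted since the breaker last closed instead of over a rolling 10 s window.

```java
class CircuitBreakerSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;

    private State state = State.CLOSED;
    private int total;
    private int failures;
    private long openedAtMillis;

    CircuitBreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage, long sleepWindowMillis) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    /** Returns true if a request may run at the given time. */
    synchronized boolean allowRequest(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAtMillis >= sleepWindowMillis) {
            state = State.HALF_OPEN;    // sleep window elapsed: let a single trial request through
            return true;
        }
        return state == State.CLOSED;   // OPEN and HALF_OPEN short-circuit everything else
    }

    synchronized void recordSuccess() {
        if (state == State.HALF_OPEN) {
            total = 0;
            failures = 0;
            state = State.CLOSED;       // trial succeeded: close and start counting afresh
        } else if (state == State.CLOSED) {
            total++;
        }
    }

    synchronized void recordFailure(long nowMillis) {
        if (state == State.HALF_OPEN) {
            trip(nowMillis);            // trial failed: reopen for another sleep window
        } else if (state == State.CLOSED) {
            total++;
            failures++;
            boolean enoughVolume = total >= requestVolumeThreshold;
            boolean tooManyErrors = failures * 100 >= errorThresholdPercentage * total;
            if (enoughVolume && tooManyErrors) {
                trip(nowMillis);
            }
        }
    }

    synchronized State state() {
        return state;
    }

    private void trip(long nowMillis) {
        state = State.OPEN;
        openedAtMillis = nowMillis;
        total = 0;
        failures = 0;
    }
}
```

A caller would check allowRequest(now) before each call, run the fallback when it returns false, and report the outcome with recordSuccess() or recordFailure(now).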

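Q3: What should we do in a fallback?

There are generally a few strategies:

1. Do not implement getFallback(): a failed dependency call throws the exception directly.

2. Implement getFallback() and return a default value. This is a common strategy.

3. Implement getFallback() and switch to a degraded alternative.

In addition, in production you should generally record a metric or log entry whenever a fallback occurs.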
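Strategies 2 and 3, together with the advice to record every fallback, can be sketched without Hystrix as follows. The names FallbackSketch, callWithFallback, and FALLBACK_COUNT are hypothetical, and the counter is only a stand-in for whatever metrics or logging client a real service would use.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

class FallbackSketch {

    // Counts how often the primary call failed; a placeholder for a real metrics client.
    static final AtomicLong FALLBACK_COUNT = new AtomicLong();

    /**
     * Tries the primary call first. If it throws, records the event and tries the
     * degraded alternative; if that also throws, returns the default value.
     */
    static <T> T callWithFallback(Supplier<T> primary, Supplier<T> degraded, T defaultValue) {
        try {
            return primary.get();
        } catch (RuntimeException primaryFailure) {
            FALLBACK_COUNT.incrementAndGet();   // record that a fallback happened
            try {
                return degraded.get();          // strategy 3: degraded alternative
            } catch (RuntimeException degradedFailure) {
                return defaultValue;            // strategy 2: default value
            }
        }
    }
}
```

Keeping the degraded path cheap and local matters here: a fallback that makes its own remote call can fail in the same way as the primary call.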

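Request Collapsing

In short, request collapsing merges multiple requests made within a short time window into a single request. It is commonly used for network I/O, where it reduces the number of I/O operations, at the cost of a higher average latency.

The test code: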

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixRequestLog;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import org.junit.Test;

public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

    private final Integer key;

    public CommandCollapserGetValueForKey(Integer key) {
        super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
                .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
                        .withMaxRequestsInBatch(3)            // at most 3 requests per batch
                        .withTimerDelayInMilliseconds(10)));  // collect requests for 10 ms
        this.key = key;
    }

    @Override
    public Integer getRequestArgument() {
        return key;
    }

    @Override
    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
        return new BatchCommand(requests);
    }

    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
        // responses come back in the same order as the requests
        int count = 0;
        for (CollapsedRequest<String, Integer> request : requests) {
            request.setResponse(batchResponse.get(count++));
        }
    }

    private static final class BatchCommand extends HystrixCommand<List<String>> {
        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            this.requests = requests;
        }

        @Override
        protected List<String> run() {
            System.out.println("BatchCommand run  " + requests.size());
            ArrayList<String> response = new ArrayList<String>();
            for (CollapsedRequest<String, Integer> request : requests) {
                // artificial response for each argument received in the batch
                response.add("ValueForKey: " + request.getArgument());
            }
            return response;
        }
    }
}


@Test
public void testCollapser() throws Exception {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    try {
        Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
        Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
        Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
        Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();


        assertEquals("ValueForKey: 1", f1.get());
        assertEquals("ValueForKey: 2", f2.get());
        assertEquals("ValueForKey: 3", f3.get());
        assertEquals("ValueForKey: 4", f4.get());

        // assert that the batch command 'GetValueForKey' was in fact executed,
        // and that it executed twice (4 requests with at most 3 per batch)
        assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
        HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
        // assert the command is the one we're expecting
        assertEquals("GetValueForKey", command.getCommandKey().name());
        // confirm that it was a COLLAPSED command execution
        assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
        // and that it was successful
        assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
    } finally {
        context.shutdown();
    }
}

// Console output
BatchCommand run  3
BatchCommand run  1

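To use this feature:

1. Extend the HystrixCollapser class.

2. Implement the following methods:

getRequestArgument: returns the request argument object

createCommand: returns the BatchCommand

mapResponseToRequests: maps each entry of the batch response back to its request

3. Create the corresponding BatchCommand class, which contains the actual batch request logic.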

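Configuration parameters:

Name                       Type      Meaning                                                                        Default
maxRequestsInBatch         Integer   Maximum number of requests per batch; beyond this a new batch is created      Integer.MAX_VALUE
timerDelayInMilliseconds   Integer   Collection window; once it elapses, a new batch is created                     10 ms
requestCacheEnabled        Boolean   Whether the request cache is enabled                                           true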

A typical configuration:

Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
       .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
               .withMaxRequestsInBatch(3)
       .withTimerDelayInMilliseconds(5));      
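The effect of maxRequestsInBatch can be illustrated with a few lines of plain Java. This sketch covers only the size limit and ignores the timer window; BatchPartitionSketch and partition are hypothetical names, and Hystrix's real collapser is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;

class BatchPartitionSketch {

    /** Splits the pending requests into batches of at most maxRequestsInBatch items. */
    static <T> List<List<T>> partition(List<T> pending, int maxRequestsInBatch) {
        if (maxRequestsInBatch <= 0) {
            throw new IllegalArgumentException("maxRequestsInBatch must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < pending.size(); from += maxRequestsInBatch) {
            int to = Math.min(from + maxRequestsInBatch, pending.size());
            batches.add(new ArrayList<>(pending.subList(from, to))); // copy so batches are independent
        }
        return batches;
    }
}
```

With four pending requests and a limit of 3, this yields one batch of three and one batch of one, which is the same split the test's console output shows for the real collapser.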

Request Cache

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import org.junit.Test;

public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
    private final int value;

    public CommandUsingRequestCache(int value) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.value = value;
    }

    @Override
    public Boolean run() {
        return value == 0 || value % 2 == 0;
    }

    // this method must be implemented to use the request cache
    @Override
    public String getCacheKey() {
        return String.valueOf(value);
    }
}

@Test
public void testWithCacheHits() {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    try {
        CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
        CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);

        assertTrue(command2a.execute());
        // first execution: nothing is cached yet
        assertFalse(command2a.isResponseFromCache());

        assertTrue(command2b.execute());
        // second execution: the result comes from the cache
        assertTrue(command2b.isResponseFromCache());
    } finally {
        context.shutdown();
    }

    context = HystrixRequestContext.initializeContext();
    try {
        CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
        assertTrue(command3b.execute());
        // this is a new request context, so the earlier cache entries no longer apply
        assertFalse(command3b.isResponseFromCache());
    } finally {
        context.shutdown();
    }
}
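The test above shows that cached results are scoped to a request context. The same idea can be sketched in plain Java with a map that lives and dies with a scope object. RequestCacheSketch and RequestScope are hypothetical names, and this sketch is not how Hystrix stores its cache; it only mirrors the observable behaviour.

```java
import java.util.HashMap;
import java.util.Map;

class RequestCacheSketch {

    /** Stand-in for one request context: cached results live only as long as this object. */
    static final class RequestScope {
        private final Map<String, Boolean> cache = new HashMap<>();
        private int executions;

        /** Returns whether value is even, computing it at most once per scope. */
        boolean isEven(int value) {
            String cacheKey = String.valueOf(value);    // same role as getCacheKey()
            return cache.computeIfAbsent(cacheKey, key -> {
                executions++;                           // only reached on a cache miss
                return value % 2 == 0;
            });
        }

        int executions() {
            return executions;
        }
    }
}
```

Calling isEven(2) twice on one RequestScope runs the computation once, while a fresh RequestScope starts with an empty cache, which matches the assertions made on command2a, command2b, and command3b.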

Hystrix Context

Global Context

UserRequest Context

Usage and Monitoring

1. Using Hystrix in a project

Using Hystrix is simple: just add the required dependencies. With Maven, for example:

<!-- Hystrix dependencies -->
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.9</version>
</dependency>
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-metrics-event-stream</artifactId>
    <version>1.5.9</version>
</dependency>

2. Using the Dashboard

Configure the servlet in web.xml:

<servlet>
    <display-name>HystrixMetricsStreamServlet</display-name>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
</servlet>
<servlet-mapping>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <url-pattern>/hystrix.stream</url-pattern>
</servlet-mapping>

Download the attached WAR and JAR files to any directory, then run:

java -jar jetty-runner-9.2.10.v20150310.jar --port 8410 hystrix-dashboard-1.5.1.war      

Then open http://localhost:8410/ in a browser, enter http://hostname:port/application/hystrix.stream in the input box, click Add Stream, and then click Monitor Stream.

In general, Thread-pool Rejections and Failures/Exceptions should be 0, and Thread timeouts should be a very small number.

Source: 博客园 (Cnblogs), 李平 (Li Ping)
