天天看點

Hystrix入門指南

Introduction

1、Where does the name come from?

hystrix對應的中文名字是“豪豬”,豪豬周身長滿了刺,能保護自己不受天敵的傷害,代表了一種防禦機制,這與hystrix本身的功能不謀而合,是以Netflix團隊将該架構命名為Hystrix,并使用了對應的卡通形象做作為logo。

2、What Is Hystrix?

在一個分布式系統裡,許多依賴不可避免的會調用失敗,比如逾時、異常等,如何能夠保證在一個依賴出問題的情況下,不會導緻整體服務失敗,這個就是Hystrix需要做的事情。Hystrix提供了熔斷、隔離、Fallback、cache、監控等功能,能夠在一個、或多個依賴同時出現問題時保證系統依然可用。

3、Hello Hystrix

1public class CommandHelloWorld extends HystrixCommand<String> {
 2
 3    private final String name;
 4
 5    public CommandHelloWorld(String name) {
 6        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); //必須
 7        this.name = name;
 8    }
 9
10    @Override
11    protected String run() {
12        /*
13         網絡調用 或者其他一些業務邏輯,可能會逾時或者抛異常
14        */
15        return "Hello " + name + "!";
16    }
17}
18
19String s = new CommandHelloWorld("Bob").execute(); //
20Future<String> s = new CommandHelloWorld("Bob").queue();
21Observable<String> s = new CommandHelloWorld("Bob").observe();
22Observable<String> s = new CommandHelloWorld("Bob").toObservable()      

4、Flow Chart

Hystrix入門指南

 說明:

  1. Construct a HystrixCommand or HystrixObservableCommand Object
  2. Execute the Command
  3. Is the Response Cached?
  4. Is the Circuit Open?
  5. Is the Thread Pool/Queue/Semaphore Full?
  6. HystrixObservableCommand.construct() or HystrixCommand.run()
  7. Calculate Circuit Health
  8. Get the Fallback
  9. Return the Successful Response
Hystrix入門指南

常用功能介紹

依賴隔離

一個使用者請求的成功執行,肯能依賴數十上百個外部服務,如果沒有隔離,單個依賴的失敗,可能會印象其他依賴的正常執行。如下圖所示,為每個依賴配置了單獨線程池

在下圖中,當Dep I 出現問題時,DepA 和Dep M大以來可以正常執行

線程池隔離的使用例子

1public class CommandHelloWorld extends HystrixCommand<String> {
 2
 3    private final String name;
 4
 5    public CommandHelloWorld(String name) {
 6        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  //必須
 7                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  //可選,預設 使用 this.getClass().getSimpleName();
 8                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));
 9
10        this.name = name;
11    }
12
13    @Override
14    protected String run() throws InterruptedException {
15        System.out.println("running");
16        TimeUnit.MILLISECONDS.sleep(1000);
17        return "Hello " + name + "!";
18    }
19
20}      

線程池常用參數設定:

實作類:HystrixThreadPoolProperties

Hystrix入門指南
1Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  //必須
2 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
3                .withExecutionTimeoutInMilliseconds(500))  //逾時時間
4 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  //可選,預設 使用 this.getClass().getSimpleName();
5 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)
6        .withMaxQueueSize(10).withQueueSizeRejectionThreshold(7))      

Q: 怎麼設定線程池大小?

A:Qps* Tp99 +備援線程

信号量隔離

線程池隔離中,發起請求的線程和真實執行的線程不是同一個線程,使用信号量隔離時,它們是同一個線程, 兩種隔離的差別如下圖:

1public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {
 2
 3    private final int id;
 4    private long start,end ;
 5
 6    public CommandUsingSemaphoreIsolation(int id) {
 7        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
 8                // since we're doing an in-memory cache lookup we choose SEMAPHORE isolation
 9                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
10                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) //設定使用信号量隔離政策
11                                .withExecutionIsolationSemaphoreMaxConcurrentRequests(3)  //設定信号量隔離時的最大并發請求數
12                                .withFallbackIsolationSemaphoreMaxConcurrentRequests(5)     //設定fallback的最大并發數
13                        .withExecutionTimeoutInMilliseconds(300)));   //設定逾時時間
14        this.id = id;
15        this.start = System.currentTimeMillis();
16    }
17
18    @Override
19    protected String run() throws InterruptedException {
20        // a real implementation would retrieve data from in memory data structure
21        TimeUnit.MILLISECONDS.sleep(id*30);
22        System.out.println("running normal, id="+id);
23        return "ValueFromHashMap_" + id;
24    }
25
26    @Override
27    protected String getFallback(){
28        System.out.println(" fallback, id="+id);
29        return "fallback:"+id;
30    }
31
32}
33
34@Test
35public void maxCurrentRequst() throws InterruptedException {
36    int count =10;
37    while (count >0){
38        int id = count--;
39        new Thread(() -> {
40            try {
41                new CommandUsingSemaphoreIsolation(id).execute();
42            }catch (Exception ex){
43                System.out.println("Exception:"+ex.getMessage()+" id="+id);
44            }
45        }).start();
46    }
47
48    TimeUnit.SECONDS.sleep(100);
49}
50 //注:使用信号量隔離,在同一個線程中即使循環調用new CommandUsingSemaphoreIsolation(id).queue(),run方法也是順序執行;      
Hystrix入門指南

//控制台輸出

fallback, id=10

fallback, id=9

fallback, id=5

fallback, id=8

fallback, id=1

Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=4

Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=7

running normal, id=2

running normal, id=3

running normal, id=6

Hystrix入門指南

Q: 什麼時候使用線程池隔離,什麼使用信号量隔離?

A:  線程池隔離缺點是帶來一定的開銷,但不會阻塞請求線程,适合于于IO密集型的任務

信号量隔離使用使用者請求線程,沒有格外線程切換開銷,使用與執行時間和執行邏輯都比較短的本地計算。比如CPU密集型的任務

Fallback

Q1: 為什麼需要fallback?

簡單來說,在依賴調用失敗時,我們一般會需要提供降級方案,Hystrix對此提供了支援

降級 

1public class CommandHelloWorld extends HystrixCommand<String> {
 2
 3    private final String name;
 4
 5    public CommandHelloWorld(String name) {
 6        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  //必須
 7                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
 8                        .withExecutionTimeoutInMilliseconds(500))  //逾時時間
 9                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  //可選,預設 使用 this.getClass().getSimpleName();
10                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));
11
12        this.name = name;
13    }
14
15    @Override
16    protected String run() throws InterruptedException {
17        System.out.println("running");
18        TimeUnit.MILLISECONDS.sleep(1000);
19        return "Hello " + name + "!";
20    }
21
22    @Override
23    protected String getFallback() {
24        return "Hello "+"Fallback";
25    }
26}
27
28@Test
29public void fallbackTest(){
30    assertEquals("Hello Fallback",new CommandHelloWorld("World").execute());
31}      

 Q2:什麼情況下會觸發fallback?

簡單來說,就是run方法抛異常,逾時,線程/信号量reject、短路

Failure Type Exception class Exception.cause subject to fallback
FAILURE

​HystrixRuntimeException​

underlying exception (user-controlled) YES
TIMEOUT

​HystrixRuntimeException​

​j.u.c.TimeoutException​

YES
SHORT_CIRCUITED

​HystrixRuntimeException​

​j.l.RuntimeException​

YES
THREAD_POOL_REJECTED

​HystrixRuntimeException​

​j.u.c.RejectedExecutionException​

YES
SEMAPHORE_REJECTED

​HystrixRuntimeException​

​j.l.RuntimeException​

YES
BAD_REQUEST

​HystrixBadRequestException​

underlying exception (user-controlled) NO

以下為測試的主程式:

1public class CommandHelloFailure extends HystrixCommand<String> {
 2
 3    private final String name;
 4
 5    public CommandHelloFailure(String name) {
 6        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  //必須
 7                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
 8                        .withExecutionTimeoutInMilliseconds(1000))  //逾時時間
 9                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
10                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(3)));
11
12        this.name = name;
13    }
14
15    @Override
16    protected String run() throws InterruptedException {
17        String theadName = this.getThreadPoolKey().name();
18        String cmdKey=this.getThreadPoolKey().name();
19        System.out.println("running begin , threadPool="+theadName+" cmdKey="+cmdKey+" name="+name);
20
21        if("Exception".equals(name)) {
22            throw new RuntimeException("this command always fails");
23        }else if("Timeout".equals(name)){
24            TimeUnit.SECONDS.sleep(2);
25        }else if("Reject".equals(name)){
26            TimeUnit.MILLISECONDS.sleep(800);
27        }
28        System.out.println(" run end");
29
30        return "Hello " + name + "!";
31    }
32
33    @Override
34    protected String getFallback() {
35        StringBuilder sb = new StringBuilder("running fallback");
36        boolean isRejected = isResponseRejected();
37        boolean isException = isFailedExecution();
38        boolean isTimeout= isResponseTimedOut();
39        boolean isCircut = isCircuitBreakerOpen();
40
41        sb.append(", isRejected:").append(isRejected);
42        sb.append(", isException:"+isException);
43        if(isException){
44            sb.append(" msg=").append(getExecutionException().getMessage());
45        }
46        sb.append(",  isTimeout: "+isTimeout);
47        sb.append(",  isCircut:"+isCircut);
48
49        sb.append(", group:").append(this.getCommandGroup().name());
50        sb.append(", threadpool:").append(getThreadPoolKey().name());
51        System.out.println(sb.toString());
52
53        String msg="Hello Failure " + name + "!";
54        return msg;
55    }
56}      

FAILURE 

測試由異常導緻的fallback

1 @Test
2 public void expTest() {
3     assertEquals("Hello Failure Exception!", new CommandHelloFailure("Exception").execute());
4 }
5      

//控制台輸出

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Exception

running fallback, isRejected:false, isException:true msg=this command always fails, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

TIMEOUT

測試有逾時導緻的fallback

@Test
public void timeOutTest() {
    assertEquals("Hello Failure Timeout!", new CommandHelloFailure("Timeout").execute());
}      

//控制台輸出

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Timeout

running fallback, isRejected:false, isException:false, isTimeout: true, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

THREAD_POOL_REJECTED

并發執行的任務數超過線程池和隊列之和會被reject,導緻fallback

1@Test
2public void rejectTest() throws InterruptedException {
3    int count = 5;
4    while (count-- > 0){
5        new CommandHelloFailure("Reject").queue();
6        TimeUnit.MILLISECONDS.sleep(100);
7    }
8}      

//控制台輸出

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject

running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

SEMAPHORE_REJECTED  與 THREAD_POOL_REJECTED 類似,不再示範

SHORT_CIRCUITED

在一定時間内,使用者請求超過一定的比例失敗時(timeout, failure, reject),斷路器就會打開;短路器打開後所有請求直接走fallback

參數設定

名稱 類型 含義 預設值
circuitBreakerEnabled Boolean 是否啟用斷路器 true
circuitBreakerErrorThresholdPercentage Integer 錯誤百分比,超過該值打開斷路器 50
circuitBreakerForceClosed Boolean 強制斷路器打開 false
circuitBreakerForceOpen Boolean 強制短路器關閉 false
circuitBreakerRequestVolumeThreshold Integer 10s中内最少的請求量,大于該值,斷路器配置才會生效 20
circuitBreakerSleepWindowInMilliseconds Integer 短路器打開後多長時間嘗試關閉(Half open) 5s

一般配置如下:

1Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  //必須
2        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
3                .withExecutionTimeoutInMilliseconds(50)//逾時時間
4                .withCircuitBreakerRequestVolumeThreshold(5)
5                .withCircuitBreakerSleepWindowInMilliseconds(1000)
6                .withCircuitBreakerErrorThresholdPercentage(50))
7        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  //可選,預設 使用 this.getClass().getSimpleName();
8        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4));      

以上配置的含義是: 在10s内,如果請求在5個及以上,且有50%失敗的情況下,開啟斷路器;斷路器開啟1000ms後嘗試關閉

短路器的工作機制,引用自官方文檔:

The precise way that the circuit opening and closing occurs is as follows:

Assuming the volume across a circuit meets a certain threshold (HystrixCommandProperties.circuitBreakerRequestVolumeThreshold())...

And assuming that the error percentage exceeds the threshold error percentage (HystrixCommandProperties.circuitBreakerErrorThresholdPercentage())...

Then the circuit-breaker transitions from CLOSED to OPEN.

While it is open, it short-circuits all requests made against that circuit-breaker.

After some amount of time (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()), the next single request is let through (this is the HALF-OPEN state). If the request fails, the circuit-breaker returns to the OPEN state for the duration of the sleep window. If the request succeeds, the circuit-breaker transitions to CLOSED and the logic in 1. takes over again.

Q3:fallback時我們應該怎麼辦?

一般有以下幾種政策:

1、不實作getFallback方法:依賴調用失敗時直接抛出異常

2、實作getFallback方法,傳回預設值:這是一種常見的政策

3、實作getFallback方法,走降級方案

此外,生産環境中,fallback時,一般需要打點記錄

請求合并

簡單來說,就是将一段時間内的多次請求合并為一次請求,常用于網絡IO中,能減少IO次數,缺點是增加平均延遲

以下是測試代碼主程式:

1public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {
 2
 3    private final Integer key;
 4
 5    public CommandCollapserGetValueForKey(Integer key) {
 6        super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
 7                .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
 8                        .withMaxRequestsInBatch(3)
 9                .withTimerDelayInMilliseconds(10)));
10        this.key = key;
11    }
12
13    @Override
14    public Integer getRequestArgument() {
15        return key;
16    }
17
18    @Override
19    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
20        return new BatchCommand(requests);
21    }
22
23    @Override
24    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
25        int count = 0;
26        for (CollapsedRequest<String, Integer> request : requests) {
27            request.setResponse(batchResponse.get(count++));
28        }
29    }
30
31    private static final class BatchCommand extends HystrixCommand<List<String>> {
32        private final Collection<CollapsedRequest<String, Integer>> requests;
33
34        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
35            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
36                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
37            this.requests = requests;
38        }
39
40        @Override
41        protected List<String> run() {
42            System.out.println("BatchCommand run  "+requests.size());
43            ArrayList<String> response = new ArrayList<String>();
44            for (CollapsedRequest<String, Integer> request : requests) {
45                // artificial response for each argument received in the batch
46                response.add("ValueForKey: " + request.getArgument());
47            }
48            return response;
49        }
50    }
51}
52
53
54@Test
55public void testCollapser() throws Exception {
56    HystrixRequestContext context = HystrixRequestContext.initializeContext();
57    try {
58        Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
59        Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
60        Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
61        Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();
62
63
64        assertEquals("ValueForKey: 1", f1.get());
65        assertEquals("ValueForKey: 2", f2.get());
66        assertEquals("ValueForKey: 3", f3.get());
67        assertEquals("ValueForKey: 4", f4.get());
68
69        // assert that the batch command 'GetValueForKey' was in fact
70        // executed and that it executed only once
71        assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
72        HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
73        // assert the command is the one we're expecting
74        assertEquals("GetValueForKey", command.getCommandKey().name());
75        // confirm that it was a COLLAPSED command execution
76        assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
77        // and that it was successful
78        assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
79    } finally {
80        context.shutdown();
81    }
82}
83
84//控制輸出
85BatchCommand run  3
86BatchCommand run  1      

執行流程:

使用該特性

1、必須繼承HystrixCollapser類,

2、實作以下方法:

getRequestArgument: 傳回請求參數對象

createCommand : 傳回BatchCommand

mapResponseToRequests:實作Response和Request的映射

3、建立對應的BatchCommand類:批量請求的具體實作

參數配置:

名稱 類型 含義 預設值
maxRequestsInBatch      
Integer      
每個批次最大的請求數,超過該值,建立新的batch請求
Integer.MAX_VALUE      
timerDelayInMilliseconds      
Integer      
等待時間視窗,超過該值,建立新的batch請求 10ms
requestCacheEnabled      
Boolean      
是否啟用cache true

一般配置如下

Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
       .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
               .withMaxRequestsInBatch(3)
       .withTimerDelayInMilliseconds(5));      

請求cache

1public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
 2    private final int value;
 3
 4    public CommandUsingRequestCache(int value) {
 5        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
 6        this.value = value;
 7    }
 8
 9    @Override
10    public Boolean run() {
11        return value == 0 || value % 2 == 0;
12    }
13
14   //使用cache功能,必須實作該方法
15    @Override
16    public String getCacheKey() {
17        return String.valueOf(value);
18    }
19}
20
21@Test
22public void testWithCacheHits() {
23    HystrixRequestContext context = HystrixRequestContext.initializeContext();
24    try {
25        CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
26        CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);
27
28        assertTrue(command2a.execute());
29        //第一次請求,沒有cache
30        assertFalse(command2a.isResponseFromCache());
31
32        assertTrue(command2b.execute());
33        // 第二次請求,從cache中拿的結果
34        assertTrue(command2b.isResponseFromCache());
35    } finally {
36        context.shutdown();
37    }
38
39    context = HystrixRequestContext.initializeContext();
40    try {
41        CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
42        assertTrue(command3b.execute());
43        // this is a new request context so this
44        //new了新的 request context後,之前的cache失效
45        assertFalse(command3b.isResponseFromCache());
46    } finally {
47        context.shutdown();
48    }
49}      

Hystrix Context

Global Context

UserRequest Context

使用與監控

1、工程中使用

使用Hystrix很簡單,隻需要添加相應依賴即可,以Maven為例:

1<!-- hystrix 依賴 -->
 2<dependency>
 3    <groupId>com.netflix.hystrix</groupId>
 4    <artifactId>hystrix-core</artifactId>
 5    <version>1.5.9</version>
 6</dependency>
 7<dependency>
 8    <groupId>com.netflix.hystrix</groupId>
 9    <artifactId>hystrix-metrics-event-stream</artifactId>
10    <version>1.5.9</version>
11</dependency>      

2、DashBoard使用 

web.xml中配置相應的Servlet

1<servlet>
2          <display-name>HystrixMetricsStreamServlet</display-name>
3          <servlet-name>HystrixMetricsStreamServlet</servlet-name>
4          <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
5</servlet>
6<servlet-mapping>
7          <servlet-name>HystrixMetricsStreamServlet</servlet-name>
8          <url-pattern>/hystrix.stream</url-pattern>
9</servlet-mapping>      

下載下傳附件中的war檔案和jar檔案到任意目錄,執行

java -jar jetty-runner-9.2.10.v20150310.jar --port 8410 hystrix-dashboard-1.5.1.war      

然後在浏覽器中打開:http://localhost:8410/  ,在輸入框中填寫 http://hostname:port/application/hystrix.stream, 點選 Add Stream ,然後在點選Monitor Stream

一般來說: Thread-pool Rejections  和Failuress/Exception應該是0,Thread timeouts是個很小的值。

來源:部落格園--李平

若有技術難題

可聯系小編進技術交流群

若有生活難題

也可聯系小編

我們可以從詩詞歌賦聊到人生哲學

Hystrix入門指南

說了這麼多我隻要你關注我