Introduction
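1. Where does the name come from?
Hystrix means "porcupine". A porcupine is covered in quills that protect it from predators, a defense mechanism that matches what Hystrix itself does. That is why the Netflix team gave the framework this name and adopted a matching cartoon porcupine as its logo.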
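2. What Is Hystrix?
In a distributed system, some of the many dependencies will inevitably fail, for example by timing out or throwing exceptions. Hystrix's job is to make sure that a problem in one dependency does not bring down the whole service. It provides circuit breaking, isolation, fallbacks, request caching and monitoring, so the system stays available even when one or several dependencies fail at the same time.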
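3. Hello Hystrix
```java
import java.util.concurrent.Future;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); // the group key is required
        this.name = name;
    }

    @Override
    protected String run() {
        // A network call or other business logic goes here; it may time out or throw an exception
        return "Hello " + name + "!";
    }
}

// Four ways to execute a command; each command instance can be executed only once
String s = new CommandHelloWorld("Bob").execute();                     // synchronous: blocks until the result is ready
Future<String> fs = new CommandHelloWorld("Bob").queue();              // asynchronous: returns a Future
Observable<String> hot = new CommandHelloWorld("Bob").observe();       // "hot": starts executing immediately
Observable<String> cold = new CommandHelloWorld("Bob").toObservable(); // "cold": executes when subscribed to
```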
4. Flow Chart
The steps in the chart:
1. Construct a HystrixCommand or HystrixObservableCommand Object
2. Execute the Command
3. Is the Response Cached?
4. Is the Circuit Open?
5. Is the Thread Pool/Queue/Semaphore Full?
6. HystrixObservableCommand.construct() or HystrixCommand.run()
7. Calculate Circuit Health
8. Get the Fallback
9. Return the Successful Response
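Common Features
Dependency Isolation
A single user request may depend on dozens or even hundreds of external services to succeed. Without isolation, the failure of one dependency can affect the execution of the others. As shown in the figure below, each dependency is given its own thread pool.
In the figure below, when Dep I has a problem, Dep A and Dep M can still execute normally.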
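The bulkhead idea can be modelled in plain Java without Hystrix. This is a simplified sketch under our own assumptions, not Hystrix's implementation: `Bulkheads` and `trySubmit` are hypothetical names, each dependency gets its own fixed-size `ThreadPoolExecutor` with no queue, and a rejected task marks the point where Hystrix would run the fallback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical bulkhead registry: one small, bounded thread pool per dependency (not Hystrix's API). */
final class Bulkheads {
    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();
    private final int threadsPerDependency;

    Bulkheads(int threadsPerDependency) {
        this.threadsPerDependency = threadsPerDependency;
    }

    /** Returns true if the task was accepted, false if this dependency's pool is saturated. */
    boolean trySubmit(String dependency, Runnable task) {
        ExecutorService pool = pools.computeIfAbsent(dependency, d ->
                // SynchronousQueue: no waiting room, so a task either gets a thread now or is rejected
                new ThreadPoolExecutor(threadsPerDependency, threadsPerDependency,
                        0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>()));
        try {
            pool.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false; // only this dependency is affected; Hystrix would run the fallback here
        }
    }

    /** Stops every pool, interrupting tasks that are still running. */
    void shutdownNow() {
        pools.values().forEach(ExecutorService::shutdownNow);
    }
}
```

Saturating the pool for `DepI` makes further `DepI` calls fail fast, while `DepA` still gets a thread of its own, which is the situation the figure describes.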
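Thread-pool isolation example:
```java
import java.util.concurrent.TimeUnit;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // required
                // optional: if omitted, the thread pool is keyed by the command group key
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));

        this.name = name;
    }

    @Override
    protected String run() throws InterruptedException {
        System.out.println("running");
        // Note: the default execution timeout is 1000 ms, so this call is likely to time out
        TimeUnit.MILLISECONDS.sleep(1000);
        return "Hello " + name + "!";
    }

}
```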
Common thread-pool settings (implemented by HystrixThreadPoolProperties):
```java
// Passed to super(...) in a HystrixCommand constructor
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))            // required
        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withExecutionTimeoutInMilliseconds(500))                            // execution timeout
        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool")) // optional; defaults to the group key
        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                .withCoreSize(4)                      // number of threads in the pool
                .withMaxQueueSize(10)                 // queue capacity (-1, the default, means no queue)
                .withQueueSizeRejectionThreshold(7))  // reject once 7 tasks are queued, even below maxQueueSize
```
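A back-of-the-envelope model of how many commands such a pool holds before it starts rejecting. It is a simplification under stated assumptions: `PoolCapacity.maxInFlight` is a hypothetical helper, `maximumSize` is assumed to be left equal to `coreSize`, and the rule encoded is that `queueSizeRejectionThreshold` only applies when `maxQueueSize` is not -1, where it can reject tasks before the queue itself is full.

```java
/**
 * Rough model (hypothetical helper, not a Hystrix API) of how many commands one Hystrix
 * thread pool holds before rejecting, assuming maximumSize is left equal to coreSize.
 */
final class PoolCapacity {
    private PoolCapacity() {}

    static int maxInFlight(int coreSize, int maxQueueSize, int queueSizeRejectionThreshold) {
        if (maxQueueSize == -1) {
            // -1 (the default) selects a SynchronousQueue: nothing waits, so only coreSize commands fit
            return coreSize;
        }
        // otherwise tasks are rejected once the queue reaches the lower of the two limits
        return coreSize + Math.min(maxQueueSize, queueSizeRejectionThreshold);
    }
}
```

With the settings above (coreSize 4, maxQueueSize 10, threshold 7) the model gives 4 + 7 = 11 commands running or waiting at once; a 12th would be rejected and go to the fallback.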
Q: How should the thread pool size be set?
A: Peak QPS × TP99 latency (in seconds) + some spare threads.
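The rule is an application of Little's law: the number of busy threads is roughly the request rate times the time each request holds a thread. For example, 30 requests/s at peak with a TP99 of 200 ms keeps about 30 × 0.2 = 6 threads busy, and 4 spare threads bring the pool to 10. The helper below is a hypothetical sketch of that arithmetic, not part of Hystrix.

```java
/** Hypothetical helper: thread-pool size = peak QPS x TP99 latency + spare threads. */
final class ThreadPoolSizing {
    private ThreadPoolSizing() {}

    static int coreSize(double peakQps, long tp99Millis, int spareThreads) {
        // Little's law: concurrency = arrival rate x time in the pool; using TP99 keeps the estimate conservative
        int busyThreads = (int) Math.ceil(peakQps * tp99Millis / 1000.0);
        return busyThreads + spareThreads;
    }
}
```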
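Semaphore Isolation
With thread-pool isolation, the thread that issues the request and the thread that actually executes it are different threads; with semaphore isolation they are the same thread. The figure below shows the difference between the two: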
```java
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;

public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {

    private final int id;
    private long start, end;

    public CommandUsingSemaphoreIsolation(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                // since we're doing an in-memory cache lookup we choose SEMAPHORE isolation
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) // use semaphore isolation
                        .withExecutionIsolationSemaphoreMaxConcurrentRequests(3) // max concurrent executions of run()
                        .withFallbackIsolationSemaphoreMaxConcurrentRequests(5)  // max concurrent executions of getFallback()
                        .withExecutionTimeoutInMilliseconds(300)));              // execution timeout
        this.id = id;
        this.start = System.currentTimeMillis();
    }

    @Override
    protected String run() throws InterruptedException {
        // a real implementation would retrieve data from in memory data structure
        TimeUnit.MILLISECONDS.sleep(id * 30);
        System.out.println("running normal, id=" + id);
        return "ValueFromHashMap_" + id;
    }

    @Override
    protected String getFallback() {
        System.out.println(" fallback, id=" + id);
        return "fallback:" + id;
    }

}

@Test
public void maxConcurrentRequests() throws InterruptedException {
    int count = 10;
    while (count > 0) {
        int id = count--;
        // each command is executed on its own caller thread
        new Thread(() -> {
            try {
                new CommandUsingSemaphoreIsolation(id).execute();
            } catch (Exception ex) {
                System.out.println("Exception:" + ex.getMessage() + " id=" + id);
            }
        }).start();
    }

    TimeUnit.SECONDS.sleep(100);
}
// Note: with semaphore isolation, even if new CommandUsingSemaphoreIsolation(id).queue() is called
// in a loop on one thread, the run() calls still execute sequentially on that thread.
```
```
// console output
fallback, id=10
fallback, id=9
fallback, id=5
fallback, id=8
fallback, id=1
Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=4
Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=7
running normal, id=2
running normal, id=3
running normal, id=6
```
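Q: When should we use thread-pool isolation, and when semaphore isolation?
A: Thread-pool isolation adds some overhead, but it does not block the request thread, so it suits I/O-bound tasks.
Semaphore isolation runs on the user's request thread with no extra thread-switching overhead, so it suits local computations whose execution time and logic are both short, such as CPU-bound tasks.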
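A simplified model of semaphore isolation in plain Java, under our own assumptions (hypothetical class `SemaphoreIsolated`, no timeout handling): the work runs on the caller's thread, a non-blocking `tryAcquire` decides whether it may run at all, and a second semaphore bounds concurrent fallbacks, mirroring `withExecutionIsolationSemaphoreMaxConcurrentRequests` and `withFallbackIsolationSemaphoreMaxConcurrentRequests` in the example above. It is not Hystrix's implementation.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical model of semaphore isolation: no extra threads, just permit checks on the caller's thread. */
final class SemaphoreIsolated<T> {
    private final Semaphore executionPermits;
    private final Semaphore fallbackPermits;

    SemaphoreIsolated(int maxConcurrentRequests, int maxConcurrentFallbacks) {
        this.executionPermits = new Semaphore(maxConcurrentRequests);
        this.fallbackPermits = new Semaphore(maxConcurrentFallbacks);
    }

    T execute(Supplier<T> run, Supplier<T> fallback) {
        // tryAcquire never blocks: if every permit is taken, the call is rejected immediately
        if (!executionPermits.tryAcquire()) {
            return runFallback(fallback); // like SEMAPHORE_REJECTED
        }
        try {
            return run.get(); // runs on the caller's own thread, no thread hand-off
        } catch (RuntimeException e) {
            // like FAILURE: run threw, so fall through to the fallback below
        } finally {
            executionPermits.release();
        }
        return runFallback(fallback);
    }

    private T runFallback(Supplier<T> fallback) {
        if (!fallbackPermits.tryAcquire()) {
            // comparable to "fallback execution rejected" in the console output above
            throw new IllegalStateException("fallback execution rejected");
        }
        try {
            return fallback.get();
        } finally {
            fallbackPermits.release();
        }
    }
}
```

With one `SemaphoreIsolated<String>(3, 5)` shared by ten concurrent callers, at most three run at once, at most five more get the fallback at the same time, and anything beyond that fails, roughly the pattern in the console output above.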
Fallback
Q1: Why do we need a fallback?
In short, when a dependency call fails we usually need a degradation plan, and Hystrix supports this out of the box.
Degradation example:
```java
import static org.junit.Assert.assertEquals;

import java.util.concurrent.TimeUnit;

import org.junit.Test;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // required
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds(500)) // execution timeout
                // optional: if omitted, the thread pool is keyed by the command group key
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));

        this.name = name;
    }

    @Override
    protected String run() throws InterruptedException {
        System.out.println("running");
        TimeUnit.MILLISECONDS.sleep(1000); // longer than the 500 ms timeout, so getFallback() is used
        return "Hello " + name + "!";
    }

    @Override
    protected String getFallback() {
        return "Hello " + "Fallback";
    }
}

@Test
public void fallbackTest() {
    assertEquals("Hello Fallback", new CommandHelloWorld("World").execute());
}
```
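Q2: What triggers the fallback?
In short: run() throwing an exception, a timeout, a thread-pool or semaphore rejection, or short-circuiting by an open circuit breaker.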
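| Failure Type | Exception class | Exception.cause | subject to fallback |
|---|---|---|---|
| FAILURE | HystrixRuntimeException | underlying exception (user-controlled) | YES |
| TIMEOUT | HystrixRuntimeException | j.u.c.TimeoutException | YES |
| SHORT_CIRCUITED | HystrixRuntimeException | j.l.RuntimeException | YES |
| THREAD_POOL_REJECTED | HystrixRuntimeException | j.u.c.RejectedExecutionException | YES |
| SEMAPHORE_REJECTED | HystrixRuntimeException | j.l.RuntimeException | YES |
| BAD_REQUEST | HystrixBadRequestException | underlying exception (user-controlled) | NO |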
The command used in the following tests:
```java
import java.util.concurrent.TimeUnit;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;

public class CommandHelloFailure extends HystrixCommand<String> {

    private final String name;

    public CommandHelloFailure(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // required
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds(1000)) // execution timeout
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(3)));

        this.name = name;
    }

    @Override
    protected String run() throws InterruptedException {
        String threadPoolName = this.getThreadPoolKey().name();
        // Note: this also reads the thread pool key (matching the outputs below);
        // use getCommandKey().name() to print the actual command key
        String cmdKey = this.getThreadPoolKey().name();
        System.out.println("running begin , threadPool=" + threadPoolName + " cmdKey=" + cmdKey + " name=" + name);

        if ("Exception".equals(name)) {
            throw new RuntimeException("this command always fails"); // FAILURE
        } else if ("Timeout".equals(name)) {
            TimeUnit.SECONDS.sleep(2);        // TIMEOUT: longer than the 1000 ms limit
        } else if ("Reject".equals(name)) {
            TimeUnit.MILLISECONDS.sleep(800); // keeps a pool thread busy so later commands are rejected
        }
        System.out.println(" run end");

        return "Hello " + name + "!";
    }

    @Override
    protected String getFallback() {
        // report why the fallback was triggered
        StringBuilder sb = new StringBuilder("running fallback");
        boolean isRejected = isResponseRejected();
        boolean isException = isFailedExecution();
        boolean isTimeout = isResponseTimedOut();
        boolean isCircut = isCircuitBreakerOpen();

        sb.append(", isRejected:").append(isRejected);
        sb.append(", isException:" + isException);
        if (isException) {
            sb.append(" msg=").append(getExecutionException().getMessage());
        }
        sb.append(", isTimeout: " + isTimeout);
        sb.append(", isCircut:" + isCircut);

        sb.append(", group:").append(this.getCommandGroup().name());
        sb.append(", threadpool:").append(getThreadPoolKey().name());
        System.out.println(sb.toString());

        String msg = "Hello Failure " + name + "!";
        return msg;
    }
}
```
FAILURE
Fallback caused by an exception:
```java
@Test
public void expTest() {
    assertEquals("Hello Failure Exception!", new CommandHelloFailure("Exception").execute());
}
```
```
// console output
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Exception
running fallback, isRejected:false, isException:true msg=this command always fails, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
```
TIMEOUT
Fallback caused by a timeout (run() sleeps for 2 s, longer than the 1000 ms timeout):
```java
@Test
public void timeOutTest() {
    assertEquals("Hello Failure Timeout!", new CommandHelloFailure("Timeout").execute());
}
```
```
// console output
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Timeout
running fallback, isRejected:false, isException:false, isTimeout: true, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
```
THREAD_POOL_REJECTED
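When the number of concurrently executing tasks exceeds the thread pool size plus the queue capacity, the excess tasks are rejected and go to the fallback. Here the pool has 3 threads and, with the default maxQueueSize of -1, no queue; five commands that each run for 800 ms are submitted 100 ms apart, so the first three run and the last two are rejected.
```java
@Test
public void rejectTest() throws InterruptedException {
    int count = 5;
    while (count-- > 0) {
        new CommandHelloFailure("Reject").queue(); // asynchronous submit to the 3-thread pool
        TimeUnit.MILLISECONDS.sleep(100);
    }
}
```
```
// console output
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
```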
SEMAPHORE_REJECTED behaves like THREAD_POOL_REJECTED, so it is not demonstrated here.
SHORT_CIRCUITED
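When the share of failed requests (timeouts, failures, rejections) within a time window reaches a threshold, the circuit breaker opens; while it is open, every request goes straight to the fallback.
Configuration
| Name | Type | Meaning | Default |
|---|---|---|---|
| circuitBreakerEnabled | Boolean | Whether the circuit breaker is enabled | true |
| circuitBreakerErrorThresholdPercentage | Integer | Error percentage at or above which the circuit opens | 50 |
| circuitBreakerForceClosed | Boolean | Force the circuit closed (allow all requests regardless of the error percentage) | false |
| circuitBreakerForceOpen | Boolean | Force the circuit open (reject all requests) | false |
| circuitBreakerRequestVolumeThreshold | Integer | Minimum number of requests in the rolling window (10 s) before the circuit can open | 20 |
| circuitBreakerSleepWindowInMilliseconds | Integer | How long after opening the circuit lets a trial request through (half-open) | 5000 ms |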
A typical configuration:
```java
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // required
        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withExecutionTimeoutInMilliseconds(50)            // execution timeout
                .withCircuitBreakerRequestVolumeThreshold(5)       // at least 5 requests in the window
                .withCircuitBreakerSleepWindowInMilliseconds(1000) // stay open 1000 ms before a trial request
                .withCircuitBreakerErrorThresholdPercentage(50))   // open at 50% errors or more
        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool")) // optional; defaults to the group key
        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4));
```
This configuration means: within a 10 s window, if there are 5 or more requests and 50% or more of them fail, the circuit breaker opens; 1000 ms after opening, it lets a trial request through to try to close again.
How the circuit breaker works, quoted from the official documentation:
The precise way that the circuit opening and closing occurs is as follows:
1. Assuming the volume across a circuit meets a certain threshold (HystrixCommandProperties.circuitBreakerRequestVolumeThreshold())...
2. And assuming that the error percentage exceeds the threshold error percentage (HystrixCommandProperties.circuitBreakerErrorThresholdPercentage())...
3. Then the circuit-breaker transitions from CLOSED to OPEN.
4. While it is open, it short-circuits all requests made against that circuit-breaker.
5. After some amount of time (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()), the next single request is let through (this is the HALF-OPEN state). If the request fails, the circuit-breaker returns to the OPEN state for the duration of the sleep window. If the request succeeds, the circuit-breaker transitions to CLOSED and the logic in 1. takes over again.
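The CLOSED → OPEN → HALF-OPEN cycle described above can be sketched as a small state machine. This is a hypothetical simplification, not Hystrix's implementation: `SimpleCircuitBreaker` and its methods are our own names, outcomes are counted in a single window that is cleared when the circuit closes (instead of Hystrix's rolling 10-second statistics), and the clock is injected so the behaviour can be tested.

```java
import java.util.function.LongSupplier;

/** Hypothetical circuit breaker modelled on the description above; not Hystrix's implementation. */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int requestVolumeThreshold;   // like circuitBreakerRequestVolumeThreshold
    private final int errorThresholdPercentage; // like circuitBreakerErrorThresholdPercentage
    private final long sleepWindowMillis;       // like circuitBreakerSleepWindowInMilliseconds
    private final LongSupplier clock;           // injectable time source, in milliseconds

    private State state = State.CLOSED;
    private long openedAt;
    private int total;
    private int errors;

    SimpleCircuitBreaker(int requestVolumeThreshold, int errorThresholdPercentage,
                         long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    /** false means: short-circuit and go straight to the fallback. */
    synchronized boolean allowRequest() {
        if (state == State.CLOSED) {
            return true;
        }
        if (state == State.OPEN && clock.getAsLong() - openedAt >= sleepWindowMillis) {
            state = State.HALF_OPEN; // sleep window elapsed: let a single trial request through
            return true;
        }
        return false; // still inside the sleep window, or a trial request is already in flight
    }

    synchronized void recordSuccess() { record(false); }

    synchronized void recordFailure() { record(true); }

    synchronized State state() { return state; }

    private void record(boolean failed) {
        if (state == State.HALF_OPEN) {
            if (failed) {
                trip();               // trial failed: OPEN again for another sleep window
            } else {
                state = State.CLOSED; // trial succeeded: close and start counting afresh
                total = 0;
                errors = 0;
            }
        } else if (state == State.CLOSED) {
            total++;
            if (failed) {
                errors++;
            }
            // open once the volume threshold is met and the error percentage reaches the threshold
            if (total >= requestVolumeThreshold && errors * 100 / total >= errorThresholdPercentage) {
                trip();
            }
        }
        // OPEN: requests are short-circuited, so there is nothing to record
    }

    private void trip() {
        state = State.OPEN;
        openedAt = clock.getAsLong();
    }
}
```

With the values from the configuration above (5 requests, 50%, 1000 ms), four failures alone leave the circuit closed; a fifth request, even a successful one, brings the count to 5 with 80% errors and opens it.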
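Q3: What should we do in a fallback?
There are generally three strategies:
1. Do not implement getFallback: when the dependency call fails, an exception is thrown to the caller.
2. Implement getFallback and return a default value: this is a common strategy.
3. Implement getFallback and switch to a degraded path.
In addition, in production every fallback should usually be recorded as a metric or log entry.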
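Strategies 2 and 3, together with recording every fallback, can be sketched in plain Java. `FallbackChain` is a hypothetical class, and the `AtomicLong` stands in for whatever metrics or logging system is used in production; in a Hystrix command this logic would sit in `getFallback()`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of fallback strategies 2 and 3 (not a Hystrix API):
 * try the primary call, then a degraded source, then a static default,
 * counting every fallback so it can be reported.
 */
final class FallbackChain<T> {
    private final AtomicLong fallbacks = new AtomicLong(); // stand-in for a real metrics counter
    private final T defaultValue;

    FallbackChain(T defaultValue) {
        this.defaultValue = defaultValue;
    }

    T call(Supplier<T> primary, Supplier<T> degraded) {
        try {
            return primary.get();
        } catch (RuntimeException primaryFailure) {
            fallbacks.incrementAndGet(); // record the fallback (log or emit a metric in production)
            try {
                return degraded.get();   // strategy 3: degraded path, e.g. a local copy of the data
            } catch (RuntimeException degradedFailure) {
                return defaultValue;     // strategy 2: static default value
            }
        }
    }

    long fallbackCount() {
        return fallbacks.get();
    }
}
```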
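Request Collapsing
Put simply, request collapsing merges the requests made within a short time window into a single request. It is commonly used for network I/O to reduce the number of I/O calls; the drawback is higher average latency, since each request may wait for its batch window to close before it is sent.
The test code: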
```java
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;

import org.junit.Test;

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixRequestLog;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

    private final Integer key;

    public CommandCollapserGetValueForKey(Integer key) {
        super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
                .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
                        .withMaxRequestsInBatch(3)           // at most 3 requests per batch
                        .withTimerDelayInMilliseconds(10))); // 10 ms batching window
        this.key = key;
    }

    @Override
    public Integer getRequestArgument() {
        return key; // the argument this individual request contributes to the batch
    }

    @Override
    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
        return new BatchCommand(requests); // one command executes the whole batch
    }

    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
        // the batch response is in request order, so map it back by position
        int count = 0;
        for (CollapsedRequest<String, Integer> request : requests) {
            request.setResponse(batchResponse.get(count++));
        }
    }

    private static final class BatchCommand extends HystrixCommand<List<String>> {
        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            this.requests = requests;
        }

        @Override
        protected List<String> run() {
            System.out.println("BatchCommand run " + requests.size());
            ArrayList<String> response = new ArrayList<String>();
            for (CollapsedRequest<String, Integer> request : requests) {
                // artificial response for each argument received in the batch
                response.add("ValueForKey: " + request.getArgument());
            }
            return response;
        }
    }
}

@Test
public void testCollapser() throws Exception {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    try {
        Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
        Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
        Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
        Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();

        assertEquals("ValueForKey: 1", f1.get());
        assertEquals("ValueForKey: 2", f2.get());
        assertEquals("ValueForKey: 3", f3.get());
        assertEquals("ValueForKey: 4", f4.get());

        // assert that the batch command 'GetValueForKey' was in fact executed,
        // and that it executed twice: one batch of 3 requests and one batch of 1
        assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
        HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
        // assert the command is the one we're expecting
        assertEquals("GetValueForKey", command.getCommandKey().name());
        // confirm that it was a COLLAPSED command execution
        assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
        // and that it was successful
        assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
    } finally {
        context.shutdown();
    }
}
```
```
// console output
BatchCommand run 3
BatchCommand run 1
```
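The batch sizes in the output above (3, then 1) follow from maxRequestsInBatch = 3: four requests queued in the same window are split into one full batch and a remainder. The sketch below models only that splitting and the positional mapping done by mapResponseToRequests. It is a hypothetical, synchronous simplification (`BatchingModel.collapse` is our own name), with no timer and no threads, not HystrixCollapser itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical, synchronous model of request collapsing (not HystrixCollapser itself).
 * There is no timer: all queued arguments are assumed to fall in one batching window.
 */
final class BatchingModel {
    private BatchingModel() {}

    static <K, V> List<V> collapse(List<K> queued, int maxRequestsInBatch,
                                   Function<List<K>, List<V>> batchCall) {
        List<V> responses = new ArrayList<>(queued.size());
        for (int from = 0; from < queued.size(); from += maxRequestsInBatch) {
            int to = Math.min(from + maxRequestsInBatch, queued.size());
            List<K> batch = queued.subList(from, to);
            List<V> batchResponse = batchCall.apply(batch); // one backend call per batch
            if (batchResponse.size() != batch.size()) {
                throw new IllegalStateException("expected one response per request in the batch");
            }
            responses.addAll(batchResponse); // position i of the batch answers request i
        }
        return responses;
    }
}
```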
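Execution flow:
To use this feature:
1. Extend the HystrixCollapser class.
2. Implement the following methods:
   - getRequestArgument: returns the argument of this individual request
   - createCommand: returns the BatchCommand that executes the whole batch
   - mapResponseToRequests: maps the batch response back to the individual requests
3. Create the corresponding BatchCommand class, which implements the actual batch request.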
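Configuration:
| Name | Type | Meaning | Default |
|---|---|---|---|
| maxRequestsInBatch | Integer | Maximum number of requests per batch; beyond it, a new batch is started | Integer.MAX_VALUE |
| timerDelayInMilliseconds | Integer | Length of the batching window; once it elapses, a new batch is started | 10 ms |
| requestCacheEnabled | Boolean | Whether the request cache is enabled | true |

A typical configuration:
```java
Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
        .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
                .withMaxRequestsInBatch(3)         // at most 3 requests per batch
                .withTimerDelayInMilliseconds(5)); // 5 ms batching window
```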
Request Cache
```java
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
    private final int value;

    public CommandUsingRequestCache(int value) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.value = value;
    }

    @Override
    public Boolean run() {
        return value == 0 || value % 2 == 0;
    }

    // Required for the request cache: commands with the same cache key share one result
    @Override
    public String getCacheKey() {
        return String.valueOf(value);
    }
}

@Test
public void testWithCacheHits() {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    try {
        CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
        CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);

        assertTrue(command2a.execute());
        // first request: nothing is cached yet
        assertFalse(command2a.isResponseFromCache());

        assertTrue(command2b.execute());
        // second request with the same key: the result comes from the cache
        assertTrue(command2b.isResponseFromCache());
    } finally {
        context.shutdown();
    }

    context = HystrixRequestContext.initializeContext();
    try {
        CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
        assertTrue(command3b.execute());
        // this is a new request context, so the result cached in the previous one is gone
        assertFalse(command3b.isResponseFromCache());
    } finally {
        context.shutdown();
    }
}
```
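The request-scoped behaviour this test checks can be modelled in a few lines of plain Java. The sketch is hypothetical (`RequestScopedCache` and its methods are our own names, not Hystrix's): the cache map lives in a `ThreadLocal` that `initializeContext()` creates and `shutdown()` discards, loosely like HystrixRequestContext, so a result is reused for the same cache key within one context and forgotten afterwards. Unlike Hystrix, it makes no attempt to carry the context over to other threads.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical model of a request-scoped cache keyed by a cache key (not Hystrix's implementation). */
final class RequestScopedCache {
    private static final ThreadLocal<Map<String, Object>> CONTEXT = new ThreadLocal<>();

    private RequestScopedCache() {}

    /** Starts a new request context with an empty cache. */
    static void initializeContext() {
        CONTEXT.set(new HashMap<>());
    }

    /** Ends the request context; everything cached in it is dropped. */
    static void shutdown() {
        CONTEXT.remove();
    }

    /** Returns the cached result for cacheKey, or runs the supplier once and caches its result. */
    @SuppressWarnings("unchecked")
    static <T> T get(String cacheKey, Supplier<T> run) {
        Map<String, Object> cache = CONTEXT.get();
        if (cache == null) {
            throw new IllegalStateException("request context not initialized");
        }
        return (T) cache.computeIfAbsent(cacheKey, k -> run.get());
    }
}
```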
Hystrix Context
(Figure: Global Context and UserRequest Context)
Usage and Monitoring
1. Using Hystrix in a project
Using Hystrix is simple: just add the corresponding dependencies. With Maven, for example:
```xml
<!-- Hystrix dependencies -->
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.9</version>
</dependency>
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-metrics-event-stream</artifactId>
    <version>1.5.9</version>
</dependency>
```
2. Using the Dashboard
Configure the corresponding servlet in web.xml:
```xml
<servlet>
    <display-name>HystrixMetricsStreamServlet</display-name>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
</servlet>
<servlet-mapping>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <url-pattern>/hystrix.stream</url-pattern>
</servlet-mapping>
```
Download the war and jar files from the attachment to any directory and run:
```shell
java -jar jetty-runner-9.2.10.v20150310.jar --port 8410 hystrix-dashboard-1.5.1.war
```
Then open http://localhost:8410/ in a browser, enter http://hostname:port/application/hystrix.stream in the input box, click Add Stream, and then click Monitor Stream.
In general, Thread-pool Rejections and Failures/Exceptions should be 0, and Thread timeouts should be a very small number.
Source: cnblogs, by 李平 (Li Ping)