天天看點

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

一:為什麼需要Hystrix?

在大中型分布式系統中,通常系統很多依賴(HTTP,hession,Netty,Dubbo等),如下圖:

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

在高并發通路下,這些依賴的穩定性與否對系統的影響非常大,但是依賴有很多不可控問題:如網絡連接配接緩慢,資源繁忙,暫時不可用,服務脫機等.

如下圖:QPS為50的依賴 I 出現不可用,但是其他依賴仍然可用.

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

當依賴I 阻塞時,大多數伺服器的線程池就出現阻塞(BLOCK),影響整個線上服務的穩定性.如下圖:

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

在複雜的分布式架構的應用程式有很多的依賴,都會不可避免地在某些時候失敗。高并發的依賴失敗時如果沒有隔離措施,目前應用服務就有被拖垮的風險。

例如:

一個依賴30個SOA服務的系統,每個服務99.99%可用。  

99.99%的30次方 ≈ 99.7%  

0.3% 意味着一億次請求 會有 3,000,00次失敗  

換算成時間大約每月有2個小時服務不穩定.  

随着服務依賴數量的變多,服務不穩定的機率會成指數性提高.  

 解決問題方案:對依賴做隔離,Hystrix就是處理依賴隔離的架構,同時也是可以幫我們做依賴服務的治理和監控.

 Netflix 公司開發并成功使用Hystrix,使用規模如下:

1

2

The Netflix API processes 

10

+ billion HystrixCommand executions per day using thread isolation.   

Each API instance has 

40

+ thread-pools with 

5

-

20

threads in each (most are set to 

10

).

二:Hystrix如何解決依賴隔離

1:Hystrix使用指令模式HystrixCommand(Command)包裝依賴調用邏輯,每個指令在單獨線程中/信号授權下執行。

2:可配置依賴調用逾時時間,逾時時間一般設為比99.5%平均時間略高即可.當調用逾時時,直接傳回或執行fallback邏輯。

3:為每個依賴提供一個小的線程池(或信号),如果線程池已滿調用将被立即拒絕,預設不采用排隊.加速失敗判定時間。

4:依賴調用結果分:成功,失敗(抛出異常),逾時,線程拒絕,短路。 請求失敗(異常,拒絕,逾時,短路)時執行fallback(降級)邏輯。

5:提供熔斷器元件,可以自動運作或手動調用,停止目前依賴一段時間(10秒),熔斷器預設錯誤率門檻值為50%,超過将自動運作。

6:提供近實時依賴的統計和監控

Hystrix依賴的隔離架構,如下圖:

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

三:如何使用Hystrix

1:使用maven引入Hystrix依賴

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

<!-- 依賴版本 -->

<

hystrix.version

>1.3.16</

hystrix.version

>  

<

hystrix-metrics-event-stream.version

>1.1.2</

hystrix-metrics-event-stream.version

>   

<

dependency

>  

<

groupId

>com.netflix.hystrix</

groupId

>  

<

artifactId

>hystrix-core</

artifactId

>  

<

version

>${hystrix.version}</

version

>  

</

dependency

>  

<

dependency

>  

<

groupId

>com.netflix.hystrix</

groupId

>  

<

artifactId

>hystrix-metrics-event-stream</

artifactId

>  

<

version

>${hystrix-metrics-event-stream.version}</

version

>  

</

dependency

>  

<!-- 倉庫位址 -->

<

repository

>  

<

id

>nexus</

id

>  

<

name

>local private nexus</

name

>  

<

url

>http://maven.oschina.net/content/groups/public/</

url

>  

<

releases

>  

<

enabled

>true</

enabled

>  

</

releases

>  

<

snapshots

>  

<

enabled

>false</

enabled

>  

</

snapshots

>  

</

repository

>

2:使用指令模式封裝依賴邏輯

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

public

class

HelloWorldCommand 

extends

HystrixCommand<String> {  

private

final

String name;  

public

HelloWorldCommand(String name) {  

//最少配置:指定指令組名(CommandGroup)  

super

(HystrixCommandGroupKey.Factory.asKey(

"ExampleGroup"

));  

this

.name = name;  

}  

@Override

protected

String run() {  

// 依賴邏輯封裝在run()方法中  

return

"Hello "

+ name +

" thread:"

+ Thread.currentThread().getName();  

}  

//調用執行個體  

public

static

void

main(String[] args) 

throws

Exception{  

//每個Command對象隻能調用一次,不可以重複調用,  

//重複調用對應異常資訊:This instance can only be executed once. Please instantiate a new instance.  

HelloWorldCommand helloWorldCommand = 

new

HelloWorldCommand(

"Synchronous-hystrix"

);  

//使用execute()同步調用代碼,效果等同于:helloWorldCommand.queue().get();   

String result = helloWorldCommand.execute();  

System.out.println(

"result="

+ result);  

helloWorldCommand = 

new

HelloWorldCommand(

"Asynchronous-hystrix"

);  

//異步調用,可自由控制擷取結果時機,  

Future<String> future = helloWorldCommand.queue();  

//get操作不能超過command定義的逾時時間,預設:1秒  

result = future.get(

100

, TimeUnit.MILLISECONDS);  

System.out.println(

"result="

+ result);  

System.out.println(

"mainThread="

+ Thread.currentThread().getName());  

}  

}  

//運作結果: run()方法在不同的線程下執行  

// result=Hello Synchronous-hystrix thread:hystrix-HelloWorldGroup-1  

// result=Hello Asynchronous-hystrix thread:hystrix-HelloWorldGroup-2  

// mainThread=main

note:異步調用使用 command.queue()get(timeout, TimeUnit.MILLISECONDS);同步調用使用command.execute() 等同于 command.queue().get();

3:注冊異步事件回調執行

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

//注冊觀察者事件攔截  

Observable<String> fs = 

new

HelloWorldCommand(

"World"

).observe();  

//注冊結果回調事件  

fs.subscribe(

new

Action1<String>() {  

@Override

public

void

call(String result) {  

//執行結果處理,result 為HelloWorldCommand傳回的結果  

//使用者對結果做二次處理.  

}  

});  

//注冊完整執行生命周期事件  

fs.subscribe(

new

Observer<String>() {  

@Override

public

void

onCompleted() {  

// onNext/onError完成之後最後回調  

System.out.println(

"execute onCompleted"

);  

}  

@Override

public

void

onError(Throwable e) {  

// 當産生異常時回調  

System.out.println(

"onError "

+ e.getMessage());  

e.printStackTrace();  

}  

@Override

public

void

onNext(String v) {  

// 擷取結果後回調  

System.out.println(

"onNext: "

+ v);  

}  

});  

4:使用Fallback() 提供降級政策

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

//重載HystrixCommand 的getFallback方法實作邏輯  

public

class

HelloWorldCommand 

extends

HystrixCommand<String> {  

private

final

String name;  

public

HelloWorldCommand(String name) {  

super

(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(

"HelloWorldGroup"

))  

.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(500)));  

this.name = name;  

}  

@Override  

protected String getFallback() {  

return "exeucute Falled";  

}  

@Override  

protected String run() throws Exception {  

//sleep 1 秒,調用會逾時  

TimeUnit.MILLISECONDS.sleep(1000);  

return "Hello " + name +" thread:" + Thread.currentThread().getName();  

}  

public static void main(String[] args) throws Exception{  

HelloWorldCommand command = new HelloWorldCommand("test-Fallback");  

String result = command.execute();  

}  

}  

NOTE: 除了HystrixBadRequestException異常之外,所有從run()方法抛出的異常都算作失敗,并觸發降級getFallback()和斷路器邏輯。

          HystrixBadRequestException用在非法參數或非系統故障異常等不應觸發回退邏輯的場景。

5:依賴命名:CommandKey

1

2

3

4

5

6

public

HelloWorldCommand(String name) {  

super

(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(

"ExampleGroup"

))  

.andCommandKey(HystrixCommandKey.Factory.asKey(

"HelloWorld"

)));  

this

.name = name;  

}

 NOTE: 每個CommandKey代表一個依賴抽象,相同的依賴要使用相同的CommandKey名稱。依賴隔離的根本就是對相同CommandKey的依賴做隔離.

6:依賴分組:CommandGroup

指令分組用于對依賴操作分組,便于統計,彙總等.

1

2

3

4

//使用HystrixCommandGroupKey工廠定義  

public

HelloWorldCommand(String name) {  

Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(

"HelloWorldGroup"

))  

}

NOTE: CommandGroup是每個指令最少配置的必選參數,在不指定ThreadPoolKey的情況下,字面值用于對不同依賴的線程池/信号區分.

7:線程池/信号:ThreadPoolKey

1

2

3

4

5

6

7

public

HelloWorldCommand(String name) {  

super

(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(

"ExampleGroup"

))  

.andCommandKey(HystrixCommandKey.Factory.asKey(

"HelloWorld"

))  

.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(

"HelloWorldPool"

)));  

this

.name = name;  

}

NOTE: 當對同一業務依賴做隔離時使用CommandGroup做區分,但是對同一依賴的不同遠端調用如(一個是redis 一個是http),可以使用HystrixThreadPoolKey做隔離區分.

           最然在業務上都是相同的組,但是需要在資源上做隔離時,可以使用HystrixThreadPoolKey區分.

8:請求緩存 Request-Cache

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

public

class

RequestCacheCommand 

extends

HystrixCommand<String> {  

private

final

int

id;  

public

RequestCacheCommand( 

int

id) {  

super

(HystrixCommandGroupKey.Factory.asKey(

"RequestCacheCommand"

));  

this

.id = id;  

}  

@Override

protected

String run() 

throws

Exception {  

System.out.println(Thread.currentThread().getName() + 

" execute id="

+ id);  

return

"executed="

+ id;  

}  

//重寫getCacheKey方法,實作區分不同請求的邏輯  

@Override

protected

String getCacheKey() {  

return

String.valueOf(id);  

}  

public

static

void

main(String[] args){  

HystrixRequestContext context = HystrixRequestContext.initializeContext();  

try

{  

RequestCacheCommand command2a = 

new

RequestCacheCommand(

2

);  

RequestCacheCommand command2b = 

new

RequestCacheCommand(

2

);  

Assert.assertTrue(command2a.execute());  

//isResponseFromCache判定是否是在緩存中擷取結果  

Assert.assertFalse(command2a.isResponseFromCache());  

Assert.assertTrue(command2b.execute());  

Assert.assertTrue(command2b.isResponseFromCache());  

finally

{  

context.shutdown();  

}  

context = HystrixRequestContext.initializeContext();  

try

{  

RequestCacheCommand command3b = 

new

RequestCacheCommand(

2

);  

Assert.assertTrue(command3b.execute());  

Assert.assertFalse(command3b.isResponseFromCache());  

finally

{  

context.shutdown();  

}  

}  

}

NOTE:請求緩存可以讓(CommandKey/CommandGroup)相同的情況下,直接共享結果,降低依賴調用次數,在高并發和CacheKey碰撞率高場景下可以提升性能.

Servlet容器中,可以直接實用Filter機制Hystrix請求上下文

1

2

3

4

5

6

7

8

9

10

11

public

class

HystrixRequestContextServletFilter 

implements

Filter {  

public

void

doFilter(ServletRequest request, ServletResponse response, FilterChain chain)   

throws

IOException, ServletException {  

HystrixRequestContext context = HystrixRequestContext.initializeContext();  

try

{  

chain.doFilter(request, response);  

finally

{  

context.shutdown();  

}  

}  

}

1

2

3

4

5

6

7

8

9

<

filter

>  

<

display-name

>HystrixRequestContextServletFilter</

display-name

>  

<

filter-name

>HystrixRequestContextServletFilter</

filter-name

>  

<

filter-class

>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</

filter-class

>  

</

filter

>  

<

filter-mapping

>  

<

filter-name

>HystrixRequestContextServletFilter</

filter-name

>  

<

url-pattern

>  

.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)));  

this.name = name;  

}  

@Override  

protected String run() throws Exception {  

return "HystrixThread:" + Thread.currentThread().getName();  

}  

public static void main(String[] args) throws Exception{  

HelloWorldCommand command = new HelloWorldCommand("semaphore");  

String result = command.execute();  

System.out.println(result);  

System.out.println("MainThread:" + Thread.currentThread().getName());  

}  

}  

10:fallback降級邏輯指令嵌套

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

适用場景:用于fallback邏輯涉及網絡通路的情況,如緩存通路。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

public

class

CommandWithFallbackViaNetwork 

extends

HystrixCommand<String> {  

private

final

int

id;  

protected

CommandWithFallbackViaNetwork(

int

id) {  

super

(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(

"RemoteServiceX"

))  

.andCommandKey(HystrixCommandKey.Factory.asKey(

"GetValueCommand"

)));  

this

.id = id;  

}  

@Override

protected

String run() {  

// RemoteService.getValue(id);  

throw

new

RuntimeException(

"force failure for example"

);  

}  

@Override

protected

String getFallback() {  

return

new

FallbackViaNetwork(id).execute();  

}  

private

static

class

FallbackViaNetwork 

extends

HystrixCommand<String> {  

private

final

int

id;  

public

FallbackViaNetwork(

int

id) {  

super

(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(

"RemoteServiceX"

))  

.andCommandKey(HystrixCommandKey.Factory.asKey(

"GetValueFallbackCommand"

))  

// 使用不同的線程池做隔離,防止上層線程池跑滿,影響降級邏輯.  

.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(

"RemoteServiceXFallback"

)));  

this

.id = id;  

}  

@Override

protected

String run() {  

MemCacheClient.getValue(id);  

}  

@Override

protected

String getFallback() {  

return

null

;  

}  

}  

}

NOTE:依賴調用和降級調用使用不同的線程池做隔離,防止上層線程池跑滿,影響二級降級邏輯調用.

11:顯示調用fallback邏輯,用于特殊業務處理

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

public

class

CommandFacadeWithPrimarySecondary 

extends

HystrixCommand<String> {  

private

final

static

DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty(

"primarySecondary.usePrimary"

true

);  

private

final

int

id;  

public

CommandFacadeWithPrimarySecondary(

int

id) {  

super

(Setter  

.withGroupKey(HystrixCommandGroupKey.Factory.asKey(

"SystemX"

))  

.andCommandKey(HystrixCommandKey.Factory.asKey(

"PrimarySecondaryCommand"

))  

.andCommandPropertiesDefaults(  

HystrixCommandProperties.Setter()  

.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));  

this

.id = id;  

}  

@Override

protected

String run() {  

if

(usePrimary.get()) {  

return

new

PrimaryCommand(id).execute();  

else

{  

return

new

SecondaryCommand(id).execute();  

}  

}  

@Override

protected

String getFallback() {  

return

"static-fallback-"

+ id;  

}  

@Override

protected

String getCacheKey() {  

return

String.valueOf(id);  

}  

private

static

class

PrimaryCommand 

extends

HystrixCommand<String> {  

private

final

int

id;  

private

PrimaryCommand(

int

id) {  

super

(Setter  

.withGroupKey(HystrixCommandGroupKey.Factory.asKey(

"SystemX"

))  

.andCommandKey(HystrixCommandKey.Factory.asKey(

"PrimaryCommand"

))  

.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(

"PrimaryCommand"

))  

.andCommandPropertiesDefaults(  

// we default to a 600ms timeout for primary  

HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(

600

)));  

this

.id = id;  

}  

@Override

protected

String run() {  

// perform expensive 'primary' service call  

return

"responseFromPrimary-"

+ id;  

}  

}  

private

static

class

SecondaryCommand 

extends

HystrixCommand<String> {  

private

final

int

id;  

private

SecondaryCommand(

int

id) {  

super

(Setter  

.withGroupKey(HystrixCommandGroupKey.Factory.asKey(

"SystemX"

))  

.andCommandKey(HystrixCommandKey.Factory.asKey(

"SecondaryCommand"

))  

.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(

"SecondaryCommand"

))  

.andCommandPropertiesDefaults(  

// we default to a 100ms timeout for secondary  

HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(

100

)));  

this

.id = id;  

}  

@Override

protected

String run() {  

// perform fast 'secondary' service call  

return

"responseFromSecondary-"

+ id;  

}  

}  

public

static

class

UnitTest {  

@Test

public

void

testPrimary() {  

HystrixRequestContext context = HystrixRequestContext.initializeContext();  

try

{  

ConfigurationManager.getConfigInstance().setProperty(

"primarySecondary.usePrimary"

true

);  

assertEquals(

"responseFromPrimary-20"

new

CommandFacadeWithPrimarySecondary(

20

).execute());  

finally

{  

context.shutdown();  

ConfigurationManager.getConfigInstance().clear();  

}  

}  

@Test

public

void

testSecondary() {  

HystrixRequestContext context = HystrixRequestContext.initializeContext();  

try

{  

ConfigurationManager.getConfigInstance().setProperty(

"primarySecondary.usePrimary"

false

);  

assertEquals(

"responseFromSecondary-20"

new

CommandFacadeWithPrimarySecondary(

20

).execute());  

finally

{  

context.shutdown();  

ConfigurationManager.getConfigInstance().clear();  

}  

}  

}  

}

NOTE:顯示調用降級适用于特殊需求的場景,fallback用于業務處理,fallback不再承擔降級職責,建議慎重使用,會造成監控統計換亂等問題.

12:指令調用合并:HystrixCollapser

指令調用合并允許多個請求合并到一個線程/信号下批量執行。

執行流程圖如下:

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

public

class

CommandCollapserGetValueForKey 

extends

HystrixCollapser<List<String>, String, Integer> {  

private

final

Integer key;  

public

CommandCollapserGetValueForKey(Integer key) {  

this

.key = key;  

}  

@Override

public

Integer getRequestArgument() {  

return

key;  

}  

@Override

protected

HystrixCommand<List<String>> createCommand(

final

Collection<CollapsedRequest<String, Integer>> requests) {  

//建立傳回command對象  

return

new

BatchCommand(requests);  

}  

@Override

protected

void

mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {  

int

count = 

;  

for

(CollapsedRequest<String, Integer> request : requests) {  

//手動比對請求和響應  

request.setResponse(batchResponse.get(count++));  

}  

}  

private

static

final

class

BatchCommand 

extends

HystrixCommand<List<String>> {  

private

final

Collection<CollapsedRequest<String, Integer>> requests;  

private

BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {  

super

(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(

"ExampleGroup"

))  

.andCommandKey(HystrixCommandKey.Factory.asKey(

"GetValueForKey"

)));  

this

.requests = requests;  

}  

@Override

protected

List<String> run() {  

ArrayList<String> response = 

new

ArrayList<String>();  

for

(CollapsedRequest<String, Integer> request : requests) {  

response.add(

"ValueForKey: "

+ request.getArgument());  

}  

return

response;  

}  

}  

public

static

class

UnitTest {  

HystrixRequestContext context = HystrixRequestContext.initializeContext();  

try

{  

Future<String> f1 = 

new

CommandCollapserGetValueForKey(

1

).queue();  

Future<String> f2 = 

new

CommandCollapserGetValueForKey(

2

).queue();  

Future<String> f3 = 

new

CommandCollapserGetValueForKey(

3

).queue();  

Future<String> f4 = 

new

CommandCollapserGetValueForKey(

4

).queue();  

assertEquals(

"ValueForKey: 1"

, f1.get());  

assertEquals(

"ValueForKey: 2"

, f2.get());  

assertEquals(

"ValueForKey: 3"

, f3.get());  

assertEquals(

"ValueForKey: 4"

, f4.get());  

assertEquals(

1

, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());  

HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(

new

HystrixCommand<?>[

1

])[

];  

assertEquals(

"GetValueForKey"

, command.getCommandKey().name());  

assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));  

assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));  

finally

{  

context.shutdown();  

}     

}  

}

 NOTE:使用場景:HystrixCollapser用于對多個相同業務的請求合并到一個線程甚至可以合并到一個連接配接中執行,降低線程互動次和IO數,但必須保證他們屬于同一依賴.

四:監控平台搭建Hystrix-dashboard

1:監控dashboard介紹

dashboard面闆可以對依賴關鍵名額提供實時監控,如下圖:

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

2:執行個體暴露command統計資料

Hystrix使用Servlet對目前JVM下所有command調用情況作資料流輸出

配置如下:

1

2

3

4

5

6

7

8

9

10

11

12

<

servlet

>  

<

display-name

>HystrixMetricsStreamServlet</

display-name

>  

<

servlet-name

>HystrixMetricsStreamServlet</

servlet-name

>  

<

servlet-class

>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</

servlet-class

>  

</

servlet

>  

<

servlet-mapping

>  

<

servlet-name

>HystrixMetricsStreamServlet</

servlet-name

>  

<

url-pattern

>/hystrix.stream</

url-pattern

>  

</

servlet-mapping

>  

<!--  

對應URL格式 : http://hostname:port/application/hystrix.stream 

-->

3:叢集模式監控統計搭建

1)使用Turbine元件做叢集資料彙總

結構圖如下:

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

2)内嵌jetty提供Servlet容器,暴露HystrixMetrics

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

public

class

JettyServer {  

private

final

Logger logger = LoggerFactory.getLogger(

this

.getClass());  

private

int

port;  

private

ExecutorService executorService = Executors.newFixedThreadPool(

1

);  

private

Server server = 

null

;  

public

void

init() {  

try

{  

executorService.execute(

new

Runnable() {  

@Override

public

void

run() {  

try

{  

//綁定8080端口,加載HystrixMetricsStreamServlet并映射url  

server = 

new

Server(

8080

);  

WebAppContext context = 

new

WebAppContext();  

context.setContextPath(

"/"

);  

context.addServlet(HystrixMetricsStreamServlet.

class

"/hystrix.stream"

);  

context.setResourceBase(

"."

);  

server.setHandler(context);  

server.start();  

server.join();  

catch

(Exception e) {  

logger.error(e.getMessage(), e);  

}  

}  

});  

catch

(Exception e) {  

logger.error(e.getMessage(), e);  

}  

}  

public

void

destory() {  

if

(server != 

null

) {  

try

{  

server.stop();  

server.destroy();  

logger.warn(

"jettyServer stop and destroy!"

);  

catch

(Exception e) {  

logger.error(e.getMessage(), e);  

}  

}  

}  

}

3)Turbine搭建和配置

  a:配置Turbine Servlet收集器

1

2

3

4

5

6

7

8

9

10

<

servlet

>  

<

description

></

description

>  

<

display-name

>TurbineStreamServlet</

display-name

>  

<

servlet-name

>TurbineStreamServlet</

servlet-name

>  

<

servlet-class

>com.netflix.turbine.streaming.servlet.TurbineStreamServlet</

servlet-class

>  

</

servlet

>  

<

servlet-mapping

>  

<

servlet-name

>TurbineStreamServlet</

servlet-name

>  

<

url-pattern

>/turbine.stream</

url-pattern

>  

</

servlet-mapping

>

b:編寫config.properties配置叢集執行個體

1

2

3

4

5

6

7

8

9

#配置兩個叢集:mobil-online,ugc-online  

turbine.aggregator.clusterConfig=mobil-online,ugc-online  

#配置mobil-online叢集執行個體  

turbine.ConfigPropertyBasedDiscovery.mobil-online.instances=

10.10

.*.*,

10.10

.*.*,

10.10

.*.*,

10.10

.*.*,

10.10

.*.*,

10.10

.*.*,

10.16

.*.*,

10.16

.*.*,

10.16

.*.*,

10.16

.*.*  

#配置mobil-online資料流servlet  

turbine.instanceUrlSuffix.mobil-online=:

8080

/hystrix.stream  

#配置ugc-online叢集執行個體  

turbine.ConfigPropertyBasedDiscovery.ugc-online.instances=

10.10

.*.*,

10.10

.*.*,

10.10

.*.*,

10.10

.*.*#配置ugc-online資料流servlet  

turbine.instanceUrlSuffix.ugc-online=:

8080

/hystrix.stream

 c:使用Dashboard配置連接配接Turbine

  如下圖 :

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

五:Hystrix配置與分析

1:Hystrix 配置

1):Command 配置

Command配置源碼在HystrixCommandProperties,構造Command時通過Setter進行配置

具體配置解釋和預設值如下

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

//使用指令調用隔離方式,預設:采用線程隔離,ExecutionIsolationStrategy.THREAD  

private

final

HystrixProperty<ExecutionIsolationStrategy> executionIsolationStrategy;   

//使用線程隔離時,調用逾時時間,預設:1秒  

private

final

HystrixProperty<Integer> executionIsolationThreadTimeoutInMilliseconds;   

//線程池的key,用于決定指令在哪個線程池執行  

private

final

HystrixProperty<String> executionIsolationThreadPoolKeyOverride;   

//使用信号量隔離時,指令調用最大的并發數,預設:10  

private

final

HystrixProperty<Integer> executionIsolationSemaphoreMaxConcurrentRequests;  

//使用信号量隔離時,指令fallback(降級)調用最大的并發數,預設:10  

private

final

HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests;   

//是否開啟fallback降級政策 預設:true   

private

final

HystrixProperty<Boolean> fallbackEnabled;   

// 使用線程隔離時,是否對指令執行逾時的線程調用中斷(Thread.interrupt())操作.預設:true  

private

final

HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout;   

// 統計滾動的時間視窗,預設:5000毫秒circuitBreakerSleepWindowInMilliseconds  

private

final

HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds;  

// 統計視窗的Buckets的數量,預設:10個,每秒一個Buckets統計  

private

final

HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; 

// number of buckets in the statisticalWindow  

//是否開啟監控統計功能,預設:true  

private

final

HystrixProperty<Boolean> metricsRollingPercentileEnabled;   

// 是否開啟請求日志,預設:true  

private

final

HystrixProperty<Boolean> requestLogEnabled;   

//是否開啟請求緩存,預設:true  

private

final

HystrixProperty<Boolean> requestCacheEnabled; 

// Whether request caching is enabled.

2):熔斷器(Circuit Breaker)配置

Circuit Breaker配置源碼在HystrixCommandProperties,構造Command時通過Setter進行配置,每種依賴使用一個Circuit Breaker

1

2

3

4

5

6

7

8

9

10

11

12

// 熔斷器在整個統計時間内是否開啟的閥值,預設20秒。也就是10秒鐘内至少請求20次,熔斷器才發揮起作用  

private

final

HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold;   

//熔斷器預設工作時間,預設:5秒.熔斷器中斷請求5秒後會進入半打開狀态,放部分流量過去重試  

private

final

HystrixProperty<Integer> circuitBreakerSleepWindowInMilliseconds;   

//是否啟用熔斷器,預設true. 啟動  

private

final

HystrixProperty<Boolean> circuitBreakerEnabled;   

//預設:50%。當出錯率超過50%後熔斷器啟動.  

private

final

HystrixProperty<Integer> circuitBreakerErrorThresholdPercentage;  

//是否強制開啟熔斷器阻斷所有請求,預設:false,不開啟  

private

final

HystrixProperty<Boolean> circuitBreakerForceOpen;   

//是否允許熔斷器忽略錯誤,預設false, 不開啟  

private

final

HystrixProperty<Boolean> circuitBreakerForceClosed;

3):指令合并(Collapser)配置

Command配置源碼在HystrixCollapserProperties,構造Collapser時通過Setter進行配置

1

2

3

4

5

6

//請求合并是允許的最大請求數,預設: Integer.MAX_VALUE  

private

final

HystrixProperty<Integer> maxRequestsInBatch;  

//批處理過程中每個指令延遲的時間,預設:10毫秒  

private

final

HystrixProperty<Integer> timerDelayInMilliseconds;  

//批處理過程中是否開啟請求緩存,預設:開啟  

private

final

HystrixProperty<Boolean> requestCacheEnabled;

4):線程池(ThreadPool)配置

1

2

3

4

5

6

7

8

9

10

11

HystrixThreadPoolProperties.Setter().withCoreSize(

int

value)  

HystrixThreadPoolProperties.Setter().withMaxQueueSize(

int

value)

2:Hystrix關鍵元件分析

 1):Hystrix流程結構解析

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

流程說明:

1:每次調用建立一個新的HystrixCommand,把依賴調用封裝在run()方法中.

2:執行execute()/queue做同步或異步調用.

3:判斷熔斷器(circuit-breaker)是否打開,如果打開跳到步驟8,進行降級政策,如果關閉進入步驟.

4:判斷線程池/隊列/信号量是否跑滿,如果跑滿進入降級步驟8,否則繼續後續步驟.

5:調用HystrixCommand的run方法.運作依賴邏輯

5a:依賴邏輯調用逾時,進入步驟8.

6:判斷邏輯是否調用成功

6a:傳回成功調用結果

6b:調用出錯,進入步驟8.

7:計算熔斷器狀态,所有的運作狀态(成功, 失敗, 拒絕,逾時)上報給熔斷器,用于統計進而判斷熔斷器狀态.

8:getFallback()降級邏輯.

  以下四種情況将觸發getFallback調用:

 (1):run()方法抛出非HystrixBadRequestException異常。

 (2):run()方法調用逾時

 (3):熔斷器開啟攔截調用

 (4):線程池/隊列/信号量是否跑滿

8a:沒有實作getFallback的Command将直接抛出異常

8b:fallback降級邏輯調用成功直接傳回

8c:降級邏輯調用失敗抛出異常

9:傳回執行成功結果

2):熔斷器:Circuit Breaker 

Circuit Breaker 流程架構和統計

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

每個熔斷器預設維護10個bucket,每秒一個bucket,每個blucket記錄成功,失敗,逾時,拒絕的狀态,

預設錯誤超過50%且10秒内超過20個請求進行中斷攔截. 

3)隔離(Isolation)分析

Hystrix隔離方式采用線程/信号的方式,通過隔離限制依賴的并發量和阻塞擴散.

(1):線程隔離

         把執行依賴代碼的線程與請求線程(如:jetty線程)分離,請求線程可以自由控制離開的時間(異步過程)。

   通過線程池大小可以控制并發量,當線程池飽和時可以提前拒絕服務,防止依賴問題擴散。

   線上建議線程池不要設定過大,否則大量堵塞線程有可能會拖慢伺服器。

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

(2):線程隔離的優缺點

線程隔離的優點:

[1]:使用線程可以完全隔離第三方代碼,請求線程可以快速放回。

[2]:當一個失敗的依賴再次變成可用時,線程池将清理,并立即恢複可用,而不是一個長時間的恢複。

[3]:可以完全模拟異步調用,友善異步程式設計。

線程隔離的缺點:

[1]:線程池的主要缺點是它增加了cpu,因為每個指令的執行涉及到排隊(預設使用SynchronousQueue避免排隊),排程和上下文切換。

[2]:對使用ThreadLocal等依賴線程狀态的代碼增加複雜性,需要手動傳遞和清理線程狀态。

NOTE: Netflix公司内部認為線程隔離開銷足夠小,不會造成重大的成本或性能的影響。

Netflix 内部API 每天100億的HystrixCommand依賴請求使用線程隔,每個應用大約40多個線程池,每個線程池大約5-20個線程。

(3):信号隔離

      信号隔離也可以用于限制并發通路,防止阻塞擴散, 與線程隔離最大不同在于執行依賴代碼的線程依然是請求線程(該線程需要通過信号申請),

   如果用戶端是可信的且可以快速傳回,可以使用信号隔離替換線程隔離,降低開銷.

線程隔離與信号隔離差別如下圖:

Hystrix 分布式系統限流、降級、熔斷架構一:為什麼需要Hystrix?二:Hystrix如何解決依賴隔離三:如何使用Hystrix四:監控平台搭建Hystrix-dashboard五:Hystrix配置與分析

 解析圖檔出自官網wiki , 更多内容請見官網: https://github.com/Netflix/Hystrix

原文連結:http://hot66hot.iteye.com/blog/2155036

繼續閱讀