閱讀源碼的過程,就像是在像武俠小說裡閱讀武功秘籍一樣,分析高手的一招一式,提煉出精髓,來增強自己的内力。
之前的文章說了一下微服務的雪崩效應和常見的解決方案,太水,沒有上代碼怎麼叫解決方案。
github
上有很多開源的庫來解決 雪崩問題
,比較出名的是 Netflix
的開源庫hystrix。集 流量控制
、 熔斷
容錯
等于一身的 java
語言的庫。今天分析的源碼庫是 hystrix-go,他是hystrix的的 go
語言版,應該是說簡化版本,用很少的代碼量實作了主要功能。很推薦朋友們有時間讀一讀。 使用簡單
hystrix
的使用是非常簡單的,同步執行,直接調用
Do
方法。
err := hystrix.Do("my_command", func() error {
// talk to other services
return nil
}, func(err error) error {
// do this when services are down
return nil
})
異步執行
Go
方法,内部實作是啟動了一個
gorouting
,如果想得到自定義方法的資料,需要你傳
channel
來處理資料,或者輸出。傳回的
error
也是一個
channel
output := make(chan bool, 1)
errors := hystrix.Go("my_command", func() error {
// talk to other services
output <- true
return nil
}, nil)
select {
case out := <-output:
// success
case err := <-errors:
// failure
大概的執行流程圖
其實方法
Do
和
Go
方法内部都是調用了
hystrix.GoC
方法,隻是
Do
方法處理了異步的過程
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
done := make(chan struct{}, 1)
r := func(ctx context.Context) error {
err := run(ctx)
if err != nil {
return err
}
done <- struct{}{}
return nil
}
f := func(ctx context.Context, e error) error {
err := fallback(ctx, e)
if err != nil {
return err
}
done <- struct{}{}
return nil
}
var errChan chan error
if fallback == nil {
errChan = GoC(ctx, name, r, nil)
} else {
errChan = GoC(ctx, name, r, f)
}
select {
case <-done:
return nil
case err := <-errChan:
return err
}
}
自定義Command配置
在調用
Do
Go
等方法之前我們可以先自定義一些配置
hystrix.ConfigureCommand("mycommand", hystrix.CommandConfig{
Timeout: int(time.Second * 3),
MaxConcurrentRequests: 100,
SleepWindow: int(time.Second * 5),
RequestVolumeThreshold: 30,
ErrorPercentThreshold: 50,
})
err := hystrix.DoC(context.Background(), "mycommand", func(ctx context.Context) error {
// ...
return nil
}, func(i context.Context, e error) error {
// ...
return e
})
我大要說了一下
CommandConfig
第個字段的意義:
- Timeout: 執行command的逾時時間。
預設時間是1000毫秒
- MaxConcurrentRequests:command的最大并發量
預設值是10
- SleepWindow:當熔斷器被打開後,SleepWindow的時間就是控制過多久後去嘗試服務是否可用了。
預設值是5000毫秒
- RequestVolumeThreshold: 一個統計視窗10秒内請求數量。達到這個請求數量後才去判斷是否要開啟熔斷。
預設值是20
- ErrorPercentThreshold:錯誤百分比,請求數量大于等于
并且錯誤率到達這個百分比後就會啟動RequestVolumeThreshold
熔斷
預設值是50
當然如果不配置他們,會使用
預設值
講完了怎麼用,接下來就是分析源碼了。我是從下層到上層的順序分析代碼和執行流程
統計控制器
每一個Command都會有一個預設統計控制器,當然也可以添加多個自定義的控制器。
預設的統計控制器
DefaultMetricCollector
儲存着
熔斷器
的所有狀态,
調用次數
,
失敗次數
被拒絕次數
等等
type DefaultMetricCollector struct {
mutex *sync.RWMutex
numRequests *rolling.Number
errors *rolling.Number
successes *rolling.Number
failures *rolling.Number
rejects *rolling.Number
shortCircuits *rolling.Number
timeouts *rolling.Number
contextCanceled *rolling.Number
contextDeadlineExceeded *rolling.Number
fallbackSuccesses *rolling.Number
fallbackFailures *rolling.Number
totalDuration *rolling.Timing
runDuration *rolling.Timing
}
最主要的還是要看一下
rolling.Number
rolling.Number
才是狀态最終儲存的地方
Number
儲存了10秒内的
Buckets
資料資訊,每一個
Bucket
的統計時長為1秒
type Number struct {
Buckets map[int64]*numberBucket
Mutex *sync.RWMutex
}
type numberBucket struct {
Value float64
}
字典字段
Buckets map[int64]*numberBucket
中的
Key
儲存的是目前時間
可能你會好奇
Number
是如何保證隻儲存10秒内的資料的。每一次對
熔斷器
的狀态進行修改時,
Number
都要先得到目前的時間(秒級)的
Bucket
不存在則建立。
func (r *Number) getCurrentBucket() *numberBucket {
now := time.Now().Unix()
var bucket *numberBucket
var ok bool
if bucket, ok = r.Buckets[now]; !ok {
bucket = &numberBucket{}
r.Buckets[now] = bucket
}
return bucket
}
修改完後去掉10秒外的資料
func (r *Number) removeOldBuckets() {
now := time.Now().Unix() - 10
for timestamp := range r.Buckets {
// TODO: configurable rolling window
if timestamp <= now {
delete(r.Buckets, timestamp)
}
}
}
比如
Increment
方法,先得到
Bucket
再删除舊的資料
func (r *Number) Increment(i float64) {
if i == 0 {
return
}
r.Mutex.Lock()
defer r.Mutex.Unlock()
b := r.getCurrentBucket()
b.Value += i
r.removeOldBuckets()
}
統計控制器是最基層和最重要的一個實作,上層所有的執行判斷都是基于他的資料進行邏輯處理的
上報執行狀态資訊
斷路器-->執行-->上報執行狀态資訊-->儲存到相應的Buckets
每一次斷路器邏輯的執行都會上報執行過程中的狀态,
// ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard.
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
// ...
circuit.mutex.RLock()
o := circuit.open
circuit.mutex.RUnlock()
if eventTypes[0] == "success" && o {
circuit.setClose()
}
var concurrencyInUse float64
if circuit.executorPool.Max > 0 {
concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
}
select {
case circuit.metrics.Updates <- &commandExecution{
Types: eventTypes,
Start: start,
RunDuration: runDuration,
ConcurrencyInUse: concurrencyInUse,
}:
default:
return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
}
return nil
}
circuit.metrics.Updates
這個信道就是處理上報資訊的,上報執行狀态自信的結構是
metricExchange
,結構體很簡單隻有4個字段。要的就是
-
字段channel
他是一個有Updates
的buffer
預設的數量是channel
個,所有的狀态資訊都在他裡面2000
-
字段,就是儲存的具體的這個metricCollectors
執行過程中的各種資訊command
type metricExchange struct {
Name string
Updates chan *commandExecution
Mutex *sync.RWMutex
metricCollectors []metricCollector.MetricCollector
}
type commandExecution struct {
Types []string `json:"types"`
Start time.Time `json:"start_time"`
RunDuration time.Duration `json:"run_duration"`
ConcurrencyInUse float64 `json:"concurrency_inuse"`
}
func newMetricExchange(name string) *metricExchange {
m := &metricExchange{}
m.Name = name
m.Updates = make(chan *commandExecution, 2000)
m.Mutex = &sync.RWMutex{}
m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name)
m.Reset()
go m.Monitor()
return m
}
在執行
newMetricExchange
的時候會啟動一個協程
go m.Monitor()
去監控
Updates
的資料,然後上報給
metricCollectors
儲存執行的資訊資料比如前面提到的
調用次數
失敗次數
被拒絕次數
func (m *metricExchange) Monitor() {
for update := range m.Updates {
// we only grab a read lock to make sure Reset() isn't changing the numbers.
m.Mutex.RLock()
totalDuration := time.Since(update.Start)
wg := &sync.WaitGroup{}
for _, collector := range m.metricCollectors {
wg.Add(1)
go m.IncrementMetrics(wg, collector, update, totalDuration)
}
wg.Wait()
m.Mutex.RUnlock()
}
}
更新調用的是
go m.IncrementMetrics(wg, collector, update, totalDuration)
,裡面判斷了他的狀态
func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
// granular metrics
r := metricCollector.MetricResult{
Attempts: 1,
TotalDuration: totalDuration,
RunDuration: update.RunDuration,
ConcurrencyInUse: update.ConcurrencyInUse,
}
switch update.Types[0] {
case "success":
r.Successes = 1
case "failure":
r.Failures = 1
r.Errors = 1
case "rejected":
r.Rejects = 1
r.Errors = 1
// ...
}
// ...
collector.Update(r)
wg.Done()
}
流量控制
hystrix-go
對流量控制的代碼是很簡單的。用了一個簡單的令牌算法,能得到令牌的就可以執行後繼的工作,執行完後要返還令牌。得不到令牌就拒絕,拒絕後調用使用者設定的
callback
方法,如果沒有設定就不執行。
結構體
executorPool
就是
hystrix-go
流量控制
的具體實作。字段
Max
就是每秒最大的并發值。
type executorPool struct {
Name string
Metrics *poolMetrics
Max int
Tickets chan *struct{}
}
在建立
executorPool
的時候,會根據
Max
值來建立
令牌
。Max值如果沒有設定會使用預設值
10
func newExecutorPool(name string) *executorPool {
p := &executorPool{}
p.Name = name
p.Metrics = newPoolMetrics(name)
p.Max = getSettings(name).MaxConcurrentRequests
p.Tickets = make(chan *struct{}, p.Max)
for i := 0; i < p.Max; i++ {
p.Tickets <- &struct{}{}
}
return p
}
流量控制上報狀态
注意一下字段
Metrics
他用于統計執行數量,比如:
執行的總數量
,
最大的并發數
具體的代碼就不貼上來了。這個數量也可以顯露出,供可視化程式直覺的表現出來。
令牌使用完後是需要返還的,傳回的時候才會做上面所說的統計工作。
func (p *executorPool) Return(ticket *struct{}) {
if ticket == nil {
return
}
p.Metrics.Updates <- poolMetricsUpdate{
activeCount: p.ActiveCount(),
}
p.Tickets <- ticket
}
func (p *executorPool) ActiveCount() int {
return p.Max - len(p.Tickets)
}
一次Command的執行的流程
上面把
統計控制器
流量控制
上報執行狀态
講完了,主要的實作也就講的差不多了。最後就是串一次command的執行都經曆了啥:
err := hystrix.Do("my_command", func() error {
// talk to other services
return nil
}, func(err error) error {
// do this when services are down
return nil
})
hystrix
在執行一次command的前面也有提到過會調用
GoC
方法,下面我把代碼貼出來來,
篇幅問題去掉了一些代碼
,主要邏輯都在。就是在
判斷斷路器是否已打開
得到Ticket
得不到就限流,
執行我們自己的的方法
判斷context是否Done或者執行是否逾時
當然,每次執行結果都要
上報執行狀态
,最後要
返還Ticket
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
cmd := &command{
run: run,
fallback: fallback,
start: time.Now(),
errChan: make(chan error, 1),
finished: make(chan bool, 1),
}
//得到斷路器,不存在則建立
circuit, _, err := GetCircuit(name)
if err != nil {
cmd.errChan <- err
return cmd.errChan
}
//...
// 返還ticket
returnTicket := func() {
// ...
cmd.circuit.executorPool.Return(cmd.ticket)
}
// 上報執行狀态
reportAllEvent := func() {
err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
// ...
}
go func() {
defer func() { cmd.finished <- true }()
// 檢視斷路器是否已打開
if !cmd.circuit.AllowRequest() {
// ...
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrCircuitOpen)
reportAllEvent()
})
return
}
// ...
// 擷取ticket 如果得不到就限流
select {
case cmd.ticket = <-circuit.executorPool.Tickets:
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
default:
// ...
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrMaxConcurrency)
reportAllEvent()
})
return
}
// 執行我們自已的方法,并上報執行資訊
returnOnce.Do(func() {
defer reportAllEvent()
cmd.runDuration = time.Since(runStart)
returnTicket()
if runErr != nil {
cmd.errorWithFallback(ctx, runErr)
return
}
cmd.reportEvent("success")
})
}()
// 等待context是否被結束,或執行者逾時,并上報
go func() {
timer := time.NewTimer(getSettings(name).Timeout)
defer timer.Stop()
select {
case <-cmd.finished:
// returnOnce has been executed in another goroutine
case <-ctx.Done():
// ...
return
case <-timer.C:
// ...
}
}()
return cmd.errChan
}
dashboard 可視化hystrix的上報資訊
代碼中
StreamHandler
就是把所有
斷路器
的狀态以流的方式不斷的推送到dashboard. 這部分代碼我就不用說了,很簡單。
需要在你的服務端加3行代碼,啟動我們的流服務
hystrixStreamHandler := hystrix.NewStreamHandler()
hystrixStreamHandler.Start()
go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
dashboard
我使用的是
docker
版。
docker run -d -p 8888:9002 --name hystrix-dashboard mlabouardy/hystrix-dashboard:latest
在下面輸入你服務的位址,我是
http://192.168.1.67:81/hystrix.stream
如果是叢集可以使用Turbine進行監控,有時間大家自己來看吧
作者:李鵬
出處:http://www.cnblogs.com/li-peng/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。