天天看點

雪崩利器 hystrix-go 源碼分析

閱讀源碼的過程,就像是在像武俠小說裡閱讀武功秘籍一樣,分析高手的一招一式,提煉出精髓,來增強自己的内力。

之前的文章說了一下微服務的雪崩效應和常見的解決方案,太水,沒有上代碼怎麼叫解決方案。

github

上有很多開源的庫來解決

雪崩問題

,比較出名的是

Netflix

的開源庫hystrix。集

流量控制

熔斷

容錯

等于一身的

java

語言的庫。今天分析的源碼庫是 hystrix-go,他是hystrix的的

go

語言版,應該是說簡化版本,用很少的代碼量實作了主要功能。很推薦朋友們有時間讀一讀。

使用簡單

hystrix

的使用是非常簡單的,同步執行,直接調用

Do

方法。

err := hystrix.Do("my_command", func() error {
   // talk to other services
   return nil
}, func(err error) error {
   // do this when services are down
   return nil
})
           

異步執行

Go

方法,内部實作是啟動了一個

gorouting

,如果想得到自定義方法的資料,需要你傳

channel

來處理資料,或者輸出。傳回的

error

也是一個

channel

output := make(chan bool, 1)
errors := hystrix.Go("my_command", func() error {
	// talk to other services
	output <- true
	return nil
}, nil)

select {
case out := <-output:
	// success
case err := <-errors:
	// failure
           

大概的執行流程圖

雪崩利器 hystrix-go 源碼分析

其實方法

Do

Go

方法内部都是調用了

hystrix.GoC

方法,隻是

Do

方法處理了異步的過程

func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
	done := make(chan struct{}, 1)
	r := func(ctx context.Context) error {
		err := run(ctx)
		if err != nil {
			return err
		}
		done <- struct{}{}
		return nil
	}
	f := func(ctx context.Context, e error) error {
		err := fallback(ctx, e)
		if err != nil {
			return err
		}
		done <- struct{}{}
		return nil
	}
	var errChan chan error
	if fallback == nil {
		errChan = GoC(ctx, name, r, nil)
	} else {
		errChan = GoC(ctx, name, r, f)
	}

	select {
	case <-done:
		return nil
	case err := <-errChan:
		return err
	}
}
           

自定義Command配置

在調用

Do

Go

等方法之前我們可以先自定義一些配置

hystrix.ConfigureCommand("mycommand", hystrix.CommandConfig{
		Timeout:                int(time.Second * 3),
		MaxConcurrentRequests:  100,
		SleepWindow:            int(time.Second * 5),
		RequestVolumeThreshold: 30,
		ErrorPercentThreshold: 50,
	})

	err := hystrix.DoC(context.Background(), "mycommand", func(ctx context.Context) error {
		// ...
		return nil
	}, func(i context.Context, e error) error {
		// ...
		return e
	})
           

我大要說了一下

CommandConfig

第個字段的意義:

  • Timeout: 執行command的逾時時間。

    預設時間是1000毫秒

  • MaxConcurrentRequests:command的最大并發量

    預設值是10

  • SleepWindow:當熔斷器被打開後,SleepWindow的時間就是控制過多久後去嘗試服務是否可用了。

    預設值是5000毫秒

  • RequestVolumeThreshold: 一個統計視窗10秒内請求數量。達到這個請求數量後才去判斷是否要開啟熔斷。

    預設值是20

  • ErrorPercentThreshold:錯誤百分比,請求數量大于等于

    RequestVolumeThreshold

    并且錯誤率到達這個百分比後就會啟動

    熔斷

    預設值是50

當然如果不配置他們,會使用

預設值

講完了怎麼用,接下來就是分析源碼了。我是從下層到上層的順序分析代碼和執行流程

統計控制器

每一個Command都會有一個預設統計控制器,當然也可以添加多個自定義的控制器。

預設的統計控制器

DefaultMetricCollector

儲存着

熔斷器

的所有狀态,

調用次數

失敗次數

被拒絕次數

等等

type DefaultMetricCollector struct {
	mutex *sync.RWMutex

	numRequests *rolling.Number
	errors      *rolling.Number

	successes               *rolling.Number
	failures                *rolling.Number
	rejects                 *rolling.Number
	shortCircuits           *rolling.Number
	timeouts                *rolling.Number
	contextCanceled         *rolling.Number
	contextDeadlineExceeded *rolling.Number

	fallbackSuccesses *rolling.Number
	fallbackFailures  *rolling.Number
	totalDuration     *rolling.Timing
	runDuration       *rolling.Timing
}
           

最主要的還是要看一下

rolling.Number

rolling.Number

才是狀态最終儲存的地方

Number

儲存了10秒内的

Buckets

資料資訊,每一個

Bucket

的統計時長為1秒

雪崩利器 hystrix-go 源碼分析
type Number struct {
	Buckets map[int64]*numberBucket
	Mutex   *sync.RWMutex
}

type numberBucket struct {
	Value float64
}
           

字典字段

Buckets map[int64]*numberBucket

中的

Key

儲存的是目前時間

可能你會好奇

Number

是如何保證隻儲存10秒内的資料的。每一次對

熔斷器

的狀态進行修改時,

Number

都要先得到目前的時間(秒級)的

Bucket

不存在則建立。

func (r *Number) getCurrentBucket() *numberBucket {
	now := time.Now().Unix()
	var bucket *numberBucket
	var ok bool

	if bucket, ok = r.Buckets[now]; !ok {
		bucket = &numberBucket{}
		r.Buckets[now] = bucket
	}

	return bucket
}
           

修改完後去掉10秒外的資料

func (r *Number) removeOldBuckets() {
	now := time.Now().Unix() - 10

	for timestamp := range r.Buckets {
		// TODO: configurable rolling window
		if timestamp <= now {
			delete(r.Buckets, timestamp)
		}
	}
}
           

比如

Increment

方法,先得到

Bucket

再删除舊的資料

func (r *Number) Increment(i float64) {
	if i == 0 {
		return
	}

	r.Mutex.Lock()
	defer r.Mutex.Unlock()

	b := r.getCurrentBucket()
	b.Value += i
	r.removeOldBuckets()
}
           

統計控制器是最基層和最重要的一個實作,上層所有的執行判斷都是基于他的資料進行邏輯處理的

上報執行狀态資訊

斷路器-->執行-->上報執行狀态資訊-->儲存到相應的Buckets
           
雪崩利器 hystrix-go 源碼分析

每一次斷路器邏輯的執行都會上報執行過程中的狀态,

// ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard.
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
	// ...
	circuit.mutex.RLock()
	o := circuit.open
	circuit.mutex.RUnlock()
	if eventTypes[0] == "success" && o {
		circuit.setClose()
	}
	var concurrencyInUse float64
	if circuit.executorPool.Max > 0 {
		concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
	}
	select {
	case circuit.metrics.Updates <- &commandExecution{
		Types:            eventTypes,
		Start:            start,
		RunDuration:      runDuration,
		ConcurrencyInUse: concurrencyInUse,
	}:
	default:
		return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
	}

	return nil
}
           

circuit.metrics.Updates

這個信道就是處理上報資訊的,上報執行狀态自信的結構是

metricExchange

,結構體很簡單隻有4個字段。要的就是

  • channel

    字段

    Updates

    他是一個有

    buffer

    channel

    預設的數量是

    2000

    個,所有的狀态資訊都在他裡面
  • metricCollectors

    字段,就是儲存的具體的這個

    command

    執行過程中的各種資訊
type metricExchange struct {
	Name    string
	Updates chan *commandExecution
	Mutex   *sync.RWMutex

	metricCollectors []metricCollector.MetricCollector
}

type commandExecution struct {
	Types            []string      `json:"types"`
	Start            time.Time     `json:"start_time"`
	RunDuration      time.Duration `json:"run_duration"`
	ConcurrencyInUse float64       `json:"concurrency_inuse"`
}

func newMetricExchange(name string) *metricExchange {
	m := &metricExchange{}
	m.Name = name

	m.Updates = make(chan *commandExecution, 2000)
	m.Mutex = &sync.RWMutex{}
	m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name)
	m.Reset()

	go m.Monitor()

	return m
}
           

在執行

newMetricExchange

的時候會啟動一個協程

go m.Monitor()

去監控

Updates

的資料,然後上報給

metricCollectors

儲存執行的資訊資料比如前面提到的

調用次數

失敗次數

被拒絕次數

func (m *metricExchange) Monitor() {
	for update := range m.Updates {
		// we only grab a read lock to make sure Reset() isn't changing the numbers.
		m.Mutex.RLock()

		totalDuration := time.Since(update.Start)
		wg := &sync.WaitGroup{}
		for _, collector := range m.metricCollectors {
			wg.Add(1)
			go m.IncrementMetrics(wg, collector, update, totalDuration)
		}
		wg.Wait()

		m.Mutex.RUnlock()
	}
}
           

更新調用的是

go m.IncrementMetrics(wg, collector, update, totalDuration)

,裡面判斷了他的狀态

func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
	// granular metrics
	r := metricCollector.MetricResult{
		Attempts:         1,
		TotalDuration:    totalDuration,
		RunDuration:      update.RunDuration,
		ConcurrencyInUse: update.ConcurrencyInUse,
	}
	switch update.Types[0] {
	case "success":
		r.Successes = 1
	case "failure":
		r.Failures = 1
		r.Errors = 1
	case "rejected":
		r.Rejects = 1
		r.Errors = 1
	// ...
	}
	// ...
	collector.Update(r)
	wg.Done()
}
           

流量控制

hystrix-go

對流量控制的代碼是很簡單的。用了一個簡單的令牌算法,能得到令牌的就可以執行後繼的工作,執行完後要返還令牌。得不到令牌就拒絕,拒絕後調用使用者設定的

callback

方法,如果沒有設定就不執行。

結構體

executorPool

就是

hystrix-go

流量控制

的具體實作。字段

Max

就是每秒最大的并發值。

type executorPool struct {
	Name    string
	Metrics *poolMetrics
	Max     int
	Tickets chan *struct{}
}
           

在建立

executorPool

的時候,會根據

Max

值來建立

令牌

。Max值如果沒有設定會使用預設值

10

func newExecutorPool(name string) *executorPool {
	p := &executorPool{}
	p.Name = name
	p.Metrics = newPoolMetrics(name)
	p.Max = getSettings(name).MaxConcurrentRequests

	p.Tickets = make(chan *struct{}, p.Max)
	for i := 0; i < p.Max; i++ {
		p.Tickets <- &struct{}{}
	}

	return p
}
           

流量控制上報狀态

注意一下字段

Metrics

他用于統計執行數量,比如:

執行的總數量

,

最大的并發數

具體的代碼就不貼上來了。這個數量也可以顯露出,供可視化程式直覺的表現出來。

令牌使用完後是需要返還的,傳回的時候才會做上面所說的統計工作。

func (p *executorPool) Return(ticket *struct{}) {
	if ticket == nil {
		return
	}

	p.Metrics.Updates <- poolMetricsUpdate{
		activeCount: p.ActiveCount(),
	}
	p.Tickets <- ticket
}

func (p *executorPool) ActiveCount() int {
	return p.Max - len(p.Tickets)
}
           

一次Command的執行的流程

上面把

統計控制器

流量控制

上報執行狀态

講完了,主要的實作也就講的差不多了。最後就是串一次command的執行都經曆了啥:

err := hystrix.Do("my_command", func() error {
	// talk to other services
	return nil
}, func(err error) error {
	// do this when services are down
	return nil
})
           

hystrix

在執行一次command的前面也有提到過會調用

GoC

方法,下面我把代碼貼出來來,

篇幅問題去掉了一些代碼

,主要邏輯都在。就是在

判斷斷路器是否已打開

得到Ticket

得不到就限流,

執行我們自己的的方法

判斷context是否Done或者執行是否逾時

當然,每次執行結果都要

上報執行狀态

,最後要

返還Ticket

func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
	cmd := &command{
		run:      run,
		fallback: fallback,
		start:    time.Now(),
		errChan:  make(chan error, 1),
		finished: make(chan bool, 1),
	}
	//得到斷路器,不存在則建立
	circuit, _, err := GetCircuit(name)
	if err != nil {
		cmd.errChan <- err
		return cmd.errChan
	}
	//...
	// 返還ticket
	returnTicket := func() {
		// ...
		cmd.circuit.executorPool.Return(cmd.ticket)
	}
	// 上報執行狀态
	reportAllEvent := func() {
		err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
		// ...
	}
	go func() {
		defer func() { cmd.finished <- true }()
		// 檢視斷路器是否已打開
		if !cmd.circuit.AllowRequest() {
			// ...
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ErrCircuitOpen)
				reportAllEvent()
			})
			return
		}
		// ...
		// 擷取ticket 如果得不到就限流
		select {
		case cmd.ticket = <-circuit.executorPool.Tickets:
			ticketChecked = true
			ticketCond.Signal()
			cmd.Unlock()
		default:
			// ...
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ErrMaxConcurrency)
				reportAllEvent()
			})
			return
		}
		// 執行我們自已的方法,并上報執行資訊
		returnOnce.Do(func() {
			defer reportAllEvent()
			cmd.runDuration = time.Since(runStart)
			returnTicket()
			if runErr != nil {
				cmd.errorWithFallback(ctx, runErr)
				return
			}
			cmd.reportEvent("success")
		})
	}()
	// 等待context是否被結束,或執行者逾時,并上報
	go func() {
		timer := time.NewTimer(getSettings(name).Timeout)
		defer timer.Stop()

		select {
		case <-cmd.finished:
			// returnOnce has been executed in another goroutine
		case <-ctx.Done():
			// ...
			return
		case <-timer.C:
			// ...
		}
	}()

	return cmd.errChan
}
           

dashboard 可視化hystrix的上報資訊

代碼中

StreamHandler

就是把所有

斷路器

的狀态以流的方式不斷的推送到dashboard. 這部分代碼我就不用說了,很簡單。

需要在你的服務端加3行代碼,啟動我們的流服務

hystrixStreamHandler := hystrix.NewStreamHandler()
	hystrixStreamHandler.Start()
	go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
           

dashboard

我使用的是

docker

版。

docker run -d -p 8888:9002 --name hystrix-dashboard mlabouardy/hystrix-dashboard:latest
           
雪崩利器 hystrix-go 源碼分析

在下面輸入你服務的位址,我是

http://192.168.1.67:81/hystrix.stream

雪崩利器 hystrix-go 源碼分析

如果是叢集可以使用Turbine進行監控,有時間大家自己來看吧

雪崩利器 hystrix-go 源碼分析

作者:李鵬

出處:http://www.cnblogs.com/li-peng/

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。