天天看點

阿裡 雙11 同款,流量防衛兵 Sentinel go 源碼解讀

阿裡 雙11 同款,流量防衛兵 Sentinel go 源碼解讀

作者 | 于雨  apache/dubbo-go 項目負責人

本文作者系 apache/dubbo-go 項目負責人,目前在 dubbogo 項目中已内置可用 sentinel-go,如果想單獨使用可參考

在 dubbo-go 中使用 sentinel

一文,若有其他疑問可進 dubbogo社群【釘釘群 23331795】進行溝通。

導讀:本文主要分析阿裡巴巴集團開源的流量控制中間件 Sentinel,其原生支援了 Java/Go/C++ 等多種語言,本文僅僅分析其 Go 語言實作。下文如無特殊說明,sentinel 指代 Sentinel-Go。

1 基本概念 Resource  和 Rule

1.1 Resource

// ResourceType represents classification of the resources
    type ResourceType int32

    const (
        ResTypeCommon ResourceType = iota
        ResTypeWeb
        ResTypeRPC
    )

    // TrafficType describes the traffic type: Inbound or Outbound
    type TrafficType int32

    const (
        // Inbound represents the inbound traffic (e.g. provider)
        Inbound TrafficType = iota
        // Outbound represents the outbound traffic (e.g. consumer)
        Outbound
    )

    // ResourceWrapper represents the invocation
    type ResourceWrapper struct {
        // global unique resource name
        name string
        // resource classification
        classification ResourceType
        // Inbound or Outbound
        flowType TrafficType
    }           

Resource(ResourceWrapper) 存儲了應用場景 ResourceType,以及目标流控的方向 FlowType(TrafficType)。

1.2 Entry

// EntryOptions represents the options of a Sentinel resource entry.
    type EntryOptions struct {
        resourceType base.ResourceType
        entryType    base.TrafficType
        acquireCount uint32
        slotChain    *base.SlotChain
    }

    type EntryContext struct {
        entry *SentinelEntry

        // Use to calculate RT
        startTime uint64

        Resource *ResourceWrapper
        StatNode StatNode

        Input *SentinelInput
        // the result of rule slots check
        RuleCheckResult *TokenResult
    }

    type SentinelEntry struct {
        res *ResourceWrapper
        // one entry bounds with one context
        ctx *EntryContext

        sc *SlotChain
    }           

Entry 實體 SentinelEntry 關聯了 Resource(ResourceWrapper) 以及其流控規則集合 SlotChain。每個 Entry 實體有一個上下文環境 EntryContext,存儲每個 Rule 檢測時用到的一些流控參數和流控判定結果。

值得注意的是,

SentinelEntry.sc

值來自于

EntryOptions.slotChain

EntryOptions.slotChain

存儲了全局 SlotChain 對象

api/slot_chain.go:globalSlotChain

至于何為

SlotChain

,就是 sentinel 提供的所有的流控元件的集合,可以簡單地認為每個流控元件就是一個 Slot,其詳細分析見[[3.5 SlotChain]](#3.5)。

sentinel 一些變量和函數命名的可讀性較差,如

EntryOptions.acquireCount

實在無法讓人望文生義,看過函數

core/api.go:WithAcquireCount()

的注釋才明白:

EntryOptions.acquireCount

是批量動作執行次數。如有的一次 RPC 請求中調用了服務端的一個服務接口,則取值 1【也是

EntryOptions.acquireCount

的預設取值】,如果調用了服務端的 3 個服務接口,則取值 3。是以建議改名為

EntryOptions.batchCount

比較好,考慮到最小改動原則,可以在保留

core/api.go:WithAcquireCount()

的同時增加一個同樣功能的

core/api.go:WithBatchCount()

接口。相關改進已經送出到  

pr 263

1.3 Rule

type TokenCalculateStrategy int32
    const (
        Direct TokenCalculateStrategy = iota
        WarmUp
    )

    type ControlBehavior int32
    const (
        Reject ControlBehavior = iota
        Throttling
    )

    // Rule describes the strategy of flow control, the flow control strategy is based on QPS statistic metric
    type Rule struct {
        // Resource represents the resource name.
        Resource               string                 `json:"resource"`
        ControlBehavior        ControlBehavior        `json:"controlBehavior"`
        // Threshold means the threshold during StatIntervalInMs
        // If StatIntervalInMs is 1000(1 second), Threshold means QPS
        Threshold         float64          `json:"threshold"`
        MaxQueueingTimeMs uint32           `json:"maxQueueingTimeMs"`
        // StatIntervalInMs indicates the statistic interval and it's the optional setting for flow Rule.
        // If user doesn't set StatIntervalInMs, that means using default metric statistic of resource.
        // If the StatIntervalInMs user specifies can not reuse the global statistic of resource,
        //         sentinel will generate independent statistic structure for this rule.
        StatIntervalInMs uint32 `json:"statIntervalInMs"`
    }           

Rule 記錄了某 Resource 的限流判定門檻值 Threshold、限流時間視窗計時長度 StatIntervalInMs 以及 觸發限流後的判罰動作 ControlBehavior。

上面核心是 Rule 的接口 RuleCheckSlot,至于 StatSlot 則用于統計 sentinel 自身的運作 metrics。

1.4 Flow

目前章節主要分析流控中的限流(core/flow),根據流控的處理流程梳理 sentinel 整體骨架。

1.4.1 TrafficShapingController

所謂

TrafficShapingController

,顧名思義,就是 流量塑形控制器,是流控的具體實施者。

// core/flow/traffic_shaping.go

    // TrafficShapingCalculator calculates the actual traffic shaping threshold
    // based on the threshold of rule and the traffic shaping strategy.
    type TrafficShapingCalculator interface {
        CalculateAllowedTokens(acquireCount uint32, flag int32) float64
    }

    type DirectTrafficShapingCalculator struct {
        threshold float64
    }

    func (d *DirectTrafficShapingCalculator) CalculateAllowedTokens(uint32, int32) float64 {
        return d.threshold
    }           

TrafficShapingCalculator

接口用于計算限流的上限,如果不使用 warm-up 功能,可以不去深究其實作,其實體之一 DirectTrafficShapingCalculator 傳回

Rule.Threshold

【使用者設定的限流上限】。

// TrafficShapingChecker performs checking according to current metrics and the traffic
    // shaping strategy, then yield the token result.
    type TrafficShapingChecker interface {
        DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult
    }

    type RejectTrafficShapingChecker struct {
        rule  *Rule
    }

    func (d *RejectTrafficShapingChecker) DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult {
        metricReadonlyStat := d.BoundOwner().boundStat.readOnlyMetric
        if metricReadonlyStat == nil {
            return nil
        }
        curCount := float64(metricReadonlyStat.GetSum(base.MetricEventPass))
        if curCount+float64(acquireCount) > threshold {
            return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, "", d.rule, curCount)
        }
        return nil
    }           

RejectTrafficShapingChecker

依據

Rule.Threshold

判定 Resource 在目前時間視窗是否超限,其限流結果

TokenResultStatus

隻可能是 Pass 或者 Blocked。

sentinel flow 還有一個勻速限流

ThrottlingChecker

,它的目的是讓請求勻速被執行,把一個時間視窗【譬如 1s】根據 threshold 再細分為更細的微時間視窗,在每個微時間視窗最多執行一次請求,其限流結果

TokenResultStatus

隻可能是 Pass 或者 Blocked 或者 Wait,其相關意義分别為:

  • Pass:在微時間視窗内無超限,請求通過;
  • Wait:在微時間視窗内超限,被滞後若幹時間視窗執行,在這段時間内請求需要等待;
  • Blocked:在微時間視窗内超限,且等待時間超過使用者設定的最大願意等待時間長度【Rule.MaxQueueingTimeMs】,請求被拒絕。
type TrafficShapingController struct {
        flowCalculator TrafficShapingCalculator
        flowChecker    TrafficShapingChecker

        rule *Rule
        // boundStat is the statistic of current TrafficShapingController
        boundStat standaloneStatistic
    }

    func (t *TrafficShapingController) PerformChecking(acquireCount uint32, flag int32) *base.TokenResult {
        allowedTokens := t.flowCalculator.CalculateAllowedTokens(acquireCount, flag)
        return t.flowChecker.DoCheck(resStat, acquireCount, allowedTokens)
    }           

Direct + Reject

限流的場景下,這三個接口其實并無多大意義,其核心函數

TrafficShapingController.PerformChecking()

的主要流程是:

  • 1  從 TrafficShapingController.boundStat 中擷取目前 Resource 的 metrics 值【curCount】;
  • 2 如果 curCount + batchNum(acquireCount) > Rule.Threshold,則 pass,否則就 reject。

在限流場景下,

TrafficShapingController

四個成員的意義如下:

  • flowCalculator 計算限流上限;
  • flowChecker 執行限流 Check 動作;
  • rule 存儲限流規則;
  • boundStat 存儲限流的 Check 結果和時間視窗參數,作為下次限流 Check 動作判定的依據。

1.4.2 TrafficControllerMap

在執行限流判定時,需要根據 Resource 名稱擷取其對應的

TrafficShapingController

// TrafficControllerMap represents the map storage for TrafficShapingController.
   type TrafficControllerMap map[string][]*TrafficShapingController
    // core/flow/rule_manager.go
    tcMap        = make(TrafficControllerMap)           

package 級别全局私有變量 tcMap 存儲了所有的 Rule,其 key 為 Resource 名稱,value 則是與 Resource 對應的 TrafficShapingController。

使用者級别接口函數

core/flow/rule_manager.go:LoadRules()

會根據使用者定義的 Rule 構造其對應的

TrafficShapingController

存入

tcMap

,這個接口調用函數

generateStatFor(*Rule)

構造

TrafficShapingController.boundStat

限流場景下,函數

generateStatFor(*Rule)

的核心代碼如下:

func generateStatFor(rule *Rule) (*standaloneStatistic, error) {
        resNode = stat.GetOrCreateResourceNode(rule.Resource, base.ResTypeCommon)

        // default case, use the resource's default statistic
        readStat := resNode.DefaultMetric()
        retStat.reuseResourceStat = true
        retStat.readOnlyMetric = readStat
        retStat.writeOnlyMetric = nil
        return &retStat, nil
    }           

2 Metrics

Resource 的名額 Metrics 是進行 Rule 判定的基礎。

2.1 原子時間輪 AtomicBucketWrapArray

Sentinel 庫功能豐富,但無論是限流還是熔斷,其存儲基礎都是滑動時間視窗。其間包含了衆多優化:如無鎖定長時間輪。

滑動視窗實作有很多種,時間輪算法是其中一種比較簡單的實作,在時間輪算法之上可以實作多種限流方法。時間輪整體框圖如下:

阿裡 雙11 同款,流量防衛兵 Sentinel go 源碼解讀
1 BucketWrap

時間輪的最基本單元是一個桶【時間視窗】。

// BucketWrap represent a slot to record metrics
    // In order to reduce the usage of memory, BucketWrap don't hold length of BucketWrap
    // The length of BucketWrap could be seen in LeapArray.
    // The scope of time is [startTime, startTime+bucketLength)
    // The size of BucketWrap is 24(8+16) bytes
    type BucketWrap struct {
        // The start timestamp of this statistic bucket wrapper.
        BucketStart uint64
        // The actual data structure to record the metrics (e.g. MetricBucket).
        Value atomic.Value
    }           

補充:這裡之是以用指針,是因為以

BucketWrap

為基礎的

AtomicBucketWrapArray

會被多個

sentinel

流控元件使用,每個元件的流控參數不一,例如:

  • 1

    core/circuitbreaker/circuit_breaker.go:slowRtCircuitBreaker

    使用的

    slowRequestLeapArray

    的底層參數

    slowRequestCounter

// core/circuitbreaker/circuit_breaker.go
    type slowRequestCounter struct {
        slowCount  uint64
        totalCount uint64
    }           
  • 2

    core/circuitbreaker/circuit_breaker.go:errorRatioCircuitBreaker

    errorCounterLeapArray

    errorCounter

// core/circuitbreaker/circuit_breaker.go
    type errorCounter struct {
        errorCount uint64
        totalCount uint64
    }           
1.1 MetricBucket

BucketWrap 可以認作是一種 時間桶模闆,具體的桶的實體是 MetricsBucket,其定義如下:

// MetricBucket represents the entity to record metrics per minimum time unit (i.e. the bucket time span).
    // Note that all operations of the MetricBucket are required to be thread-safe.
    type MetricBucket struct {
        // Value of statistic
        counter [base.MetricEventTotal]int64
        minRt   int64
    }           

MetricBucket 存儲了五種類型的 metric:

// There are five events to record
    // pass + block == Total
    const (
        // sentinel rules check pass
        MetricEventPass MetricEvent = iota
        // sentinel rules check block
        MetricEventBlock

        MetricEventComplete
        // Biz error, used for circuit breaker
        MetricEventError
        // request execute rt, unit is millisecond
        MetricEventRt
        // hack for the number of event
        MetricEventTotal
    )           
2 AtomicBucketWrapArray

每個桶隻記錄了其起始時間和 metric 值,至于每個桶的時間視窗長度這種公共值則統一記錄在 AtomicBucketWrapArray 内,AtomicBucketWrapArray 定義如下:

// atomic BucketWrap array to resolve race condition
    // AtomicBucketWrapArray can not append or delete element after initializing
    type AtomicBucketWrapArray struct {
        // The base address for real data array
        base unsafe.Pointer
        // The length of slice(array), it can not be modified.
        length int
        data   []*BucketWrap
    }           

AtomicBucketWrapArray.base 的值是 AtomicBucketWrapArray.data slice 的 data 區域的首指針。因為 AtomicBucketWrapArray.data 是一個固定長度的 slice,是以 AtomicBucketWrapArray.base 直接存儲資料記憶體區域的首位址,以加速通路速度。

其次,AtomicBucketWrapArray.data 中存儲的是 BucketWrap 的指針,而不是 BucketWrap。

NewAtomicBucketWrapArrayWithTime() 函數會預熱一下,把所有的時間桶都生成出來。

2.2 時間輪

1 leapArray
// Give a diagram to illustrate
    // Suppose current time is 888, bucketLengthInMs is 200ms,
    // intervalInMs is 1000ms, LeapArray will build the below windows
    //   B0       B1      B2     B3      B4
    //   |_______|_______|_______|_______|_______|
    //  1000    1200    1400    1600    800    (1000)
    //                                        ^
    //                                      time=888
    type LeapArray struct {
        bucketLengthInMs uint32
        sampleCount      uint32
        intervalInMs     uint32
        array            *AtomicBucketWrapArray
        // update lock
        updateLock mutex
    }           

LeapArray 各個成員解析:

  • bucketLengthInMs 是漏桶長度,以毫秒為機關;
  • sampleCount 則是時間漏桶個數;
  • intervalInMs 是時間視窗長度,以毫秒為機關。

其注釋中的 ASCII 圖很好地解釋了每個字段的含義。

LeapArray

核心函數是

LeapArray.currentBucketOfTime()

,其作用是根據某個時間點擷取其做對應的時間桶

BucketWrap

,代碼如下:

func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
        if now <= 0 {
            return nil, errors.New("Current time is less than 0.")
        }

        idx := la.calculateTimeIdx(now)
        bucketStart := calculateStartTime(now, la.bucketLengthInMs)

        for { //spin to get the current BucketWrap
            old := la.array.get(idx)
            if old == nil {
                // because la.array.data had initiated when new la.array
                // theoretically, here is not reachable
                newWrap := &BucketWrap{
                    BucketStart: bucketStart,
                    Value:       atomic.Value{},
                }
                newWrap.Value.Store(bg.NewEmptyBucket())
                if la.array.compareAndSet(idx, nil, newWrap) {
                    return newWrap, nil
                } else {
                    runtime.Gosched()
                }
            } else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
                return old, nil
            } else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
                // current time has been next cycle of LeapArray and LeapArray dont't count in last cycle.
                // reset BucketWrap
                if la.updateLock.TryLock() {
                    old = bg.ResetBucketTo(old, bucketStart)
                    la.updateLock.Unlock()
                    return old, nil
                } else {
                    runtime.Gosched()
                }
            } else if bucketStart < atomic.LoadUint64(&old.BucketStart) {
                // TODO: reserve for some special case (e.g. when occupying "future" buckets).
                return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
            }
        }
    }           

其 for-loop 核心邏輯是:

  • 1 擷取時間點對應的時間桶 old;
  • 2 如果 old 為空,則建立一個時間桶,以原子操作的方式嘗試存入時間視窗的時間輪中,存入失敗則重新嘗試;
  • 3 如果 old 就是目前時間點所在的時間桶,則傳回;
  • 4 如果 old 的時間起點小于目前時間,則通過樂觀鎖嘗試 reset 桶的起始時間等參數值,加鎖更新成功則傳回;
  • 5 如果 old 的時間起點大于目前時間,則系統發生了時間扭曲,傳回錯誤。
2 BucketLeapArray

leapArray 實作了滑動時間視窗的所有主體,其對外使用接口則是 BucketLeapArray:

// The implementation of sliding window based on LeapArray (as the sliding window infrastructure)
    // and MetricBucket (as the data type). The MetricBucket is used to record statistic
    // metrics per minimum time unit (i.e. the bucket time span).
    type BucketLeapArray struct {
        data     LeapArray
        dataType string
    }           

從這個 struct 的注釋可見,其時間視窗 BucketWrap 的實體是 MetricBucket。

2.3 Metric 資料讀寫

SlidingWindowMetric
// SlidingWindowMetric represents the sliding window metric wrapper.
    // It does not store any data and is the wrapper of BucketLeapArray to adapt to different internal bucket
    // SlidingWindowMetric is used for SentinelRules and BucketLeapArray is used for monitor
    // BucketLeapArray is per resource, and SlidingWindowMetric support only read operation.
    type SlidingWindowMetric struct {
        bucketLengthInMs uint32
        sampleCount      uint32
        intervalInMs     uint32
        real             *BucketLeapArray
    }           

SlidingWindowMetric 是對 BucketLeapArray 的一個封裝,隻提供了隻讀接口。

ResourceNode
type BaseStatNode struct {
        sampleCount uint32
        intervalMs  uint32

        goroutineNum int32

        arr    *sbase.BucketLeapArray
        metric *sbase.SlidingWindowMetric
    }

    type ResourceNode struct {
        BaseStatNode

        resourceName string
        resourceType base.ResourceType
    }

    // core/stat/node_storage.go
    type ResourceNodeMap map[string]*ResourceNode
    var (
        inboundNode = NewResourceNode(base.TotalInBoundResourceName, base.ResTypeCommon)

        resNodeMap = make(ResourceNodeMap)
        rnsMux     = new(sync.RWMutex)
    )           

BaseStatNode 對外提供了讀寫接口,其資料寫入 BaseStatNode.arr,讀取接口則依賴 BaseStatNode.metric。

BaseStatNode.arr

是在

NewBaseStatNode()

中建立的,指針

SlidingWindowMetric.real

也指向它。

ResourceNode

則顧名思義,其代表了某資源和它的 Metrics 存儲  

ResourceNode.BaseStatNode

全局變量

resNodeMap

存儲了所有資源的 Metrics 名額資料。

3 限流流程

本節隻分析 Sentinel 庫提供的最基礎的流量整形功能 -- 限流,限流算法多種多樣,可以使用其内置的算法,使用者自己也可以進行擴充。

限流過程有三步步驟:

  • 1 針對特定 Resource 構造其 EntryContext,存儲其 Metrics、限流開始時間等,Sentinel 稱之為 StatPrepareSlot;
  • 2 依據 Resource 的限流算法判定其是否應該進行限流,并給出限流判定結果,Sentinel 稱之為 RuleCheckSlot;
    • 補充:這個限流算法是一系列判斷方法的合集(SlotChain);
  • 3 判定之後,除了使用者自身根據判定結果執行相應的 action,Sentinel 也需要根據判定結果執行自身的 Action,以及把整個判定流程所使用的的時間 RT 等名額存儲下來,Sentinel 稱之為 StatSlot。

整體流程如下圖所示:

阿裡 雙11 同款,流量防衛兵 Sentinel go 源碼解讀

3.1 Slot

針對 Check 三個步驟,有三個對應的 Slot 分别定義如下:

// StatPrepareSlot is responsible for some preparation before statistic
    // For example: init structure and so on
    type StatPrepareSlot interface {
        // Prepare function do some initialization
        // Such as: init statistic structure、node and etc
        // The result of preparing would store in EntryContext
        // All StatPrepareSlots execute in sequence
        // Prepare function should not throw panic.
        Prepare(ctx *EntryContext)
    }

    // RuleCheckSlot is rule based checking strategy
    // All checking rule must implement this interface.
    type RuleCheckSlot interface {
        // Check function do some validation
        // It can break off the slot pipeline
        // Each TokenResult will return check result
        // The upper logic will control pipeline according to SlotResult.
        Check(ctx *EntryContext) *TokenResult
    }

    // StatSlot is responsible for counting all custom biz metrics.
    // StatSlot would not handle any panic, and pass up all panic to slot chain
    type StatSlot interface {
        // OnEntryPass function will be invoked when StatPrepareSlots and RuleCheckSlots execute pass
        // StatSlots will do some statistic logic, such as QPS、log、etc
        OnEntryPassed(ctx *EntryContext)
        // OnEntryBlocked function will be invoked when StatPrepareSlots and RuleCheckSlots fail to execute
        // It may be inbound flow control or outbound cir
        // StatSlots will do some statistic logic, such as QPS、log、etc
        // blockError introduce the block detail
        OnEntryBlocked(ctx *EntryContext, blockError *BlockError)
        // OnCompleted function will be invoked when chain exits.
        // The semantics of OnCompleted is the entry passed and completed
        // Note: blocked entry will not call this function
        OnCompleted(ctx *EntryContext)
    }           

抛卻 Prepare 和 Stat,可以簡單的認為:所謂的 slot,就是 sentinel 提供的某個流控元件。

值得注意的是,根據注釋 StatSlot.OnCompleted 隻有在 RuleCheckSlot.Check 通過才會執行,用于計算從請求開始到結束所使用的 RT 等 Metrics。

3.2 Prepare

// core/base/slot_chain.go
    // StatPrepareSlot is responsible for some preparation before statistic
    // For example: init structure and so on
    type StatPrepareSlot interface {
        // Prepare function do some initialization
        // Such as: init statistic structure、node and etc
        // The result of preparing would store in EntryContext
        // All StatPrepareSlots execute in sequence
        // Prepare function should not throw panic.
        Prepare(ctx *EntryContext)
    }

    // core/stat/stat_prepare_slot.go
    type ResourceNodePrepareSlot struct {
    }

    func (s *ResourceNodePrepareSlot) Prepare(ctx *base.EntryContext) {
        node := GetOrCreateResourceNode(ctx.Resource.Name(), ctx.Resource.Classification())
        // Set the resource node to the context.
        ctx.StatNode = node
    }           

如前面解釋,Prepare 主要是構造存儲 Resource Metrics 所使用的 ResourceNode。所有 Resource 的 StatNode 都會存儲在 package 級别的全局變量

core/stat/node_storage.go:resNodeMap [type: map[string]*ResourceNode]

中,函數

GetOrCreateResourceNode

用于根據 Resource Name 從

resNodeMap

中擷取其對應的 StatNode,如果不存在則建立一個 StatNode 并存入

resNodeMap

3.3 Check

RuleCheckSlot.Check() 執行流程:

  • 1 根據 Resource 名稱擷取其所有的 Rule 集合;
  • 2 周遊 Rule 集合,對 Resource 依次執行 Check,任何一個 Rule 判定 Resource 需要進行限流【Blocked】則傳回,否則放行。
type Slot struct {
    }

    func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
        res := ctx.Resource.Name()
        tcs := getTrafficControllerListFor(res)
        result := ctx.RuleCheckResult

        // Check rules in order
        for _, tc := range tcs {
            r := canPassCheck(tc, ctx.StatNode, ctx.Input.AcquireCount)
            if r == nil {
                // nil means pass
                continue
            }
            if r.Status() == base.ResultStatusBlocked {
                return r
            }
            if r.Status() == base.ResultStatusShouldWait {
                if waitMs := r.WaitMs(); waitMs > 0 {
                    // Handle waiting action.
                    time.Sleep(time.Duration(waitMs) * time.Millisecond)
                }
                continue
            }
        }
        return result
    }

    func canPassCheck(tc *TrafficShapingController, node base.StatNode, acquireCount uint32) *base.TokenResult {
        return canPassCheckWithFlag(tc, node, acquireCount, 0)
    }

    func canPassCheckWithFlag(tc *TrafficShapingController, node base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {
        return checkInLocal(tc, node, acquireCount, flag)
    }

    func checkInLocal(tc *TrafficShapingController, resStat base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {
        return tc.PerformChecking(resStat, acquireCount, flag)
    }           

3.4 Exit

sentinel 對 Resource 進行 Check 後,其後續邏輯執行順序是:

  • 1 如果 RuleCheckSlot.Check() 判定 pass 通過則執行 StatSlot.OnEntryPassed(),否則 RuleCheckSlot.Check() 判定 reject 則執行 StatSlot.OnEntryBlocked();
  • 2 如果 RuleCheckSlot.Check() 判定 pass 通過,則執行本次 Action;
  • 3 如果 RuleCheckSlot.Check() 判定 pass 通過,則執行 SentinelEntry.Exit() --> SlotChain.ext() --> StatSlot.OnCompleted() 。

第三步驟的調用鍊路如下:

StatSlot.OnCompleted()
// core/flow/standalone_stat_slot.go
    type StandaloneStatSlot struct {
    }

    func (s StandaloneStatSlot) OnEntryPassed(ctx *base.EntryContext) {
        res := ctx.Resource.Name()
        for _, tc := range getTrafficControllerListFor(res) {
            if !tc.boundStat.reuseResourceStat {
                if tc.boundStat.writeOnlyMetric != nil {
                    tc.boundStat.writeOnlyMetric.AddCount(base.MetricEventPass, int64(ctx.Input.AcquireCount))
                }
            }
        }
    }

    func (s StandaloneStatSlot) OnEntryBlocked(ctx *base.EntryContext, blockError *base.BlockError) {
        // Do nothing
    }

    func (s StandaloneStatSlot) OnCompleted(ctx *base.EntryContext) {
        // Do nothing
    }           
SlotChain.exit()
// core/base/slot_chain.go
    type SlotChain struct {
    }

    func (sc *SlotChain) exit(ctx *EntryContext) {
        // The OnCompleted is called only when entry passed
        if ctx.IsBlocked() {
            return
        }
        for _, s := range sc.stats {
            s.OnCompleted(ctx)
        }
    }           
SentinelEntry.Exit()
// core/base/entry.go
    type SentinelEntry struct {
        sc *SlotChain
        exitCtl sync.Once
    }

    func (e *SentinelEntry) Exit() {
        e.exitCtl.Do(func() {
            if e.sc != nil {
                e.sc.exit(ctx)
            }
        })
    }           

從上面執行可見,

StatSlot.OnCompleted()

是在 Action 【如一次 RPC 的請求-響應 Invokation】完成之後調用的。如果有的元件需要計算一次 Action 的時間耗費  RT,就在其對應的

StatSlot.OnCompleted()

中依據

EntryContext.startTime

完成時間耗費計算。

[3.5 SlotChain]()

Sentinel 本質是一個流控包,不僅提供了限流功能,還提供了衆多其他諸如自适應流量保護、熔斷降級、冷啟動、全局流量 Metrics 結果等功能流控元件,Sentinel-Go 包定義了一個

SlotChain

實體存儲其所有的流控元件。

// core/base/slot_chain.go

    // SlotChain hold all system slots and customized slot.
    // SlotChain support plug-in slots developed by developer.
    type SlotChain struct {
        statPres   []StatPrepareSlot
        ruleChecks []RuleCheckSlot
        stats      []StatSlot
    }

    // The entrance of slot chain
    // Return the TokenResult and nil if internal panic.
    func (sc *SlotChain) Entry(ctx *EntryContext) *TokenResult {
        // execute prepare slot
        sps := sc.statPres
        if len(sps) > 0 {
            for _, s := range sps {
                s.Prepare(ctx)
            }
        }

        // execute rule based checking slot
        rcs := sc.ruleChecks
        var ruleCheckRet *TokenResult
        if len(rcs) > 0 {
            for _, s := range rcs {
                sr := s.Check(ctx)
                if sr == nil {
                    // nil equals to check pass
                    continue
                }
                // check slot result
                if sr.IsBlocked() {
                    ruleCheckRet = sr
                    break
                }
            }
        }
        if ruleCheckRet == nil {
            ctx.RuleCheckResult.ResetToPass()
        } else {
            ctx.RuleCheckResult = ruleCheckRet
        }

        // execute statistic slot
        ss := sc.stats
        ruleCheckRet = ctx.RuleCheckResult
        if len(ss) > 0 {
            for _, s := range ss {
                // indicate the result of rule based checking slot.
                if !ruleCheckRet.IsBlocked() {
                    s.OnEntryPassed(ctx)
                } else {
                    // The block error should not be nil.
                    s.OnEntryBlocked(ctx, ruleCheckRet.blockErr)
                }
            }
        }
        return ruleCheckRet
    }

    func (sc *SlotChain) exit(ctx *EntryContext) {
        if ctx == nil || ctx.Entry() == nil {
            logging.Error(errors.New("nil EntryContext or SentinelEntry"), "")
            return
        }
        // The OnCompleted is called only when entry passed
        if ctx.IsBlocked() {
            return
        }
        for _, s := range sc.stats {
            s.OnCompleted(ctx)
        }
        // relieve the context here
    }           

建議:Sentinel 包針對某個 Resource 無法确知其使用了那個元件,在運作時會針對某個 Resource 的 EntryContext 依次執行所有的元件的 Rule。Sentinel-golang 為何不給使用者相關使用者提供一個接口讓其設定使用的流控元件集合,以減少下面函數

SlotChain.Entry()

中執行

RuleCheckSlot.Check()

執行次數?相關改進已經送出到

pr 264

【補充,代碼已合并,據負責人壓測後回複 sentinel-go 效率整體提升 15%】。

globalSlotChain

Sentinel-Go 定義了一個 SlotChain 的 package 級别的全局私有變量

globalSlotChain

用于存儲其所有的流控元件對象。相關代碼示例如下。因本文隻關注限流元件,是以下面隻給出了限流元件的注冊代碼。

// api/slot_chain.go

    func BuildDefaultSlotChain() *base.SlotChain {
        sc := base.NewSlotChain()
        sc.AddStatPrepareSlotLast(&stat.ResourceNodePrepareSlot{})

        sc.AddRuleCheckSlotLast(&flow.Slot{})

        sc.AddStatSlotLast(&flow.StandaloneStatSlot{})

        return sc
    }

    var globalSlotChain = BuildDefaultSlotChain()           
Entry

在 Sentinel-Go 對外的最重要的入口函數

api/api.go:Entry()

中,

globalSlotChain

會作為 EntryOptions 的 SlotChain 參數被使用。

// api/api.go

    // Entry is the basic API of Sentinel.
    func Entry(resource string, opts ...EntryOption) (*base.SentinelEntry, *base.BlockError) {
        options := entryOptsPool.Get().(*EntryOptions)
        options.slotChain = globalSlotChain

        return entry(resource, options)
    }           

Sentinel 的演進離不開社群的貢獻。Sentinel Go 1.0 GA 版本即将在近期釋出,帶來更多雲原生相關的特性。我們非常歡迎感興趣的開發者參與貢獻,一起來主導未來版本的演進。我們鼓勵任何形式的貢獻,包括但不限于:

• bug fix

• new features/improvements

• dashboard

• document/website

• test cases

開發者可以在 GitHub 上面的 good first issue 清單上挑選感興趣的 issue 來參與讨論和貢獻。我們會重點關注積極參與貢獻的開發者,核心貢獻者會提名為 Committer,一起主導社群的發展。我們也歡迎大家有任何問題和建議,都可以通過 GitHub issue、Gitter 或釘釘群(群号:30150716)等管道進行交流。Now start hacking!

• Sentinel Go repo:

https://github.com/alibaba/sentinel-golang

• 企業使用者歡迎進行登記:

https://github.com/alibaba/Sentinel/issues/18

作者簡介

于雨(github @AlexStocks),apache/dubbo-go 項目負責人,一個有十多年服務端基礎架構研發一線工作經驗的程式員,目前在螞蟻金服可信原生部從事容器編排和 service mesh 工作。熱愛開源,從 2015 年給 Redis 貢獻代碼開始,陸續改進過 Muduo/Pika/Dubbo/Dubbo-go 等知名項目。

阿裡巴巴雲原生 關注微服務、Serverless、容器、Service Mesh 等技術領域、聚焦雲原生流行技術趨勢、雲原生大規模的落地實踐,做最懂雲原生開發者的公衆号。”

繼續閱讀