上一篇文章一起學習了Resolver的原理和源碼分析,本篇繼續和大家一起學習下和Resolver關系密切的Balancer的相關内容。這裡說的負載均衡主要指資料中心内的負載均衡,即RPC間的負載均衡。
傳送門 服務發現原理分析與源碼解讀
基于go-zero v1.3.5 和 grpc-go v1.47.0
負載均衡
每一個被調用服務都會有多個執行個體,那麼服務的調用方應該将請求,發向被調用服務的哪一個服務執行個體,這就是負載均衡的業務場景。
負載均衡的第一個關鍵點是公平性,即負載均衡需要關注被調用服務執行個體組之間的公平性,不要出現旱的旱死,澇的澇死的情況。
負載均衡的第二個關鍵點是正确性,即對于有狀态的服務來說,負載均衡需要關心請求的狀态,将請求排程到能處理它的後端執行個體上,不要出現不能處理和錯誤處理的情況。
無狀态的負載均衡
無狀态的負載均衡是我們日常工作中接觸比較多的負載均衡模型,它指的是參與負載均衡的後端執行個體是無狀态的,所有的後端執行個體都是對等的,一個請求不論發向哪一個執行個體,都會得到相同的并且正确的處理結果,是以無狀态的負載均衡政策不需要關心請求的狀态。下面介紹兩種無狀态負載均衡算法。
輪詢
輪詢的負載均衡政策非常簡單,隻需要将請求按順序配置設定給多個執行個體,不用再做其他的處理。例如,輪詢政策會将第一個請求配置設定給第一個執行個體,然後将下一個請求配置設定給第二個執行個體,這樣依次配置設定下去,配置設定完一輪之後,再回到開頭配置設定給第一個執行個體,再依次配置設定。輪詢在路由時,不利用請求的狀态資訊,屬于無狀态的負載均衡政策,是以它不能用于有狀态執行個體的負載均衡器,否則正确性會出現問題。在公平性方面,因為輪詢政策隻是按順序配置設定請求,是以适用于請求的工作負載和執行個體的處理能力差異都較小的情況。
權重輪詢
權重輪詢的負載均衡政策是将每一個後端執行個體配置設定一個權重,配置設定請求的數量和執行個體的權重成正比輪詢。例如有兩個執行個體 A,B,假設我們設定 A 的權重為 20,B 的權重為 80,那麼負載均衡會将 20% 的請求數量配置設定給 A,80 % 的請求數量配置設定給 B。權重輪詢在路由時,不利用請求的狀态資訊,屬于無狀态的負載均衡政策,是以它也不能用于有狀态執行個體的負載均衡器,否則正确性會出現問題。在公平性方面,因為權重政策會按執行個體的權重比例來配置設定請求數,是以,我們可以利用它解決執行個體的處理能力差異的問題,認為它的公平性比輪詢政策要好。
有狀态負載均衡
有狀态負載均衡是指,在負載均衡政策中會儲存服務端的一些狀态,然後根據這些狀态按照一定的算法選擇出對應的執行個體。
P2C+EWMA
在go-zero中預設使用的是P2C的負載均衡算法。該算法的原理比較簡單,即随機從所有可用節點中選擇兩個節點,然後計算這兩個節點的負載情況,選擇負載較低的一個節點來服務本次請求。為了避免某些節點一直得不到選擇導緻不平衡,會在超過一定的時間後強制選擇一次。
在該複雜均衡算法中,多出采用了EWMA指數移動權重平均的算法,表示是一段時間内的均值。該算法相對于算數平均來說對于突然的網絡抖動沒有那麼敏感,突然的抖動不會展現在請求的lag中,進而可以讓算法更加均衡。
go-zero/zrpc/internal/balancer/p2c/p2c.go:133
atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w)))
go-zero/zrpc/internal/balancer/p2c/p2c.go:139
atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w)))
系數w是一個時間衰減值,即兩次請求的間隔越大,則系數w就越小。
go-zero/zrpc/internal/balancer/p2c/p2c.go:124
w := math.Exp(float64(-td) / float64(decayTime))
節點的load值是通過該連接配接的請求延遲 lag 和目前請求數 inflight 的乘積所得,如果請求的延遲越大或者目前正在處理的請求數越多表明該節點的負載越高。
go-zero/zrpc/internal/balancer/p2c/p2c.go:199
func (c *subConn) load() int64 {
// plus one to avoid multiply zero
lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1)))
load := lag * (atomic.LoadInt64(&c.inflight) + 1)
if load == 0 {
return penalty
}
return
源碼分析
如下源碼會涉及go-zero和gRPC,請根據給出的代碼路徑進行區分
在gRPC中,Balancer和Resolver一樣也可以自定義,同樣也是通過Register方法進行注冊
grpc-go/balancer/balancer.go:53
func Register(b Builder)
Register的參數Builder為接口,在Builder接口中,Build方法的第一個參數ClientConn也為接口,Build方法的傳回值Balancer同樣也是接口,定義如下:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICN4ETMfdHLkVGepZ2XtxSZ6l2clJ3LcBnYldHL0FWby9mZvwVPrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdsAjMfd3bkFGazxCMx8VesATMfhHLlN3XnxCMz8FdsYkRGZkRG9lcvx2bjxSa2EWNhJTW1AlUxEFeVRUUfRHelRHL2EzXlpXazxyayFWbyVGdhd3LcV2Zh1Wa9M3clN2byBXLzN3btg3PwJWZ35yNzUjNykzN5QDM1ITO3YjZyYzX3MDMwADMzEzLchDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.webp)
可以看出,要想實作自定義的Balancer的話,就必須要實作balancer.Builder接口。
在了解了gRPC提供的Balancer的注冊方式之後,我們看一下go-zero是在什麼地方進行Balancer注冊的
go-zero/zrpc/internal/balancer/p2c/p2c.go:36
func init()
在go-zero中并沒有實作 balancer.Builder 接口,而是使用gRPC提供的 base.baseBuilder 進行注冊,base.baseBuilder 實作了balancer.Builder 接口。建立baseBuilder的時候調用了 base.NewBalancerBuilder 方法,需要傳入 PickerBuilder 參數,PickerBuilder為接口,在go-zero中 p2c.p2cPickerBuilder 實作了該接口。
PickerBuilder接口Build方法傳回值 balancer.Picker 也是一個接口,p2c.p2cPicker 實作了該接口。
grpc-go/balancer/base/base.go:65
func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder {
return
各結構之間的關系如下圖所示,其中各結構子產品對應的包為:
- balancer:grpc-go/balancer
- base:grpc-go/balancer/base
- p2c: go-zero/zrpc/internal/balancer/p2c
在哪裡擷取已注冊的Balancer?
通過上面的流程步驟,已經知道了如何自定義Balancer,以及如何注冊自定義的Blancer。既然注冊了肯定就會擷取,接下來看一下是在哪裡擷取已經注冊的Balancer的。
我們知道Resolver是通過解析DialContext的第二個參數target,進而得到Resolver的name,然後根據name擷取到對應的Resolver的。擷取Balancer同樣也是根據名稱,Balancer的名稱是在建立gRPC Client的時候通過配置項傳入的,這裡的p2c.Name為注冊Balancer時指定的名稱 p2c_ewma ,如下:
go-zero/zrpc/internal/client.go:50
func NewClient(target string, opts ...ClientOption) (Client, error) {
var cli client
svcCfg := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, p2c.Name)
balancerOpt := WithDialOption(grpc.WithDefaultServiceConfig(svcCfg))
opts = append([]ClientOption{balancerOpt}, opts...)
if err := cli.dial(target, opts...); err != nil {
return nil, err
}
return &cli, nil
在上一篇文章中,我們已經知道當建立gRPC用戶端的時候,會觸發調用自定義Resolver的Build方法,在Build方法内部擷取到服務位址清單後,通過cc.UpdateState方法進行狀态更新,後面當監聽到服務狀态變化的時候同樣也會調用cc.UpdateState進行狀态的更新,而這裡的cc指的就是 ccResolverWrapper 對象,這一部分如果忘記的話,可以再去回顧一下講解Resolver的那篇文章,以便能絲滑接入本篇:
go-zero/zrpc/resolver/internal/kubebuilder.go:51
if err := cc.UpdateState(resolver.State{
Addresses: addrs,
}); err != nil
這裡有幾個重要的子產品對象,如下:
- ClientConn:grpc-go/clientconn.go:464
- ccResolverWrapper:grpc-go/resolver_conn_wrapper.go:36
- ccBalancerWrapper:grpc-go/balancer_conn_wrappers.go:48
- Balancer:grpc-go/internal/balancer/gracefulswitch/gracefulswitch.go:46
- balancerWrapper:grpc-go/internal/balancer/gracefulswitch/gracefulswitch.go:247
當監聽到服務狀态的變更後(首次啟動或者通過Watch監聽變化)調用 ccResolverWrapper.UpdateState 觸發更新狀态的流程,各子產品間的調用鍊路如下所示:
擷取Balancer的動作是在 ccBalancerWrapper.handleSwitchTo 方法中觸發的,代碼如下所示:
grpc-go/balancer_conn_wrappers.go:266
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
}
if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
return
然後在 Balancer.SwitchTo 方法中,調用了自定義Balancer的Build方法:
grpc-go/internal/balancer/gracefulswitch/gracefulswitch.go:121
newBalancer := builder.Build(bw, gsb.bOpts)
上文有提到Build方法的第一個參數為接口 balancer.ClientConn ,而這裡傳入的為 balancerWrapper ,是以gracefulswitch.balancerWrapper實作了該接口:
到這裡我們已經知道了擷取自定義Balancer是在哪裡觸達的,以及在哪裡擷取的自定義的Balancer,和balancer.Builder的Build方法在哪裡被調用。
通過上文可知這裡的balancer.Builder為baseBuilder,是以調用的Build方法為baseBuilder的Build方法,Build方法的定義如下:
grpc-go/balancer/base/balancer.go:39
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bal := &baseBalancer{
cc: cc,
pickerBuilder: bb.pickerBuilder,
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
}
bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
return
Build方法傳回了baseBalancer,可以知道baseBalancer實作了balancer.Balancer接口:
再來回顧下這個流程,其實主要做了如下幾件事:
- 在自定義的Resolver中監聽服務狀态的變更
- 通過UpdateState來更新狀态
- 擷取自定義的Balancer
- 執行自定義Balancer的Build方法擷取Balancer
如何建立連接配接?
繼續回到ClientConn的updateResolverState方法,在方法的最後調用balancerWrapper.updateClientConnState方法更新用戶端的連接配接狀态:
grpc-go/clientconn.go:664
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
// currently meaningless to the caller.
後面的調用鍊路如下圖所示:
最終會調用baseBalancer.UpdateClientConnState方法:
grpc-go/balancer/base/balancer.go:94
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// .............
b.resolverErr = nil
addrsSet := resolver.NewAddressMap()
for _, a := range s.ResolverState.Addresses {
addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok {
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns.Set(a, sc)
b.scStates[sc] = connectivity.Idle
b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
}
}
for _, a := range b.subConns.Keys() {
sci, _ := b.subConns.Get(a)
sc := sci.(balancer.SubConn)
if _, ok := addrsSet.Get(a); !ok {
b.cc.RemoveSubConn(sc)
b.subConns.Delete(a)
}
}
// ................
當第一次觸發調用UpdateClientConnState的時候,如下代碼中 ok 為 false:
_, ok := b.subConns.Get(a);
是以會建立新的連接配接:
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
這裡的 b.cc 即為 balancerWrapper,忘記的盆友可以往上翻看複習一下,也就是會調用 balancerWrapper.NewSubConn建立連接配接
grpc-go/internal/balancer/gracefulswitch/gracefulswitch.go:328
func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
// .............
sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
// .............
bw.subconns[sc] = true
// .............
bw.gsb.cc即為ccBalancerWrapper,是以這裡會調用ccBalancerWrapper.NewSubConn建立連接配接:
grpc-go/balancer_conn_wrappers.go:299
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if len(addrs) <= 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
if err != nil {
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
return acbw, nil
最終傳回的是acBalancerWrapper對象,acBalancerWrapper實作了balancer.SubConn接口:
調用流程圖如下所示:
建立連接配接的預設狀态為 connectivity.Idle :
grpc-go/clientconn.go:699
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: addrs,
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
resetBackoff: make(chan struct{}),
}
// ...........
在gRPC中為連接配接定義了五種狀态,分别如下:
const (
// Idle indicates the ClientConn is idle.
Idle State = iota
// Connecting indicates the ClientConn is connecting.
Connecting
// Ready indicates the ClientConn is ready for work.
Ready
// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
TransientFailure
// Shutdown indicates the ClientConn has started shutting down.
在 **baseBalancer ** 中通過b.scStates儲存建立的連接配接,初始狀态也為connectivity.Idle,之後通過sc.Connect()進行連接配接:
grpc-go/balancer/base/balancer.go:112
b.subConns.Set(a, sc)
b.scStates[sc] = connectivity.Idle
b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
這裡sc.Connetc調用的是acBalancerWrapper的Connect方法,可以看到這裡建立連接配接是異步進行的:
grpc-go/balancer_conn_wrappers.go:406
func (acbw *acBalancerWrapper) Connect() {
acbw.mu.Lock()
defer acbw.mu.Unlock()
go
最後會調用addrConn.connect方法:
grpc-go/clientconn.go:786
func (ac *addrConn) connect() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
if ac.state != connectivity.Idle {
ac.mu.Unlock()
return nil
}
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
ac.resetTransport()
return nil
從connect開始的調用鍊路如下所示:
在baseBalancer的UpdateSubConnState方法的最後,更新了Picker為自定義的Picker:
grpc-go/balancer/base/balancer.go:221
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
在addrConn方法的最後會調用ac.resetTransport()真正的進行連接配接的建立:
當連接配接已經建立好,處于Ready狀态,最後調用baseBalancer.UpdateSubConnState方法,此時s==connectivity.Ready為true,而oldS == connectivity.Ready為false,是以會調用b.regeneratePicker()方法:
if
func (b *baseBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
b.picker = NewErrPicker(b.mergeErrors())
return
}
readySCs := make(map[balancer.SubConn]SubConnInfo)
// Filter out all ready SCs from full subConn map.
for _, addr := range b.subConns.Keys() {
sci, _ := b.subConns.Get(addr)
sc := sci.(balancer.SubConn)
if
在regeneratePicker中擷取了處于connectivity.Ready狀态可用的連接配接,同時更新了picker。還記得b.pickerBuilder嗎?b.b.pickerBuilder為在go-zero中自定義實作的base.PickerBuilder接口。
go-zero/zrpc/internal/balancer/p2c/p2c.go:42
func (b *p2cPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
readySCs := info.ReadySCs
if len(readySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
var conns []*subConn
for conn, connInfo := range readySCs {
conns = append(conns, &subConn{
addr: connInfo.Address,
conn: conn,
success: initSuccess,
})
}
return
最後把自定義的Picker指派為 ClientConn.blockingpicker.picker屬性。
grpc-go/balancer_conn_wrappers.go:347
func (ccb *ccBalancerWrapper)
如何選擇已建立的連接配接?
現在已經知道了如何建立連接配接,以及連接配接其實是在 baseBalancer.scStates 中管理,當連接配接的狀态發生變化,則會更新 **baseBalancer.scStates ** 。那麼接下來我們來看一下gRPC是如何選擇一個連接配接進行請求的發送的。
當gRPC用戶端發起調用的時候,會調用ClientConn的Invoke方法,一般不會主動使用該方法進行調用,該方法的調用一般是自動生成:
grpc-go/examples/helloworld/helloworld/helloworld_grpc.pb.go:39
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
如下為發起請求的調用鍊路,最終會調用p2cPicker.Pick方法擷取連接配接,我們自定義的負載均衡算法一般都在Pick方法中實作,擷取到連接配接之後,通過sendMsg發送請求。
grpc-go/stream.go:945
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
cs := a.cs
if a.trInfo != nil {
a.mu.Lock()
if a.trInfo.tr != nil {
a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
}
a.mu.Unlock()
}
if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
if !cs.desc.ClientStreams {
return nil
}
return io.EOF
}
if a.statsHandler != nil {
a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
}
return nil
源碼分析到此就結束了,由于篇幅有限沒法做到面面俱到,是以本文隻列出了源碼中的主要路徑。
結束語
Balancer相關的源碼還是有點複雜的,筆者也是讀了好幾遍才理清脈絡,是以如果讀了一兩遍感覺沒有頭緒也不用着急,對照文章的脈絡多讀幾遍就一定能搞懂。
如果有疑問可以随時找我讨論,在社群群中可以搜尋dawn_zhou找到我。
希望本篇文章對你有所幫助,你的點贊是作者持續輸出的最大動力。