天天看點

grpc 連結池(3)resolver 、balancer和picker

作者:go算法架構leetcode

在分析完grpc連接配接的建立、使用和銷毀過程後golang源碼分析:grpc 連結池(2),我們來分析下grpc留給我們的程式設計擴充接口resolver 、balancer和picker是如何嵌入grpc連接配接池的。

總的來說:每個 ClientConn 對應有多個 SubConn,ClientConn 會基于名字發現(resolver)得到多個 SubConn,并面向多個 SubConn 之間實作負載均衡(balancer),每次用戶端請求的時候根據picker提供的Pick接口,從連接配接池中選擇一個SubConn來完成請求。resolver 與 balancer 都是抽象的,内建的 resolver 包括 dns、manual、passthrough,内建的 balancer 包括 roundrobin、grpclb。當然也可以基于插件化的 Register 模式來在子產品自身的 init() 函數中将自己注冊。

1,resolver

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}           

當我們調用Dial擷取連接配接池的時候,首先是擷取resolver,通過解析target,獲得schema,然後通過schema在全局系統資料庫中找到對應的resolver,需要注意的是,我們在自定義resolver的時候引用的grpc版本一定要和發起連接配接的時候的grpc版本一緻,否則會出現resolver找不到使用預設的passthrough的情況,這是踩坑的血淚記憶。擷取resolver的源碼定義在google.golang.org/[email protected]/clientconn.go

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  resolverBuilder, err := cc.parseTargetAndFindResolver()
  rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
  cc.resolverWrapper = rWrapper           
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
rb = cc.getResolver(parsedTarget.Scheme)           
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
return resolver.Get(scheme)           

和我們注冊的過程是對應的

func init() {
  resolver.Register(&mockResolverBuilder{})
 }           

當然在ClientConn中使用的時候都是經過裝飾器包裹了一層的google.golang.org/[email protected]/resolver_conn_wrapper.go,它會調用建造器的Build接口:

func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
  ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)           

那麼什麼時候調用我們定義的resolver的ResolveNow 接口呢?在建立連接配接的時候:

func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
ac.cc.resolveNow(resolver.ResolveNowOptions{})           
func (ac *addrConn) resetTransport() {
  if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
  ac.cc.resolveNow(resolver.ResolveNowOptions{})           

它通過一個協程調用了resolverWrapper對應的方法

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
  r := cc.resolverWrapper
  go r.resolveNow(o)           

實作位于google.golang.org/[email protected]/resolver_conn_wrapper.go

func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
  ccr.resolverMu.Lock()
  if !ccr.done.HasFired() {
  ccr.resolver.ResolveNow(o)           

如果狀态更新,會調用UpdateState

func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
return balancer.ErrBadResolverState
}           

google.golang.org/[email protected]/clientconn.go

func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
  cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
  bw := cc.balancerWrapper
  uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})           

2,balancer

balancer的注冊過程和resolver的過程一樣,隻不過使用的的時候不是通過target的schema來加載的,而是通過 grpc.WithDefaultServiceConfig選項實作的

grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, mybalancer.Name)           

google.golang.org/[email protected]/balancer/balancer.go balancer的核心接口是UpdateClientConnState和UpdateSubConnState,它傳入的對象是ClientConnState,裡面儲存了ResolverState,也就是resolver的解析結果

type Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes.  If the error returned is ErrBadResolverState, the ClientConn
// will begin calling ResolveNow on the active name resolver with
// exponential backoff until a subsequent call to UpdateClientConnState
// returns a nil error.  Any other errors are currently ignored.
UpdateClientConnState(ClientConnState) error
// ResolverError is called by gRPC when the name resolver reports an error.
ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}           
type ClientConnState struct {
ResolverState resolver.State
// The parsed load balancing configuration returned by the builder's
// ParseConfig method, if implemented.
BalancerConfig serviceconfig.LoadBalancingConfig
}           

接口當然也是被裝飾器包裹者google.golang.org/[email protected]/balancer_conn_wrappers.go,它會啟動一個螢幕,當連接配接狀态發生變化的時候,會調用對應事件處理函數處理,事件生成是時候并不是同步處理,而是先發送到channel裡面

func (ccb *ccBalancerWrapper) watcher() {
  for {
  select {
    case u := <-ccb.updateCh.Get():
    switch update := u.(type) {
    case *ccStateUpdate:
       ccb.handleClientConnStateChange(update.ccs)           
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
  ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
  select {
    case res = <-ccb.resultCh.Get():
    ccb.resultCh.Load()           
func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
   ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))           

最終事件是交給balancer的UpdateClientConnState處理了,在basebalancer裡面也實作了這個接口,這裡會周遊ResolverState.Addresses的位址清單,然後發起連接配接,也就是建立連接配接池的初始子連接配接。并且生産picker

google.golang.org/[email protected]/balancer/base/balancer.go

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
  for _, a := range s.ResolverState.Addresses {
     addrsSet.Set(a, nil)
     if _, ok := b.subConns.Get(a); !ok {
      sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
      sc.Connect()
  b.regeneratePicker()
  b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})           

basebalancer也實作了另外一個接口,思路一樣,隻不過處理的是連接配接池裡的子連接配接:

func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
  if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
  b.state == connectivity.TransientFailure {
  b.regeneratePicker()
  }
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})           

他們調用的UpdateState位于google.golang.org/[email protected]/balancer_conn_wrappers.go,會更新picker

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
  ccb.cc.blockingpicker.updatePicker(s.Picker)
  ccb.cc.csMgr.updateState(s.ConnectivityState)           

3,picker

我們定義picker的時候這冊思路也一樣,需要實作builder

func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {           

它的參數定義位于google.golang.org/[email protected]/balancer/base/base.go

type PickerBuildInfo struct {
// ReadySCs is a map from all ready SubConns to the Addresses used to
// create them.
ReadySCs map[balancer.SubConn]SubConnInfo
}           

注意這裡面的map,包含了已經建立的連接配接,picker的實作,隻需要定義自己的選擇算法,從中選擇合适的連接配接供Invoke使用。這個map是什麼時候生成的呢,我們看下picker的執行個體化邏輯

google.golang.org/[email protected]/balancer/base/balancer.go

func (b *baseBalancer) regeneratePicker() {
  for _, addr := range b.subConns.Keys() {
     sci, _ := b.subConns.Get(addr)
      if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
      readySCs[sc] = SubConnInfo{Address: addr}
      }
  }
  b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})           

可以看到,它通過位址擷取連接配接,選取ready的連接配接,放入到這個map,最後用這個map初始化了picker。而位址的來源正是resolver解析得到的,它儲存在:

// ClientConnState describes the state of a ClientConn relevant to the
// balancer.
type ClientConnState struct {
ResolverState resolver.State
// The parsed load balancing configuration returned by the builder's
// ParseConfig method, if implemented.
BalancerConfig serviceconfig.LoadBalancingConfig
}           

google.golang.org/[email protected]/internal/balancer/gracefulswitch/gracefulswitch.go

func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
return balToUpdate.UpdateClientConnState(state)           

當Invoke的時候,會先調用裝飾器的pick方法

google.golang.org/[email protected]/clientconn.go

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx:            ctx,
FullMethodName: method,
})
}           

google.golang.org/[email protected]/picker_wrapper.go

func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
   pickResult, err := p.Pick(info)           

繼續閱讀