天天看点

PD 客户端源码分析

作者：薛港（移动云）

客户端对象创建以及初始化

// NewClient creates a PD client.

func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {

return NewClientWithContext(context.Background(), pdAddrs, security, opts…)

}

// newBaseClient returns a new baseClient.

func newBaseClient(ctx context.Context, urls []string, security SecurityOption, opts …ClientOption) (*baseClient, error) {

ctx1, cancel := context.WithCancel(ctx)

c := &baseClient{

urls: urls,

checkLeaderCh: make(chan struct{}, 1),

checkTSODispatcherCh: make(chan struct{}, 1),

}

c.initRetry(c.initClusterID);
  c.initRetry(c.updateMember);
  
  c.wg.Add(1)
  go c.memberLoop()

  return c, nil
}      

client 包含子模块 baseClient（下文也称 basic client）。这个子模块的目的很明确：根据请求类型，尽量把用户的请求直接发送到 PD leader 或对应 dc-location 的 TSO allocator leader。部分请求只有 PD leader 或 dc-location 的 leader 才能处理；如果由非 leader 节点收到这类请求，还需要再转发给对应的 leader。baseClient 的职责就是维护这两类 leader 的信息，并建立到它们的连接。

baseClient 子模块主要包含以下信息：

PD leader 的 URL（leader）

PD follower 的 URL 列表（followers）

每个 dc-location 对应的 TSO allocator leader 的 URL 及其 gRPC 连接（allocators、clientConns）

一些 channel（checkLeaderCh、checkTSODispatcherCh），用于触发这两类 leader 的检查与更新

// baseClient is a basic client shared by all other, more complex clients.
//
// It tracks the PD leader, the PD followers and, per dc-location, the TSO
// allocator leader together with a gRPC connection to it.
//
// NOTE(review): this listing is abridged. Code elsewhere in this article uses
// fields not shown here, e.g. c.wg in newBaseClient and c.ctx in the
// leader-check loop. Confirm against the full source.

type baseClient struct {

// urls are the PD endpoints; updateMember queries them in turn.
urls []string

// clusterID is the PD cluster ID (presumably set by initClusterID; verify).
clusterID uint64

// PD leader URL

leader atomic.Value // Store as string

// PD follower URLs

followers atomic.Value // Store as []string

// dc-location -> TSO allocator leader gRPC connection

clientConns sync.Map // Store as map[string]*grpc.ClientConn

// dc-location -> TSO allocator leader URL

allocators sync.Map // Store as map[string]string

// checkLeaderCh wakes memberLoop so it runs updateMember immediately.
checkLeaderCh chan struct{}

// checkTSODispatcherCh wakes tsLoop so it re-checks the TSO dispatchers.
checkTSODispatcherCh chan struct{}

}

下面我们详细分析 baseClient 的创建流程。它包含三个核心步骤：通过 initRetry 调用 c.initClusterID，通过 initRetry 调用 c.updateMember，以及启动后台服务 c.memberLoop。

调用c.updateMember

func (c *baseClient) updateMember() error {

for _, u := range c.urls {

members, err := c.getMembers(ctx, u)

更新c.clientConns以及c.allocators,用于匹配dc-location到grpc连接

c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders());

更新pd所有成员的url
    c.updateURLs(members.GetMembers())
        
        更新pd follower的urls
    c.updateFollowers(members.GetMembers(), members.GetLeader())
        更新c.leader,指定pd的leader.以及指定dc global对应的leader
    c.switchLeader(members.GetLeader().GetClientUrls()); 
        给c.checkTSODispatcherCh发送空消息,触发check tso
    c.scheduleCheckTSODispatcher()
    return nil
  }
  
}      

启动后台服务 goroutine：go c.memberLoop()

这个后台服务每隔 1 分钟调用一次 updateMember，更新 PD 成员角色；也可以通过向 checkLeaderCh 发送信号立即触发一次更新。

// memberLoop keeps the PD membership up to date for the lifetime of the
// client.
//
// It calls updateMember in two cases: when a minute passes with no other
// trigger, or immediately when a signal arrives on c.checkLeaderCh. Errors
// are logged, not returned. It exits when the client's context is cancelled.
//
// newBaseClient calls c.wg.Add(1) before starting this goroutine, so it must
// call c.wg.Done() on exit; otherwise any wait on c.wg would block forever.
func (c *baseClient) memberLoop() {
	defer c.wg.Done()

	for {
		select {
		case <-c.checkLeaderCh:
		case <-time.After(time.Minute):
		// NOTE(review): the excerpt used an undeclared ctx. The client's
		// lifetime context c.ctx (also used by the leader-check loop) is
		// assumed here.
		case <-c.ctx.Done():
			return
		}
		if err := c.updateMember(); err != nil {
			log.Error("[pd] failed updateMember", errs.ZapError(err))
		}
	}
}

Client流程分析:

核心流程：用户创建 PD 客户端时，会依次执行以下步骤：

1. 创建 client 对象，其中包括子模块 baseClient 对象的创建；

2. 调用 c.updateTSODispatcher()；

3. 启动一系列后台管理 goroutine（tsLoop、tsCancelLoop、leaderCheckLoop）。

// NewClient creates a PD client.

func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {

return NewClientWithContext(context.Background(), pdAddrs, security, opts…)

}

// NewClientWithContext creates a PD client, using ctx as the parent context
// passed to newBaseClient.
//
// It performs three steps:
//  1. Builds the baseClient.
//  2. Calls updateTSODispatcher, which starts a TSO dispatcher for every
//     dc-location in c.allocators that does not have one yet.
//  3. Starts three background goroutines: tsLoop, tsCancelLoop and
//     leaderCheckLoop.
//
// If newBaseClient fails, its error is returned and no goroutines are started.
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
	base, err := newBaseClient(ctx, addrsToUrls(pdAddrs), security, opts...)
	if err != nil {
		// base must not be used when err != nil; continuing would
		// dereference it.
		return nil, err
	}

	c := &client{
		baseClient:        base,
		checkTSDeadlineCh: make(chan struct{}),
	}

	c.updateTSODispatcher()
	// One wg slot per goroutine started below. Each must call c.wg.Done().
	c.wg.Add(3)
	go c.tsLoop()
	go c.tsCancelLoop()
	go c.leaderCheckLoop()

	return c, nil
}

为每个 dc-location 创建调度器（dispatcher），用于批量处理发往该 dc-location 的 TSO 请求。

在 client 中，每个 dc-location 对应一个 TSO dispatcher，dispatcher 负责把请求批量发送到相应 dc-location 的 leader。

// updateTSODispatcher ensures every dc-location recorded in c.allocators has
// a TSO dispatcher.
//
// For each dc-location that checkTSODispatcher reports as not yet set up, it
// starts a handleDispatcher goroutine.
func (c *client) updateTSODispatcher() {
	c.allocators.Range(func(key, _ interface{}) bool {
		if dc := key.(string); !c.checkTSODispatcher(dc) {
			// NOTE(review): dispatcherCtx and tsoRequestCh are not declared in
			// this excerpt. They are presumably created per dc-location in the
			// full source.
			go c.handleDispatcher(dispatcherCtx, dc, tsoRequestCh)
		}
		// Returning true keeps the Range going over every dc-location.
		return true
	})
}

启动后台服务 handleDispatcher：它把已排队的多个 TSO 请求合并成一批，再调用 processTSORequests 一次性向 PD 请求相应数目的 TSO。

// Call site, from updateTSODispatcher: one handleDispatcher goroutine per dc-location.
go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)

// handleDispatcher batches TSO requests for a single dc-location (dc).
//
// Each iteration:
//   - blocks for one request from tsoDispatcher;
//   - drains whatever else is already buffered in the channel;
//   - registers a deadline for the batch on the dc's tsDeadline channel;
//   - sends the whole batch in a single processTSORequests call.
//
// It returns if dispatcherCtx is cancelled while registering the deadline.
//
// NOTE(review): this is an abridged excerpt. Several names are not declared
// here: requests, cancel, stream, opts and err. requests must have room for
// pendingPlus1 entries. `ok` is unused, which is a compile error. The error
// from processTSORequests is not handled. Confirm against the full source.
func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) {

for {

  select {
  case first := <-tsoDispatcher:
    // Take the request that woke us, then everything already queued, so the
    // whole batch is sent together.
    pendingPlus1 := len(tsoDispatcher) + 1
    requests[0] = first
    for i := 1; i < pendingPlus1; i++ {
      requests[i] = <-tsoDispatcher
    }
    // Deadline for this batch: fires after c.timeout. done and cancel are
    // presumably used by a deadline watcher to cancel a slow batch (watcher
    // and the close of done are not shown here).
    done := make(chan struct{})
    dl := deadline{
      timer:  time.After(c.timeout),
      done:   done,
      cancel: cancel,
    }
    // NOTE(review): if no entry exists for dc, tsDeadlineCh is nil and the
    // type assertion below panics; ok is never checked.
    tsDeadlineCh, ok := c.tsDeadline.Load(dc)

    // Hand the deadline to the dc's deadline channel, unless we are shutting down.
    select {
    case tsDeadlineCh.(chan deadline) <- dl:
    case <-dispatcherCtx.Done():
      return
    }
  
    // Send the whole batch in a single call.
    err = c.processTSORequests(stream, dc, requests[:pendingPlus1], opts)
    
  }
  
}      

}

client后台服务

tsLoop 流程分析：每次循环都会调用 updateTSODispatcher，检查是否有新加入的 dc-location；如果有，就为其创建 dispatcher，用于批量处理发往该 dc-location 的 TSO 请求。循环按 tsLoopDCCheckInterval 定期执行，也可以通过向 checkTSODispatcherCh 发送信号立即触发。

// tsLoop keeps the set of TSO dispatchers in sync with the known
// dc-locations.
//
// It calls updateTSODispatcher immediately, and again in two cases: every
// tsLoopDCCheckInterval, or whenever a signal arrives on
// c.checkTSODispatcherCh. It returns when its context is cancelled.
//
// NewClientWithContext counts this goroutine in c.wg.Add(3), so it must call
// c.wg.Done() on exit. The ticker is stopped on exit so it is released.
func (c *client) tsLoop() {
	defer c.wg.Done()

	ticker := time.NewTicker(tsLoopDCCheckInterval)
	defer ticker.Stop()

	for {
		c.updateTSODispatcher()
		select {
		case <-ticker.C:
		case <-c.checkTSODispatcherCh:
		// NOTE(review): loopCtx is not declared in this excerpt. It is
		// presumably derived from the client's lifetime context; confirm
		// against the full source.
		case <-loopCtx.Done():
			return
		}
	}
}
// Body of what appears to be (*client).tsCancelLoop. The function header and
// setup lines are omitted from this excerpt.
// NOTE(review): the identification is inferred from tsCancelLoopCtx and from
// `go c.tsCancelLoop()` in NewClientWithContext; confirm.
//
// Each iteration calls watchTSDeadline for every dc-location in c.allocators.
// It then waits for one of three events:
//   - the next tsLoopDCCheckInterval tick;
//   - a signal on c.checkTSDeadlineCh;
//   - cancellation of tsCancelLoopCtx, which ends the loop.
//
// NOTE(review): the ticker is never stopped here (no defer ticker.Stop()),
// unlike the leader-check loop. No c.wg.Done() is visible either, even though
// NewClientWithContext counts this goroutine in c.wg.Add(3).
ticker := time.NewTicker(tsLoopDCCheckInterval)

  for {
    // Watch every dc-location's tsDeadlineCh
    c.allocators.Range(func(dcLocation, _ interface{}) bool {
      c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string))
      return true
    })
    select {
    case <-c.checkTSDeadlineCh:
      continue
    case <-ticker.C:
      continue
    case <-tsCancelLoopCtx.Done():
      return
    }
  }
}      
// Body of what appears to be (*client).leaderCheckLoop. The function header
// and setup lines are omitted from this excerpt, and leaderCheckLoopCtx is
// not declared in the visible lines.
// NOTE(review): the identification is inferred from leaderCheckLoopCtx and
// from `go c.leaderCheckLoop()` in NewClientWithContext; confirm.
//
// On every LeaderHealthCheckInterval tick it calls checkLeaderHealth. It
// returns when the client's context c.ctx is done. The ticker is stopped on
// exit.
//
// NOTE(review): no c.wg.Done() is visible here, even though
// NewClientWithContext counts this goroutine in c.wg.Add(3).
ticker := time.NewTicker(LeaderHealthCheckInterval)
  defer ticker.Stop()

  for {
    select {
    case <-c.ctx.Done():
      return
    case <-ticker.C:
      c.checkLeaderHealth(leaderCheckLoopCtx)
    }
  }
}