天天看點

PD 用戶端源碼分析

作者:薛港-移動雲​

用戶端對象建立以及初始化

// NewClient creates a PD client.

func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {

return NewClientWithContext(context.Background(), pdAddrs, security, opts…)

}

// newBaseClient returns a new baseClient.

func newBaseClient(ctx context.Context, urls []string, security SecurityOption, opts …ClientOption) (*baseClient, error) {

ctx1, cancel := context.WithCancel(ctx)

c := &baseClient{

urls: urls,

checkLeaderCh: make(chan struct{}, 1),

checkTSODispatcherCh: make(chan struct{}, 1),

}

c.initRetry(c.initClusterID);
  c.initRetry(c.updateMember);
  
  c.wg.Add(1)
  go c.memberLoop()

  return c, nil
}      

client 包含子模組 baseClient。這個子模組的目的很明確:把使用者的請求盡量直接發送到 PD leader 或對應 dc-location 的 TSO allocator leader。因為部分請求只有 PD leader 或 dc-location leader 能夠處理,如果由非 leader 節點收到請求,還是要轉發到對應的 leader。baseClient 就是用來維護這兩類 leader 的資訊,並建立對應的連接。

baseClient 子模組分析,主要包含以下資訊:

對應pd leader對象

對應follower對象

dc-location 對應的 leader 對象以及 gRPC 連接

一些channel,checkLeaderCh,checkTSODispatcherCh 。用于兩類leader的管理

// baseClient is a basic client for all other complex client.
// It records the PD leader and follower URLs, the per-dc-location TSO
// allocator leaders with their gRPC connections, and the channels used to
// request a refresh of that information.

type baseClient struct {

// PD URLs; updateMember iterates over these when fetching the member list.
urls []string

// NOTE(review): presumably filled in by initClusterID — confirm.
clusterID uint64

// PD leader URL

leader atomic.Value // Store as string

// PD follower URLs

followers atomic.Value // Store as []string

// dc-location -> TSO allocator leader gRPC connection

clientConns sync.Map // Store as map[string]*grpc.ClientConn

// dc-location -> TSO allocator leader URL

allocators sync.Map // Store as map[string]string

// Receiving on this channel makes memberLoop run updateMember without
// waiting for its one-minute timer.
checkLeaderCh chan struct{}

// Receiving on this channel makes tsLoop re-run updateTSODispatcher without
// waiting for its ticker.
checkTSODispatcherCh chan struct{}

}

下面我們詳細分析basic client的建立流程,三個核心步驟

調用c.updateMember

func (c *baseClient) updateMember() error {

for _, u := range c.urls {

members, err := c.getMembers(ctx, u)

更新c.clientConns以及c.allocators,用于比對dc-location到grpc連接配接

c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders());

更新pd所有成員的url
    c.updateURLs(members.GetMembers())
        
        更新pd follower的urls
    c.updateFollowers(members.GetMembers(), members.GetLeader())
        更新c.leader,指定pd的leader.以及指定dc global對應的leader
    c.switchLeader(members.GetLeader().GetClientUrls()); 
        給c.checkTSODispatcherCh發送空消息,觸發check tso
    c.scheduleCheckTSODispatcher()
    return nil
  }
  
}      

啟動背景服務線程go c.memberLoop()

這個背景服務每隔 1 分鐘觸發一次 updateMember,用於更新 PD 成員角色;也可以通過 channel(checkLeaderCh)立即觸發成員角色更新。

// memberLoop is the background loop that newBaseClient starts in a goroutine.
// Each iteration waits for a signal on c.checkLeaderCh or for one minute to
// elapse, then calls updateMember and logs any error it returns. The loop
// exits when ctx is done.
//
// NOTE(review): ctx is not defined in this excerpt — confirm where it comes
// from in the full source.
func (c *baseClient) memberLoop() {

for {

select {

// Explicit request to refresh the members; falls through to updateMember.
case <-c.checkLeaderCh:

// Periodic refresh; a new timer is created on every iteration.
case <-time.After(time.Minute):

case <-ctx.Done():

return

}

// A failed refresh is only logged; the loop keeps running.
if err := c.updateMember(); err != nil {

log.Error("[pd] failed updateMember", errs.ZapError(err))

}

}

}

Client流程分析:

核心流程分析,當使用者建立pd 用戶端的時候,觸發以下步驟,

1.建立 client 對象,包含子模組 baseClient 對象的建立

2.調用c.updateTSODispatcher(),

3.啟動一系列背景管理線程

// NewClient creates a PD client.

func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {

return NewClientWithContext(context.Background(), pdAddrs, security, opts…)

}

// NewClientWithContext creates a PD client with context.
//
// It builds the base client from pdAddrs (converted with addrsToUrls), wraps
// it in a client, sets up the TSO dispatchers with updateTSODispatcher, and
// starts three background goroutines registered on c.wg: tsLoop, tsCancelLoop
// and leaderCheckLoop.
//
// If the base client cannot be created, a nil Client and that error are
// returned.
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
	base, err := newBaseClient(ctx, addrsToUrls(pdAddrs), security, opts...)
	if err != nil {
		// Without a base client there is nothing to wrap; wrapping a nil base
		// would fail on the first use of c.allocators or c.wg below.
		return nil, err
	}

	c := &client{
		baseClient:        base,
		checkTSDeadlineCh: make(chan struct{}),
	}

	c.updateTSODispatcher()

	// One wg slot per background goroutine started below.
	c.wg.Add(3)
	go c.tsLoop()
	go c.tsCancelLoop()
	go c.leaderCheckLoop()

	return c, nil
}

為每個 dc-location 安排調度器(dispatcher),用於批量處理發往這個 dc-location 的 TSO 請求

在client,每個dc對應一個TSODispatcher,dispatcher用于批量發送請求到相應的dc-location leader

// updateTSODispatcher walks every dc-location key in c.allocators and, for
// each one where checkTSODispatcher reports false, starts a handleDispatcher
// goroutine for that dc-location.
//
// NOTE(review): dispatcherCtx and tsoRequestCh are not defined in this
// excerpt — confirm how they are created in the full source.
func (c *client) updateTSODispatcher() {

// Set up the new TSO dispatcher

c.allocators.Range(func(dcLocationKey, _ interface{}) bool {

// Keys of c.allocators are dc-location names stored as strings.
dcLocation := dcLocationKey.(string)

if !c.checkTSODispatcher(dcLocation) {

go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)

}

// Returning true keeps Range iterating over the remaining dc-locations.
return true

})

}      

啟動背景服務handleDispatcher,這個背景服務調用processTSORequests 用于批量從PD請求一定數目的TSO

go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)

// handleDispatcher is the per-dc-location loop that batches TSO requests.
// For each batch it takes one request from tsoDispatcher, drains the requests
// already queued behind it, publishes a deadline on the dc-location's
// tsDeadline channel, and then hands the batch to processTSORequests.
// It returns if dispatcherCtx is done while the deadline is being published.
//
// NOTE(review): abridged excerpt — requests, cancel, stream, opts and err are
// not defined here, and ok from the tsDeadline lookup is not checked before
// the type assertion. Confirm against the full source.
func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) {

for {

  select {
  // Block until at least one request is available.
  case first := <-tsoDispatcher:
    // Batch size: the request just received plus everything already queued.
    pendingPlus1 := len(tsoDispatcher) + 1
    requests[0] = first
    for i := 1; i < pendingPlus1; i++ {
      requests[i] = <-tsoDispatcher
    }
    done := make(chan struct{})
    // Deadline record for this batch; the timer fires after c.timeout.
    dl := deadline{
      timer:  time.After(c.timeout),
      done:   done,
      cancel: cancel,
    }
    tsDeadlineCh, ok := c.tsDeadline.Load(dc)

    // Publish the deadline, or give up if the dispatcher is shutting down.
    select {
    case tsDeadlineCh.(chan deadline) <- dl:
    case <-dispatcherCtx.Done():
      return
    }
  
    // Send the whole batch for this dc-location in one call.
    err = c.processTSORequests(stream, dc, requests[:pendingPlus1], opts)
    
  }
  
}      

}

client背景服務

tsLoop 流程分析:定期檢查(或者通過 checkTSODispatcherCh 觸發檢查)是否有新加入的 dc-location;如果有,調用 updateTSODispatcher 為其建立 dispatcher,用於批量處理發往這個 dc-location 的 TSO 請求

// tsLoop keeps the set of TSO dispatchers up to date.
//
// It calls updateTSODispatcher once immediately and then again each time the
// tsLoopDCCheckInterval ticker fires or a signal arrives on
// c.checkTSODispatcherCh. It returns when its loop context is cancelled.
//
// The goroutine is one of those counted by c.wg.Add(3) in
// NewClientWithContext, so it reports completion with c.wg.Done.
func (c *client) tsLoop() {
	defer c.wg.Done()

	// Derive the loop's context from the client's so cancelling the client
	// ends the loop.
	loopCtx, loopCancel := context.WithCancel(c.ctx)
	defer loopCancel()

	ticker := time.NewTicker(tsLoopDCCheckInterval)
	// Release the ticker when the loop exits.
	defer ticker.Stop()

	for {
		c.updateTSODispatcher()
		select {
		case <-ticker.C:
		case <-c.checkTSODispatcherCh:
		case <-loopCtx.Done():
			return
		}
	}
}
ticker := time.NewTicker(tsLoopDCCheckInterval)

  for {
    // Watch every dc-location's tsDeadlineCh
    c.allocators.Range(func(dcLocation, _ interface{}) bool {
      c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string))
      return true
    })
    select {
    case <-c.checkTSDeadlineCh:
      continue
    case <-ticker.C:
      continue
    case <-tsCancelLoopCtx.Done():
      return
    }
  }
}      
ticker := time.NewTicker(LeaderHealthCheckInterval)
  defer ticker.Stop()

  for {
    select {
    case <-c.ctx.Done():
      return
    case <-ticker.C:
      c.checkLeaderHealth(leaderCheckLoopCtx)
    }
  }
}