作者:薛港-移動雲
用戶端對象建立以及初始化
// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts…)
}
// newBaseClient returns a new baseClient.
func newBaseClient(ctx context.Context, urls []string, security SecurityOption, opts …ClientOption) (*baseClient, error) {
ctx1, cancel := context.WithCancel(ctx)
c := &baseClient{
urls: urls,
checkLeaderCh: make(chan struct{}, 1),
checkTSODispatcherCh: make(chan struct{}, 1),
}
c.initRetry(c.initClusterID);
c.initRetry(c.updateMember);
c.wg.Add(1)
go c.memberLoop()
return c, nil
}
client包含子子產品basic client ,這個子子產品目的很明确,基于使用者的請求,盡量發送到pd leader和tso dc-location 目的地。因為部分請求,隻能PD leader或者dc-location leader能夠處理,如果不是leader收到請求,還是要轉發到對應的leader。而basic client就是為了維護兩類leader,以及建立連接配接.
baseClient 子子產品分析,主要包含以下資訊:
對應pd leader對象
對應follower對象
dc-locatopn對應的leader對象以及grpc
一些channel,checkLeaderCh,checkTSODispatcherCh 。用于兩類leader的管理
// baseClient is a basic client for all other complex client.
type baseClient struct {
urls []string
clusterID uint64
// PD leader URL
leader atomic.Value // Store as string
// PD follower URLs
followers atomic.Value // Store as []string
// dc-location -> TSO allocator leader gRPC connection
clientConns sync.Map // Store as map[string]*grpc.ClientConn
// dc-location -> TSO allocator leader URL
allocators sync.Map // Store as map[string]string
checkLeaderCh chan struct{}
checkTSODispatcherCh chan struct{}
}
下面我們詳細分析basic client的建立流程,三個核心步驟
調用c.updateMember
func (c *baseClient) updateMember() error {
for _, u := range c.urls {
members, err := c.getMembers(ctx, u)
更新c.clientConns以及c.allocators,用于比對dc-location到grpc連接配接
c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders());
更新pd所有成員的url
c.updateURLs(members.GetMembers())
更新pd follower的urls
c.updateFollowers(members.GetMembers(), members.GetLeader())
更新c.leader,指定pd的leader.以及指定dc global對應的leader
c.switchLeader(members.GetLeader().GetClientUrls());
給c.checkTSODispatcherCh發送空消息,觸發check tso
c.scheduleCheckTSODispatcher()
return nil
}
}
啟動背景服務線程go c.memberLoop()
這個背景服務,每隔1分鐘觸發一次updateMemer用于更新pd成員角色。或者通過channel觸發成員角色更新
func (c *baseClient) memberLoop() {
for {
select {
case <-c.checkLeaderCh:
case <-time.After(time.Minute):
case <-ctx.Done():
return
}
if err := c.updateMember(); err != nil {
log.Error("[pd] failed updateMember", errs.ZapError(err))
}
}
}
Client流程分析:
核心流程分析,當使用者建立pd 用戶端的時候,觸發以下步驟,
1.建立client 對象,包含子子產品basic client對象的建立
2.調用c.updateTSODispatcher(),
3.啟動一系列背景管理線程
// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts…)
}
// NewClientWithContext creates a PD client with context.
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
base, err := newBaseClient(ctx, addrsToUrls(pdAddrs), security, opts...)
c := &client{
baseClient: base,
checkTSDeadlineCh: make(chan struct{}),
}
c.updateTSODispatcher()
c.wg.Add(3)
go c.tsLoop()
go c.tsCancelLoop()
go c.leaderCheckLoop()
return c, nil
}
排程每個dc-location,用于請求批量處理發往這個dc-location 的tso 請要求
在client,每個dc對應一個TSODispatcher,dispatcher用于批量發送請求到相應的dc-location leader
func (c *client) updateTSODispatcher() {
// Set up the new TSO dispatcher
c.allocators.Range(func(dcLocationKey, _ interface{}) bool {
dcLocation := dcLocationKey.(string)
if !c.checkTSODispatcher(dcLocation) {
go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)
}
return true
})
}
啟動背景服務handleDispatcher,這個背景服務調用processTSORequests 用于批量從PD請求一定數目的TSO
go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)
func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) {
for {
select {
case first := <-tsoDispatcher:
pendingPlus1 := len(tsoDispatcher) + 1
requests[0] = first
for i := 1; i < pendingPlus1; i++ {
requests[i] = <-tsoDispatcher
}
done := make(chan struct{})
dl := deadline{
timer: time.After(c.timeout),
done: done,
cancel: cancel,
}
tsDeadlineCh, ok := c.tsDeadline.Load(dc)
select {
case tsDeadlineCh.(chan deadline) <- dl:
case <-dispatcherCtx.Done():
return
}
err = c.processTSORequests(stream, dc, requests[:pendingPlus1], opts)
}
}
}
client背景服務
tsLoop流程分析:定期檢察或者通過checkTSODispatcherCh 觸發是否有新加入的dc-location,如果有的話,調用updateTSODispatcher,用于批量處理發往這個dc-location的tso 請求
func (c *client) tsLoop() {
ticker := time.NewTicker(tsLoopDCCheckInterval)
for {
c.updateTSODispatcher()
select {
case <-ticker.C:
case <-c.checkTSODispatcherCh:
case <-loopCtx.Done():
return
}
}
}
ticker := time.NewTicker(tsLoopDCCheckInterval)
for {
// Watch every dc-location's tsDeadlineCh
c.allocators.Range(func(dcLocation, _ interface{}) bool {
c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string))
return true
})
select {
case <-c.checkTSDeadlineCh:
continue
case <-ticker.C:
continue
case <-tsCancelLoopCtx.Done():
return
}
}
}
ticker := time.NewTicker(LeaderHealthCheckInterval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
c.checkLeaderHealth(leaderCheckLoopCtx)
}
}
}