
PD's Three Leader Election Flows

Author: 薛港 (Xue Gang), China Mobile Cloud


PD involves three kinds of leader election.

1. etcd leader election

PD starts etcd by calling embed.StartEtcd(s.etcdCfg):

etcd, err := embed.StartEtcd(s.etcdCfg)

It then waits for the etcd election to finish by blocking on the channel returned by etcd.Server.ReadyNotify(). A notification on this channel means the etcd cluster has elected a leader and is ready to serve:

select {
// Wait until etcd is ready to use
case <-etcd.Server.ReadyNotify():
case <-newCtx.Done():
  return errs.ErrCancelStartEtcd.FastGenByArgs()
}

A background goroutine then periodically (every s.cfg.LeaderPriorityCheckInterval) compares the leader priority of the current PD member against that of the current etcd leader. If the current PD member has the higher priority, it calls etcd's transfer-leader API to move etcd leadership to the current PD member:

func (s *Server) etcdLeaderLoop() {
  // derive the loop context from the server loop context
  ctx, cancel := context.WithCancel(s.serverLoopCtx)
  defer cancel()
  for {
    select {
    case <-time.After(s.cfg.LeaderPriorityCheckInterval.Duration):
      s.member.CheckPriority(ctx)
    case <-ctx.Done():
      log.Info("server is closed, exit etcd leader loop")
      return
    }
  }
}

// CheckPriority checks whether the etcd leader should be moved according to the priority.
func (m *Member) CheckPriority(ctx context.Context) {
  etcdLeader := m.GetEtcdLeader()
  if etcdLeader == m.ID() || etcdLeader == 0 {
    return
  }
  myPriority, err := m.GetMemberLeaderPriority(m.ID())
  if err != nil {
    log.Error("failed to load leader priority", errs.ZapError(err))
    return
  }
  leaderPriority, err := m.GetMemberLeaderPriority(etcdLeader)
  if err != nil {
    log.Error("failed to load etcd leader priority", errs.ZapError(err))
    return
  }
  if myPriority > leaderPriority {
    if err := m.MoveEtcdLeader(ctx, etcdLeader, m.ID()); err != nil {
      log.Error("failed to transfer etcd leader", errs.ZapError(err))
    }
  }
}
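
MoveEtcdLeader itself is little more than a wrapper over the embedded etcd server's leadership-transfer call, since Member holds the embedded etcd instance. A minimal sketch; the timeout value is illustrative, not PD's actual constant:

// MoveEtcdLeader asks etcd's raft layer to transfer leadership to `new`.
func (m *Member) MoveEtcdLeader(ctx context.Context, old, new uint64) error {
  // timeout value is illustrative
  moveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
  defer cancel()
  return m.etcd.Server.MoveLeader(moveCtx, old, new)
}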

2. PD leader election

startServer initializes the member field of the PD server; this object drives PD leader election:

func (s *Server) startServer(ctx context.Context) error {
  // ...
  s.member = member.NewMember(etcd, client, etcdServerID)
  // ...
  s.member.MemberInfo(s.cfg, s.Name(), s.rootPath)
  s.member.SetMemberDeployPath(s.member.ID())
  s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
  s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
  // ...
}

It then starts the background goroutine s.leaderLoop(), which drives PD leader election:

1. Check whether a PD leader already exists. If one does, this PD does not campaign; it simply watches the current leader until the leader key expires and is deleted.

2. If the leader has expired, or there is no PD leader yet, call s.campaignLeader() to start a campaign.

2.1 Call s.member.CampaignLeader to campaign. The principle is simple: using an etcd transaction, whoever manages to write a specific key-value pair wins the election (see the sketch after this list).

2.2 Start a background goroutine that keeps renewing the PD leader's lease, so the leadership stays valid.

2.3 Many components depend on the PD leader, so once the campaign succeeds, the new leader sets up a number of other components (the TSO component, the ID allocation component, configuration reloading).
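To make 2.1 and 2.2 concrete, here is a minimal sketch of campaign-by-etcd-transaction against a plain go.etcd.io/etcd/client/v3 client. The function names, leader key, and TTL are illustrative, not PD's actual identifiers; PD wraps this logic in its Leadership type with more error handling:

import (
  "context"
  "time"

  clientv3 "go.etcd.io/etcd/client/v3"
)

// campaign tries to win the election: whoever writes leaderKey first wins.
func campaign(cli *clientv3.Client, leaderKey, memberInfo string, ttl int64) (clientv3.LeaseID, bool, error) {
  ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  defer cancel()
  // Bind the leader key to a lease so the key vanishes if renewal stops (2.2).
  lease, err := cli.Grant(ctx, ttl)
  if err != nil {
    return 0, false, err
  }
  // Put the key only if it does not exist yet (CreateRevision == 0);
  // the transaction guarantees that exactly one contender succeeds.
  resp, err := cli.Txn(ctx).
    If(clientv3.Compare(clientv3.CreateRevision(leaderKey), "=", 0)).
    Then(clientv3.OpPut(leaderKey, memberInfo, clientv3.WithLease(lease.ID))).
    Commit()
  if err != nil {
    return 0, false, err
  }
  return lease.ID, resp.Succeeded, nil
}

// keepLeader is the counterpart of 2.2: renew the lease until ctx is done;
// when the keep-alive channel closes, leadership is lost.
func keepLeader(ctx context.Context, cli *clientv3.Client, id clientv3.LeaseID) {
  ch, err := cli.KeepAlive(ctx, id)
  if err != nil {
    return
  }
  for range ch {
    // drain responses; exiting the loop means the lease can no longer be renewed
  }
}

With those primitives in mind, PD's actual leader loop: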

func (s *Server) leaderLoop() {
  defer logutil.LogPanic()
  defer s.serverLoopWg.Done()

  for {
    leader, rev, checkAgain := s.member.CheckLeader()
    if checkAgain {
      continue
    }
    if leader != nil {
      if err := s.reloadConfigFromKV(); err != nil {
        log.Error("reload config failed", errs.ZapError(err))
        continue
      }
      syncer := s.cluster.GetRegionSyncer()
      log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader))
      // WatchLeader will keep looping and never return unless the PD leader has changed.
      s.member.WatchLeader(s.serverLoopCtx, leader, rev)
      syncer.StopSyncWithLeader()
      log.Info("pd leader has changed, try to re-campaign a pd leader")
    }

    // To make sure the etcd leader and PD leader are on the same server.
    etcdLeader := s.member.GetEtcdLeader()
    if etcdLeader != s.member.ID() {
      time.Sleep(200 * time.Millisecond)
      continue
    }
    s.campaignLeader()
  }
}
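
For the watch half of step 1, a minimal sketch of the idea behind s.member.WatchLeader, again against a plain clientv3 client (PD's real implementation additionally handles compaction and revision gaps):

// watchLeader watches the leader key starting from rev and returns once the
// key is deleted, i.e. the old leader expired or resigned, so the caller can
// re-enter the campaign.
func watchLeader(ctx context.Context, cli *clientv3.Client, leaderKey string, rev int64) {
  for resp := range cli.Watch(ctx, leaderKey, clientv3.WithRev(rev)) {
    for _, ev := range resp.Events {
      if ev.Type == clientv3.EventTypeDelete {
        return
      }
    }
  }
}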

func (s *Server) campaignLeader() {
  log.Info("start to campaign pd leader", zap.String("campaign-pd-leader-name", s.Name()))
  if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil {
    log.Error("campaign pd leader meets error due to etcd error", errs.ZapError(err))
    return
  }

  ctx, cancel := context.WithCancel(s.serverLoopCtx)
  defer cancel()

  // maintain the PD leader
  go s.member.KeepLeader(ctx)
  log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))

  allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
  if err != nil {
    log.Error("failed to get the global TSO allocator", errs.ZapError(err))
    return
  }
  log.Info("initializing the global TSO allocator")
  if err := allocator.Initialize(0); err != nil {
    log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
    return
  }
  defer s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
  // Check the cluster dc-location after the PD leader is elected
  go s.tsoAllocatorManager.ClusterDCLocationChecker()

  if err := s.reloadConfigFromKV(); err != nil {
    log.Error("failed to reload configuration", errs.ZapError(err))
    return
  }

  // Try to create raft cluster.
  if err := s.createRaftCluster(); err != nil {
    log.Error("failed to create raft cluster", errs.ZapError(err))
    return
  }
  defer s.stopRaftCluster()
  if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil {
    log.Error("failed to load persistOptions from etcd", errs.ZapError(err))
    return
  }
  if err := s.idAllocator.Rebase(); err != nil {
    log.Error("failed to sync id from etcd", errs.ZapError(err))
    return
  }
  s.member.EnableLeader()
}
3. Local TSO Allocator leader election

The third election picks the Local TSO Allocator leader. For each dc-location, the AllocatorManager runs a background loop, allocatorLeaderLoop, whose structure mirrors the PD leader loop: if an allocator leader already exists, watch it; otherwise check the next-leader hint and campaign:

func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {
  for {
  // Check whether the Local TSO Allocator has the leader already
  allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader()
  if checkAgain {
    continue
  }
  if allocatorLeader != nil {
    log.Info("start to watch allocator leader",
      zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),
      zap.String("local-tso-allocator-name", am.member.Member().Name))
    // WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
    allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
    log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
      zap.String("dc-location", allocator.GetDCLocation()))
  }

  // Check the next-leader key
  nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation())
  if err != nil {
    log.Error("get next leader from etcd failed",
      zap.String("dc-location", allocator.GetDCLocation()),
      errs.ZapError(err))
    time.Sleep(200 * time.Millisecond)
    continue
  }
  isNextLeader := false
  if nextLeader != 0 {
    if nextLeader != am.member.ID() {
      log.Info("skip campaigning of the local tso allocator leader and check later",
        zap.String("server-name", am.member.Member().Name),
        zap.Uint64("server-id", am.member.ID()),
        zap.Uint64("next-leader-id", nextLeader))
      time.Sleep(200 * time.Millisecond)
      continue
    }
    isNextLeader = true
  }

  // Make sure the leader is aware of this new dc-location so that the
  // Global TSO synchronization can cover this dc-location.
  ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation())
  if err != nil {
    log.Error("get dc-location info from pd leader failed",
      zap.String("dc-location", allocator.GetDCLocation()),
      errs.ZapError(err))
    // PD leader hasn't been elected out, wait for the campaign
    if !longSleep(ctx, time.Second) {
      return
    }
    continue
  }
  if !ok || dcLocationInfo.Suffix <= 0 {
    log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round",
      zap.String("dc-location", allocator.GetDCLocation()),
      zap.Any("dc-location-info", dcLocationInfo),
      zap.String("wait-duration", checkStep.String()))
    // Because the checkStep is long, we use select here to check whether the ctx is done
    // to prevent the leak of goroutine.
    if !longSleep(ctx, checkStep) {
      return
    }
    continue
  }

  am.campaignAllocatorLeader(ctx, allocator, dcLocationInfo, isNextLeader)
  }
}
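
The loop above relies on a small helper, longSleep, to wait out the long checkStep interval without leaking the goroutine. A sketch consistent with how it is used above:

// longSleep waits for waitStep while also watching ctx.Done(); it returns
// false when the ctx is canceled, telling the caller to exit the loop.
func longSleep(ctx context.Context, waitStep time.Duration) bool {
  waitTicker := time.NewTicker(waitStep)
  defer waitTicker.Stop()
  select {
  case <-ctx.Done():
    return false
  case <-waitTicker.C:
    return true
  }
}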