作者:薛港-移動雲
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cGcq5yM0MzN5MzN5MTO3gzN5UmYyYzX5EzM0QTMxAzLcdDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.jpg)
PD 中涉及三類選主:etcd 選主、PD leader 選主,以及 Local TSO allocator leader 選主。
1.ETCD選主
啟動 etcd:調用 embed.StartEtcd(s.etcdCfg)。
// Start the embedded etcd server. A startup failure must abort startup
// instead of continuing with a nil/invalid etcd handle.
etcd, err := embed.StartEtcd(s.etcdCfg)
if err != nil {
	return errs.ErrStartEtcd.Wrap(err).GenWithStackByCause()
}
等待 etcd 選主完成:通過等待 channel(etcd.Server.ReadyNotify())實現,這個 channel 收到通知表明 etcd cluster 已完成選主,可以對外提供服務;如果等待期間 newCtx 被取消,則返回 ErrCancelStartEtcd。
// Block until the embedded etcd signals readiness (its cluster has elected a
// leader and can serve requests), or give up if newCtx is cancelled first.
select {
case <-newCtx.Done():
	return errs.ErrCancelStartEtcd.FastGenByArgs()
case <-etcd.Server.ReadyNotify():
	// etcd is ready to use; fall through and continue startup.
}
後台啟動線程,定期(時間間隔為 s.cfg.LeaderPriorityCheckInterval)檢查當前 PD 與 etcd leader 的優先級。如果發現當前 PD 的優先級更高,則調用 etcd transfer leader,將 etcd leader 切換為當前 PD。
// etcdLeaderLoop runs in the background and, every
// s.cfg.LeaderPriorityCheckInterval, asks the member to compare its leader
// priority with the current etcd leader's (see Member.CheckPriority), which
// may move etcd leadership to this server. It returns when s.serverLoopCtx
// is done.
//
// NOTE(review): leaderLoop calls s.serverLoopWg.Done(); if this loop is also
// registered in serverLoopWg by its caller, add the matching Done here.
func (s *Server) etcdLeaderLoop() {
	defer logutil.LogPanic()

	// Derive ctx from the server loop context so both this loop and any
	// in-flight priority check stop when the server shuts down.
	ctx, cancel := context.WithCancel(s.serverLoopCtx)
	defer cancel()

	for {
		select {
		case <-time.After(s.cfg.LeaderPriorityCheckInterval.Duration):
			s.member.CheckPriority(ctx)
		case <-ctx.Done():
			log.Info("server is closed, exit etcd leader loop")
			return
		}
	}
}
// CheckPriority checks whether the etcd leader should be moved according to the priority.
//
// If this member's leader priority is strictly higher than the current etcd
// leader's, it asks etcd to transfer leadership to this member. Lookup or
// transfer errors are logged and the check is abandoned until the next call.
func (m *Member) CheckPriority(ctx context.Context) {
	etcdLeader := m.GetEtcdLeader()
	// Nothing to move if we already lead; etcd reports 0 (raft.None) when no
	// leader is known.
	if etcdLeader == m.ID() || etcdLeader == 0 {
		return
	}
	myPriority, err := m.GetMemberLeaderPriority(m.ID())
	if err != nil {
		log.Error("failed to load leader priority", errs.ZapError(err))
		return
	}
	leaderPriority, err := m.GetMemberLeaderPriority(etcdLeader)
	if err != nil {
		log.Error("failed to load etcd leader priority", errs.ZapError(err))
		return
	}
	if myPriority <= leaderPriority {
		return
	}
	if err := m.MoveEtcdLeader(ctx, etcdLeader, m.ID()); err != nil {
		log.Error("failed to transfer etcd leader", errs.ZapError(err))
		return
	}
	log.Info("transfer etcd leader",
		zap.Uint64("from", etcdLeader),
		zap.Uint64("to", m.ID()))
}
2.PD leader選主
初始化 PD server 中的 member 成員(這個對象用於 PD leader 選主),並寫入本成員的資訊(部署路徑、版本號、git hash):
// startServer (excerpt) creates the member object used for PD leader
// election and records this member's metadata. Only the election-related
// steps are shown here; the rest of the initialization is elided.
func (s *Server) startServer(ctx context.Context) error {
	// member is built on the embedded etcd and its client; it drives the
	// PD leader election.
	s.member = member.NewMember(etcd, client, etcdServerID)

	// ... other initialization elided ...

	// Record this member's info, deploy path, binary version and git hash.
	// NOTE(review): if these setters return errors in the full source, they
	// should be checked and propagated.
	s.member.MemberInfo(s.cfg, s.Name(), s.rootPath)
	s.member.SetMemberDeployPath(s.member.ID())
	s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
	s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
	return nil
}
啟動後台服務線程 s.leaderLoop(),用於 PD leader 選主:
1. 檢查當前是否已有 leader。如果已經存在 leader,這個 PD 不參與選主,只需 watch 當前的 leader,直到該 leader 過期被刪除。
2. 如果 leader 已過期,或者當前沒有 PD leader,並且本 PD 同時是 etcd leader(保證 etcd leader 與 PD leader 在同一台機器上;否則等待 200ms 後重新檢查),則調用 s.campaignLeader() 發起選主:
2.1 調用 s.member.CampaignLeader 開始選主。原理很簡單:利用 etcd 的事務操作,如果能夠寫入特定的 key-value,就表示選主成功。
2.2 啟動後台 goroutine(KeepLeader),不停地為 PD leader 續約,保證 leader 一直有效。
2.3 因為很多組件依賴 PD leader,所以 PD 選主成功以後,會進行很多其它組件的初始化工作(TSO 組件、ID 分配組件、重新加載配置參數等)。
// leaderLoop is the background loop that drives PD leader election.
//
// Each iteration:
//  1. If a PD leader already exists, reload the config from KV and watch that
//     leader; WatchLeader only returns once the leader has changed.
//  2. Otherwise campaign (campaignLeader), but only when this server is also
//     the etcd leader, so that the etcd leader and the PD leader stay on the
//     same server; if not, retry after 200ms.
//
// The loop returns once s.serverLoopCtx is done.
func (s *Server) leaderLoop() {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()

	for {
		// Without this check the loop would never terminate on shutdown.
		if s.serverLoopCtx.Err() != nil {
			log.Info("server is closed, return pd leader loop")
			return
		}

		leader, rev, checkAgain := s.member.CheckLeader()
		if checkAgain {
			continue
		}
		if leader != nil {
			if err := s.reloadConfigFromKV(); err != nil {
				log.Error("reload config failed", errs.ZapError(err))
				// Back off like the etcd-leader wait below so a persistent
				// failure does not spin the loop.
				time.Sleep(200 * time.Millisecond)
				continue
			}
			log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader))
			// WatchLeader will keep looping and never return unless the PD leader has changed.
			s.member.WatchLeader(s.serverLoopCtx, leader, rev)
			// NOTE(review): syncer is not declared in this excerpt; presumably
			// the region syncer obtained earlier in the full source — confirm.
			syncer.StopSyncWithLeader()
			log.Info("pd leader has changed, try to re-campaign a pd leader")
		}

		// To make sure the etcd leader and PD leader are on the same server.
		if s.member.GetEtcdLeader() != s.member.ID() {
			time.Sleep(200 * time.Millisecond)
			continue
		}
		s.campaignLeader()
	}
}
func (s *Server) campaignLeader() {
log.Info(“start to campaign pd leader”, zap.String(“campaign-pd-leader-name”, s.Name()))
if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil {
}
// maintain the PD leader
go s.member.KeepLeader(ctx)
log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))
alllocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get the global TSO allocator", errs.ZapError(err))
return
}
log.Info("initializing the global TSO allocator")
if err := alllocator.Initialize(0); err != nil {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return
}
defer s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
// Check the cluster dc-location after the PD leader is elected
go s.tsoAllocatorManager.ClusterDCLocationChecker()
if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
return
}
// Try to create raft cluster.
if err := s.createRaftCluster(); err != nil {
log.Error("failed to create raft cluster", errs.ZapError(err))
return
}
defer s.stopRaftCluster()
if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil {
return
}
if err := s.idAllocator.Rebase(); err != nil {
return
}
s.member.EnableLeader()
// Check whether the Local TSO Allocator has the leader already
allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader()
if checkAgain {
continue
}
if allocatorLeader != nil {
log.Info("start to watch allocator leader",
zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),
zap.String("local-tso-allocator-name", am.member.Member().Name))
// WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
zap.String("dc-location", allocator.GetDCLocation()))
}
// Check the next-leader key
nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation())
if err != nil {
log.Error("get next leader from etcd failed",
zap.String("dc-location", allocator.GetDCLocation()),
errs.ZapError(err))
time.Sleep(200 * time.Millisecond)
continue
}
isNextLeader := false
if nextLeader != 0 {
if nextLeader != am.member.ID() {
log.Info("skip campaigning of the local tso allocator leader and check later",
zap.String("server-name", am.member.Member().Name),
zap.Uint64("server-id", am.member.ID()),
zap.Uint64("next-leader-id", nextLeader))
time.Sleep(200 * time.Millisecond)
continue
}
isNextLeader = true
}
// Make sure the leader is aware of this new dc-location in order to make the
// Global TSO synchronization can cover up this dc-location.
ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation())
if err != nil {
log.Error("get dc-location info from pd leader failed",
zap.String("dc-location", allocator.GetDCLocation()),
errs.ZapError(err))
// PD leader hasn't been elected out, wait for the campaign
if !longSleep(ctx, time.Second) {
return
}
continue
}
if !ok || dcLocationInfo.Suffix <= 0 {
log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round",
zap.String("dc-location", allocator.GetDCLocation()),
zap.Any("dc-location-info", dcLocationInfo),
zap.String("wait-duration", checkStep.String()))
// Because the checkStep is long, we use select here to check whether the ctx is done
// to prevent the leak of goroutine.
if !longSleep(ctx, checkStep) {
return
}
continue
}
am.campaignAllocatorLeader(ctx, allocator, dcLocationInfo, isNextLeader)
}