天天看点

Codis源码解析——sentinel的重同步(2)

Topom.ha.monitor本身相当于一个上帝视角的sentinel。它本身并不是一个实际的sentinel服务器,但是它负责收集各个sentinel的监控信息,并对集群作出反馈。这一讲我们就来看看Topom.ha.monitor。这一篇的源码也有助于大家理解并发模型中context的使用。

下面参数中的servers []string就是添加的sentinel的ip:port所组成的字符串数组,有多少个sentinel,数组的长度就有多少

func (s *Topom) rewatchSentinels(servers []string) {
    if s.ha.monitor != nil {
        s.ha.monitor.Cancel()
        s.ha.monitor = nil
    }
    if len(servers) == {
        s.ha.masters = nil
    } else {
        //创建Topom中的ha.monitor
        s.ha.monitor = redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
        s.ha.monitor.LogFunc = log.Warnf
        s.ha.monitor.ErrFunc = log.WarnErrorf

        go func(p *redis.Sentinel) {
            var trigger = make(chan struct{},)
            //一个延时工具类,要么休眠一秒,要么休眠现在距离deadline的时间,取决于哪个更短
            //如果现在已经过了deadline,就不休眠
            delayUntil := func(deadline time.Time) {
                //如果从Sentinel中Context.Done()读出值,就表示这个sentinel的context已经被cancel
                for !p.IsCanceled() {
                    var d = deadline.Sub(time.Now())
                    if d <= {
                        return
                    }
                    time.Sleep(math2.MinDuration(d, time.Second))
                }
            }
            go func() {
                defer close(trigger)
                callback := func() {
                    select {
                    case trigger <- struct{}{}:
                    default:
                    }
                }
                for !p.IsCanceled() {
                    timeout := time.Minute *
                    retryAt := time.Now().Add(time.Second *)
                    if !p.Subscribe(servers, timeout, callback) {
                        delayUntil(retryAt)
                    } else {
                        callback()
                    }
                }
            }()
            go func() {
                for _ = range trigger {
                    var success int
                    for i :=; i != && !p.IsCanceled() && success !=; i++ {
                        timeout := time.Second *
                        masters, err := p.Masters(servers, timeout)
                        if err != nil {
                            log.WarnErrorf(err, "fetch group masters failed")
                        } else {
                            if !p.IsCanceled() {
                                s.SwitchMasters(masters)
                            }
                            success +=
                        }
                        delayUntil(time.Now().Add(time.Second *))
                    }
                }
            }()
        }(s.ha.monitor)
    }
    log.Warnf("rewatch sentinels = %v", servers)
}
//一个context被取消的标准就是能从Context.Done()中读出值
// IsCanceled reports whether this sentinel's context has been canceled;
// the standard signal for cancellation is that Context.Done() yields.
func (s *Sentinel) IsCanceled() bool {
    select {
    case <-s.Context.Done():
        return true
    default:
    }
    return false
}
           

Subscribe是让sentinel订阅名为”+switch-master”的channel,并从这个channel中读取主从切换的信息。将订阅成功与否写到results := make(chan bool, len(sentinels))中,最后再遍历results

//timeout为15min
func (s *Sentinel) Subscribe(sentinels []string, timeout time.Duration, onMajoritySubscribed func()) bool {
    cntx, cancel := context.WithTimeout(s.Context, timeout)
    defer cancel()

    timeout += time.Second *
    results := make(chan bool, len(sentinels))

    //集群中sentinel数量的半数以上
    var majority = + len(sentinels)

    var subscribed atomic2.Int64
    for i := range sentinels {
        go func(sentinel string) {
            notified, err := s.subscribeDispatch(cntx, sentinel, timeout, func() {
                if subscribed.Incr() == int64(majority) {
                    onMajoritySubscribed()
                }
            })
            if err != nil {
                s.errorf(err, "sentinel-[%s] subscribe failed", sentinel)
            }
            results <- notified
        }(sentinels[i])
    }

    for alive := len(sentinels); ; alive-- {
        //如果超过半数sentinel都没有订阅成功
        if alive < majority {
            if cntx.Err() == nil {
                s.printf("sentinel subscribe lost majority (%d/%d)", alive, len(sentinels))
            }
            return false
        }
        select {
        case <-cntx.Done():
            if cntx.Err() != context.DeadlineExceeded {
                s.printf("sentinel subscribe canceled (%v)", cntx.Err())
            }
            return false
        case notified := <-results:
            if notified {
                s.printf("sentinel subscribe notified +switch-master")
                return true
            }
        }
    }
}
//订阅"+switch-master"成功则返回true
// subscribeDispatch runs the "+switch-master" subscription against a single
// sentinel. It returns (true, nil) when the subscription observed a switch
// event; context cancellation or deadline expiry is reported as (false, nil)
// rather than as an error.
func (s *Sentinel) subscribeDispatch(ctx context.Context, sentinel string, timeout time.Duration,
    onSubscribed func()) (bool, error) {
    err := s.dispatch(ctx, sentinel, timeout, func(c *Client) error {
        return s.subscribeCommand(c, sentinel, onSubscribed)
    })
    if err == nil {
        return true, nil
    }
    if cause := errors.Cause(err); cause == context.Canceled || cause == context.DeadlineExceeded {
        return false, nil
    }
    return false, err
}
func (s *Sentinel) subscribeCommand(client *Client, sentinel string,
    onSubscribed func()) error {
    var channels = []interface{}{"+switch-master"}
    if err := client.Flush("SUBSCRIBE", channels...); err != nil {
        return errors.Trace(err)
    }
    for _, sub := range channels {
        values, err := redigo.Values(client.Receive())
        if err != nil {
            return errors.Trace(err)
        } else if len(values) != {
            return errors.Errorf("invalid response = %v", values)
        }
        s, err := redigo.Strings(values[], nil)
        if err != nil || s] != "subscribe" || s] != sub.(string) {
            return errors.Errorf("invalid response = %v", values)
        }
    }
    onSubscribed()
    for {
        values, err := redigo.Values(client.Receive())
        if err != nil {
            return errors.Trace(err)
        } else if len(values) < {
            return errors.Errorf("invalid response = %v", values)
        }
        message, err := redigo.Strings(values, nil)
        if err != nil || message] != "message" {
            return errors.Errorf("invalid response = %v", values)
        }
        s.printf("sentinel-[%s] subscribe event %v", sentinel, message)

        //从订阅的channel中读取消息
        switch message] {
        case "+switch-master":
            if len(message) != {
                return errors.Errorf("invalid response = %v", values)
            }
            var params = strings.SplitN(message], " ",)
            if len(params) != {
                return errors.Errorf("invalid response = %v", values)
            }
            _, yes := s.isSameProduct(params])
            if yes {
                return nil
            }
        }
    }
}
           

注意,到上面为止,是集群中的sentinel订阅了redis服务器之间主从切换的信息,只有哨兵知道哪台是master。对于codis集群来讲,并不清楚哪台slave被推上了master。下面我们要做的,就是让哨兵感知到的新的master同样被codis集群感知到,也就是将其推到每个group的第一台server。

最后一步,通过SENTINEL INFO命令得到当前的主服务器,然后在各个group中更新主服务器信息。比方说,如果超过半数sentinel认为group中序号为1的server才是master,就把这台服务器和序号为0的server进行交换

func (s *Sentinel) Masters(sentinels []string, timeout time.Duration) (map[int]string, error) {
    cntx, cancel := context.WithTimeout(s.Context, timeout)
    defer cancel()

    timeout += time.Second *
    results := make(chan map[int]*SentinelMaster, len(sentinels))

    var majority = + len(sentinels)

    for i := range sentinels {
        go func(sentinel string) {
            //通过SENTINEL INFO命令得到哨兵感知到的master
            masters, err := s.mastersDispatch(cntx, sentinel, timeout)
            if err != nil {
                s.errorf(err, "sentinel-[%s] masters failed", sentinel)
            }
            results <- masters
        }(sentinels[i])
    }

    masters := make(map[int]string)
    current := make(map[int]*SentinelMaster)

    var voted int
    for alive := len(sentinels); ; alive-- {
        if alive == {
            switch {
            case cntx.Err() != context.DeadlineExceeded && cntx.Err() != nil:
                s.printf("sentinel masters canceled (%v)", cntx.Err())
                return nil, errors.Trace(cntx.Err())
            case voted != len(sentinels):
                s.printf("sentinel masters voted = (%d/%d) masters = %d (%v)", voted, len(sentinels), len(masters), cntx.Err())
            }
            if voted < majority {
                return nil, errors.Errorf("lost majority (%d/%d)", voted, len(sentinels))
            }
            return masters, nil
        }
        select {
        case <-cntx.Done():
            switch {
            case cntx.Err() != context.DeadlineExceeded:
                s.printf("sentinel masters canceled (%v)", cntx.Err())
                return nil, errors.Trace(cntx.Err())
            default:
                s.printf("sentinel masters voted = (%d/%d) masters = %d (%v)", voted, len(sentinels), len(masters), cntx.Err())
            }
            //最终通过的方案必须是半数以上sentinel同意的
            if voted < majority {
                return nil, errors.Errorf("lost majority (%d/%d)", voted, len(sentinels))
            }
            return masters, nil
        case m := <-results:
            if m == nil {
                continue
            }
            //构造sentinels选举出的master
            for gid, master := range m {
                if current[gid] == nil || current[gid].Epoch < master.Epoch {
                    current[gid] = master
                    masters[gid] = master.Addr
                }
            }
            voted +=
        }
    }
}
func (s *Topom) SwitchMasters(masters map[int]string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.closed {
        return ErrClosedTopom
    }
    s.ha.masters = masters

    if len(masters) != {
        cache := &redis.InfoCache{
            Auth: s.config.ProductAuth, Timeout: time.Millisecond *,
        }
        for gid, master := range masters {
            if err := s.trySwitchGroupMaster(gid, master, cache); err != nil {
                log.WarnErrorf(err, "sentinel switch group master failed")
            }
        }
    }
    return nil
}
//执行codis集群可感知的主从切换
func (s *Topom) trySwitchGroupMaster(gid int, master string, cache *redis.InfoCache) error {
    ctx, err := s.newContext()
    if err != nil {
        return err
    }
    g, err := ctx.getGroup(gid)
    if err != nil {
        return err
    }

    var index = func() int {
        for i, x := range g.Servers {
            if x.Addr == master {
                return i
            }
        }
        for i, x := range g.Servers {
            rid1 := cache.GetRunId(master)
            rid2 := cache.GetRunId(x.Addr)
            if rid1 != "" && rid1 == rid2 {
                return i
            }
        }
        return
    }()
    if index == {
        return errors.Errorf("group-[%d] doesn't have server %s with runid = '%s'", g.Id, master, cache.GetRunId(master))
    }
    if index == {
        return nil
    }
    defer s.dirtyGroupCache(g.Id)

    log.Warnf("group-[%d] will switch master to server[%d] = %s", g.Id, index, g.Servers[index].Addr)

    //执行主从切换,我们之前说过,codis集群中默认每个group的第一个server为master
    g.Servers], g.Servers[index] = g.Servers[index], g.Servers]
    g.OutOfSync = true
    return s.storeUpdateGroup(g)
}
           

下一步,在每个Proxy中设置其ha.servers为当前ctx中的sentinel,再执行一次上面的rewatchSentinels方法。

// NOTE(review): this is a fragment excerpted from a larger function in
// codis (sentinel resync) — the enclosing signature and the declarations
// of ctx, s, and the p used after the loop are not shown here.
var fut sync2.Future
    for _, p := range ctx.proxy {
        // NOTE(review): upstream codis calls fut.Add(1); the argument
        // appears to have been lost in transcription.
        fut.Add()
        go func(p *models.Proxy) {
            // Push the current sentinel list to this proxy.
            err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
            if err != nil {
                log.ErrorErrorf(err, "proxy-[%s] resync sentinel failed", p.Token)
            }
            fut.Done(p.Token, err)
        }(p)
    }
    // Wait for every proxy to finish; fail on the first error found.
    for t, v := range fut.Wait() {
        switch err := v.(type) {
        case error:
            if err != nil {
                return errors.Errorf("proxy-[%s] sentinel failed", t)
            }
        }
    }
    p.OutOfSync = false
    // Update the info stored in zk.
    return s.storeUpdateSentinel(p)
           

总结一下,当一台sentinel第一次被添加到codis集群,或者是脱离codis集群之后,需要执行resync操作来重新对集群做监控。首先遍历所有server,放弃其原先监控的信息。格式化之后,再重新监控集群中的所有group,并根据dashboard.toml中的配置进行监控设置。最后,新建Topom.ha.monitor上帝视角sentinel,让集群中的所有sentinel订阅”+switch-master”,如果发生主从切换(即可以从channel中读出值),要从哨兵中读出当前的master地址,并在每个codis group中将对应的server推到group的第一个。设置每个Proxy的ha.servers为当前ctx中的sentinel,再执行一次上面的rewatchSentinels方法,最后再将sentinel的OutofSync更新为true,然后再更新zk下存储的信息。

说明

如有转载,请注明出处

http://blog.csdn.net/antony9118/article/details/78141271