天天看点

Codis源码解析——sentinel的重同步(1)

sentinel是redis集群高可用的保障。怎么搭建sentinel,网上有很多教程,我们这里就不重复了。

添加sentinel的过程很简单。新建sentinel,调用sentinel flushconfig强制 sentinel 重写所有配置信息到配置文件,然后更新ctx.sentinel和zk上存储的信息。但是刚刚添加sentinel的OutOfSync属性默认为true,也就是说,此时与集群是脱节的。我们要手动触发sync来让新添加的sentinel监控整个集群

Codis源码解析——sentinel的重同步(1)

刷新ctx.sentinel的goroutine,参照之前的博客Codis源码解析——dashboard的启动(2)

func (s *Topom) ResyncSentinels() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    ctx, err := s.newContext()
    if err != nil {
        return err
    }
    defer s.dirtySentinelCache()

    p := ctx.sentinel
    p.OutOfSync = true
    if err := s.storeUpdateSentinel(p); err != nil {
        return err
    }

    //dashboard.toml中的配置项
    config := &redis.MonitorConfig{
        Quorum:               s.config.SentinelQuorum,
        ParallelSyncs:        s.config.SentinelParallelSyncs,
        DownAfter:            s.config.SentinelDownAfter.Duration(),
        FailoverTimeout:      s.config.SentinelFailoverTimeout.Duration(),
        NotificationScript:   s.config.SentinelNotificationScript,
        ClientReconfigScript: s.config.SentinelClientReconfigScript,
    }

    sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
    if err := sentinel.RemoveGroupsAll(p.Servers, time.Second*); err != nil {
        log.WarnErrorf(err, "remove sentinels failed")
    }
    if err := sentinel.MonitorGroups(p.Servers, time.Second*, config, ctx.getGroupMasters()); err != nil {
        log.WarnErrorf(err, "resync sentinels failed")
        return err
    }
    s.rewatchSentinels(p.Servers)

    var fut sync2.Future
    for _, p := range ctx.proxy {
        fut.Add()
        go func(p *models.Proxy) {
            //s.newProxyClient(p)就是调用proxy.NewApiClient(p.AdminAddr),得到一个ApiClient
            //下一步调用s.proxy.SetSentinels(sentinel.Servers)
            //其中最关键的方法是func (s *Proxy) rewatchSentinels(servers []string),后面会讲到
            err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
            if err != nil {
                log.ErrorErrorf(err, "proxy-[%s] resync sentinel failed", p.Token)
            }
            fut.Done(p.Token, err)
        }(p)
    }
    for t, v := range fut.Wait() {
        switch err := v.(type) {
        case error:
            if err != nil {
                return errors.Errorf("proxy-[%s] sentinel failed", t)
            }
        }
    }

    p.OutOfSync = false
    return s.storeUpdateSentinel(p)
}
           

下面我们逐个看上面的方法。首先是新建sentinel,这里的context和我们这个集群的上下文不一样,是go并发中的context。不理解的话,可以参照我之前转的一篇文章Go语言并发模型:使用 context

func NewSentinel(product, auth string) *Sentinel {
    s := &Sentinel{Product: product, Auth: auth}
    s.Context, s.Cancel = context.WithCancel(context.Background())
    return s
}
           

下面是RemoveGroupsAll方法。在Codis源码解析——dashboard的启动(2)中,我们说过,集群中的每组SentinelGroup,键是product-name与group-id拼起来的。这一步先让每个sentinel放弃之前的监听,相当于格式化

func (s *Sentinel) RemoveGroupsAll(sentinels []string, timeout time.Duration) error {
    cntx, cancel := context.WithTimeout(s.Context, timeout)
    defer cancel()

    //timeout=5+5=10
    timeout += time.Second *
    results := make(chan error, len(sentinels))

    for i := range sentinels {
        go func(sentinel string) {
            //传入sentinel的地址,调用/utils/redis/client.go中的NewClient方法,新建一个sentinel的redisClient连接
            //然后,调用SENTINEL masters 显示被这个sentinel监控的所有master以及它们的状态
            //如果这个sentinel目前对于productName-groupId这个group有监控
            //就使用SENTINEL REMOVE <name> 命令sentinel逐个放弃上面的监听
            err := s.removeGroupsAllDispatch(cntx, sentinel, timeout)
            if err != nil {
                s.errorf(err, "sentinel-[%s] remove failed", sentinel)
            }
            results <- err
        }(sentinels[i])
    }

    var last error
    for _ = range sentinels {
        select {
        //当一个Context被显示cancel或者超时,Done会返回一个关闭的channel
        case <-cntx.Done():
            if last != nil {
                return last
            }
            // 当Done这个channel被关闭,Err说明了Context被cancel的原因
            return errors.Trace(cntx.Err())
        case err := <-results:
            if err != nil {
                last = err
            }
        }
    }
    return last
}
           

接下来是在sentinel格式化之后,重新监控集群中的group。

func (s *Sentinel) MonitorGroups(sentinels []string, timeout time.Duration, config *MonitorConfig, groups map[int]string) error {
    cntx, cancel := context.WithTimeout(s.Context, timeout)
    defer cancel()

    resolve := make(map[int]*net.TCPAddr)

    var exit = make(chan error,)

    go func() (err error) {
        defer func() {
            exit <- err
        }()
        for gid, addr := range groups {
            if err := cntx.Err(); err != nil {
                return errors.Trace(err)
            }
            tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
            if err != nil {
                s.printf("sentinel monitor resolve tcp address of %s failed, %s", addr, err)
                return errors.Trace(err)
            }
            resolve[gid] = tcpAddr
        }
        return nil
    }()

    select {
    case <-cntx.Done():
        if cntx.Err() != context.DeadlineExceeded {
            s.printf("sentinel monitor canceled (%v)", cntx.Err())
        } else {
            s.printf("sentinel montior resolve tcp address (%v)", cntx.Err())
        }
        return errors.Trace(cntx.Err())
    case err := <-exit:
        if err != nil {
            return err
        }
    }

    timeout += time.Second *
    results := make(chan error, len(sentinels))

    for i := range sentinels {
        go func(sentinel string) {
            //调用SENTINEL MONITOR命令,监控集群中的group。监控之后,根据dashboard.toml中设置的sentinel参数对sentinel进行设置
            err := s.monitorGroupsDispatch(cntx, sentinel, timeout, config, resolve)
            if err != nil {
                s.errorf(err, "sentinel-[%s] monitor failed", sentinel)
            }
            results <- err
        }(sentinels[i])
    }

    var last error
    for _ = range sentinels {
        select {
        case <-cntx.Done():
            if last != nil {
                return last
            }
            return errors.Trace(cntx.Err())
        case err := <-results:
            if err != nil {
                last = err
            }
        }
    }
    return last
}
           

下面说明一下dashboard.toml中的配置项各有什么含义

配置参数 含义
parallel-syncs 在发生failover主备切换时,这个选项指定了最多 可以有多少个slave同时对新的master进行同步,这个数字越小,完成failover所需的时间就越长,但是如果这个数字越大,就意味着越多的 slave因为replication而不可用。可以通过将这个值设为 1 来保证每次只有一个slave处于不能处理命令请求的状态。也可以理解为一次性修改几个slave指向新的new master
down-after-milliseconds sentinel会向master发送心跳PING来确认master是否存活,如果master在“一定时间范围”内不回应PONG 或者是回复了一个错误消息,那么这个sentinel会主观地(单方面地)认为这个master已经不可用了(subjectively down, 也简称为SDOWN)。而这个down-after-milliseconds就是用来指定这个“一定时间范围”的,单位是毫秒。
failover-timeout 如果sentinel A推荐sentinel B去执行failover,B会等待一段时间后,自行再次去对同一个master执行failover,这个等待的时间是通过failover-timeout配置项去配置的。从这个规则可以看出,sentinel集群中的sentinel不会再同一时刻并发去failover同一个master,第一个进行failover的sentinel如果失败了,另外一个将会在一定时间内进行重新进行failover,以此类推
notification-script 在群集failover时会触发执行指定的脚本。脚本的执行结果若为1,即稍后重试(最大重试次数为10);若为2,则执行结束。并且脚本最大执行时间为60秒,超时会被终止执行。使用方法如如sentinel notification-script mymaster ./check.sh
client-reconfig-script 在重新配置new master,new slave过程,可以触发的脚本

下一讲,我们来说rewatchSentinels方法,这个方法在后面的s.newProxyClient(p).SetSentinels(ctx.sentinel)中也调用了。

说明

如有转载,请注明出处

http://blog.csdn.net/antony9118/article/details/78115375