sentinel是redis集群高可用的保障。怎么搭建sentinel,网上有很多教程,我们这里就不重复了。
添加sentinel的过程很简单。新建sentinel,调用sentinel flushconfig强制 sentinel 重写所有配置信息到配置文件,然后更新ctx.sentinel和zk上存储的信息。但是刚刚添加sentinel的OutOfSync属性默认为true,也就是说,此时与集群是脱节的。我们要手动触发sync来让新添加的sentinel监控整个集群
刷新ctx.sentinel的goroutine,参照之前的博客Codis源码解析——dashboard的启动(2)
func (s *Topom) ResyncSentinels() error {
s.mu.Lock()
defer s.mu.Unlock()
ctx, err := s.newContext()
if err != nil {
return err
}
defer s.dirtySentinelCache()
p := ctx.sentinel
p.OutOfSync = true
if err := s.storeUpdateSentinel(p); err != nil {
return err
}
//dashboard.toml中的配置项
config := &redis.MonitorConfig{
Quorum: s.config.SentinelQuorum,
ParallelSyncs: s.config.SentinelParallelSyncs,
DownAfter: s.config.SentinelDownAfter.Duration(),
FailoverTimeout: s.config.SentinelFailoverTimeout.Duration(),
NotificationScript: s.config.SentinelNotificationScript,
ClientReconfigScript: s.config.SentinelClientReconfigScript,
}
sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
if err := sentinel.RemoveGroupsAll(p.Servers, time.Second*); err != nil {
log.WarnErrorf(err, "remove sentinels failed")
}
if err := sentinel.MonitorGroups(p.Servers, time.Second*, config, ctx.getGroupMasters()); err != nil {
log.WarnErrorf(err, "resync sentinels failed")
return err
}
s.rewatchSentinels(p.Servers)
var fut sync2.Future
for _, p := range ctx.proxy {
fut.Add()
go func(p *models.Proxy) {
//s.newProxyClient(p)就是调用proxy.NewApiClient(p.AdminAddr),得到一个ApiClient
//下一步调用s.proxy.SetSentinels(sentinel.Servers)
//其中最关键的方法是func (s *Proxy) rewatchSentinels(servers []string),后面会讲到
err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
if err != nil {
log.ErrorErrorf(err, "proxy-[%s] resync sentinel failed", p.Token)
}
fut.Done(p.Token, err)
}(p)
}
for t, v := range fut.Wait() {
switch err := v.(type) {
case error:
if err != nil {
return errors.Errorf("proxy-[%s] sentinel failed", t)
}
}
}
p.OutOfSync = false
return s.storeUpdateSentinel(p)
}
下面我们逐个看上面的方法。首先是新建sentinel,这里的context和我们这个集群的上下文不一样,是go并发中的context。不理解的话,可以参照我之前转的一篇文章Go语言并发模型:使用 context
func NewSentinel(product, auth string) *Sentinel {
s := &Sentinel{Product: product, Auth: auth}
s.Context, s.Cancel = context.WithCancel(context.Background())
return s
}
下面是RemoveGroupsAll方法。在Codis源码解析——dashboard的启动(2)中,我们说过,集群中的每组SentinelGroup,键是product-name与group-id拼起来的。这一步先让每个sentinel放弃之前的监听,相当于格式化
func (s *Sentinel) RemoveGroupsAll(sentinels []string, timeout time.Duration) error {
cntx, cancel := context.WithTimeout(s.Context, timeout)
defer cancel()
//timeout=5+5=10
timeout += time.Second *
results := make(chan error, len(sentinels))
for i := range sentinels {
go func(sentinel string) {
//传入sentinel的地址,调用/utils/redis/client.go中的NewClient方法,新建一个sentinel的redisClient连接
//然后,调用SENTINEL masters 显示被这个sentinel监控的所有master以及它们的状态
//如果这个sentinel目前对于productName-groupId这个group有监控
//就使用SENTINEL REMOVE <name> 命令sentinel逐个放弃上面的监听
err := s.removeGroupsAllDispatch(cntx, sentinel, timeout)
if err != nil {
s.errorf(err, "sentinel-[%s] remove failed", sentinel)
}
results <- err
}(sentinels[i])
}
var last error
for _ = range sentinels {
select {
//当一个Context被显示cancel或者超时,Done会返回一个关闭的channel
case <-cntx.Done():
if last != nil {
return last
}
// 当Done这个channel被关闭,Err说明了Context被cancel的原因
return errors.Trace(cntx.Err())
case err := <-results:
if err != nil {
last = err
}
}
}
return last
}
接下来是在sentinel格式化之后,重新监控集群中的group。
func (s *Sentinel) MonitorGroups(sentinels []string, timeout time.Duration, config *MonitorConfig, groups map[int]string) error {
cntx, cancel := context.WithTimeout(s.Context, timeout)
defer cancel()
resolve := make(map[int]*net.TCPAddr)
var exit = make(chan error,)
go func() (err error) {
defer func() {
exit <- err
}()
for gid, addr := range groups {
if err := cntx.Err(); err != nil {
return errors.Trace(err)
}
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
s.printf("sentinel monitor resolve tcp address of %s failed, %s", addr, err)
return errors.Trace(err)
}
resolve[gid] = tcpAddr
}
return nil
}()
select {
case <-cntx.Done():
if cntx.Err() != context.DeadlineExceeded {
s.printf("sentinel monitor canceled (%v)", cntx.Err())
} else {
s.printf("sentinel montior resolve tcp address (%v)", cntx.Err())
}
return errors.Trace(cntx.Err())
case err := <-exit:
if err != nil {
return err
}
}
timeout += time.Second *
results := make(chan error, len(sentinels))
for i := range sentinels {
go func(sentinel string) {
//调用SENTINEL MONITOR命令,监控集群中的group。监控之后,根据dashboard.toml中设置的sentinel参数对sentinel进行设置
err := s.monitorGroupsDispatch(cntx, sentinel, timeout, config, resolve)
if err != nil {
s.errorf(err, "sentinel-[%s] monitor failed", sentinel)
}
results <- err
}(sentinels[i])
}
var last error
for _ = range sentinels {
select {
case <-cntx.Done():
if last != nil {
return last
}
return errors.Trace(cntx.Err())
case err := <-results:
if err != nil {
last = err
}
}
}
return last
}
下面说明一下dashboard.toml中的配置项各有什么含义
配置参数 | 含义 |
---|---|
parallel-syncs | 在发生failover主备切换时,这个选项指定了最多 可以有多少个slave同时对新的master进行同步,这个数字越小,完成failover所需的时间就越长,但是如果这个数字越大,就意味着越多的 slave因为replication而不可用。可以通过将这个值设为 1 来保证每次只有一个slave处于不能处理命令请求的状态。也可以理解为一次性修改几个slave指向新的new master |
down-after-milliseconds | sentinel会向master发送心跳PING来确认master是否存活,如果master在“一定时间范围”内不回应PONG 或者是回复了一个错误消息,那么这个sentinel会主观地(单方面地)认为这个master已经不可用了(subjectively down, 也简称为SDOWN)。而这个down-after-milliseconds就是用来指定这个“一定时间范围”的,单位是毫秒。 |
failover-timeout | 如果sentinel A推荐sentinel B去执行failover,B会等待一段时间后,自行再次去对同一个master执行failover,这个等待的时间是通过failover-timeout配置项去配置的。从这个规则可以看出,sentinel集群中的sentinel不会再同一时刻并发去failover同一个master,第一个进行failover的sentinel如果失败了,另外一个将会在一定时间内进行重新进行failover,以此类推 |
notification-script | 在群集failover时会触发执行指定的脚本。脚本的执行结果若为1,即稍后重试(最大重试次数为10);若为2,则执行结束。并且脚本最大执行时间为60秒,超时会被终止执行。使用方法如如sentinel notification-script mymaster ./check.sh |
client-reconfig-script | 在重新配置new master,new slave过程,可以触发的脚本 |
下一讲,我们来说rewatchSentinels方法,这个方法在后面的s.newProxyClient(p).SetSentinels(ctx.sentinel)中也调用了。
说明
如有转载,请注明出处
http://blog.csdn.net/antony9118/article/details/78115375