天天看点

Codis源码解析——proxy监听redis请求

上一篇我们讲到,pkg/proxy/proxy.go的构造函数中,传入Config,返回Proxy。其中有一步是

//s是Proxy
go s.serveProxy()
           

每接到一个redis请求,就创建一个独立的session进行处理(默认的每个session的tcp连接过期时间为75秒,也就是每个请求最多处理75秒)。这里的第一个参数是net.Conn,Conn是一个通用的面向流的网络连接,多个goroutines可以同时调用Conn的方法。这里的net.Conn就是我们之前Proxy的lproxy这个Listener监听到的19000请求到来的时候返回的net.Conn。

func NewSession(sock net.Conn, config *Config) *Session {
    c := redis.NewConn(sock,
        config.SessionRecvBufsize.AsInt(),
        config.SessionSendBufsize.AsInt(),
    )
    c.ReaderTimeout = config.SessionRecvTimeout.Duration()
    c.WriterTimeout = config.SessionSendTimeout.Duration()
    c.SetKeepAlivePeriod(config.SessionKeepAlivePeriod.Duration())

    s := &Session{
        Conn: c, config: config,
        CreateUnix: time.Now().Unix(),
    }
    s.stats.opmap = make(map[string]*opStats, )
    log.Infof("session [%p] create: %s", s, s)
    return s
}
           

下一步将路由器传入,调用start方法。在stats.go中,有一个sessions结构,里面记录了总的session数量和alive的session数量。

start方法首先检查总的session数量是否超过上限(默认为1000),以及Router是否在线。如果不符合就返回错误。注意下面的方法是被Session中的Once.Do包起来的,即使有多次调用,也只会执行一次,避免浪费不必要的性能

if int(incrSessions()) > s.config.ProxyMaxClients {
    go func() {
        s.Conn.Encode(redis.NewErrorf("ERR max number of clients reached"), true)
        s.CloseWithError(ErrTooManySessions)
    }()
    decrSessions()
    return
}

if !d.isOnline() {
    go func() {
        s.Conn.Encode(redis.NewErrorf("ERR router is not online"), true)
        s.CloseWithError(ErrRouterNotOnline)
    }()
    decrSessions()
    return
}
           

核心就是创建loopReader和loopWriter。loopReader负责读取和分发请求到后端,loopWriter负责合并请求结果,然后返回给客户端。

//给RequestChan的buff中赋1024位的数组,并返回一个RequestChan
tasks := NewRequestChanBuffer)
go func() {
    s.loopWriter(tasks)
    //alive session减一
    decrSessions()
}()

go func() {
    s.loopReader(tasks, d)
    //所有请求取完或者proxy退出之后,上面的方法就会结束,关闭tasks这个requestChan
    tasks.Close()
}()
           

我们看一下RequestChan的结构,后面很多东西都是基于它的。每一个session都会有一个RequestChan

type RequestChan struct {
    lock sync.Mutex

    //sync.NewCond(&RequestChan.lock)
    //如果RequestChan为空,就让goroutinewait;如果向RequestChan放入了一个请求,并且有goroutine在等待,就唤醒一个
    cond *sync.Cond

    data []*Request
    buff []*Request

    waits  int
    closed bool
}
           

这一篇我们对这两个方法进行详细介绍。

loopReader读取和分发请求到后端,关键代码就是handleRequest函数,传入的两个参数分别是d *Router和r := &Request{},也就是把结果存到task里面,后面loopWriter会用到

func (s *Session) loopReader(tasks *RequestChan, d *Router) (err error) {
    defer func() {
        s.CloseReaderWithError(err)
    }()

    var (
        breakOnFailure = s.config.SessionBreakOnFailure
        maxPipelineLen = s.config.SessionMaxPipeline
    )

    //session只要没有退出,就一直从conn中取请求,直到请求取完就return,然后会关闭tasks这个requestChan
    for !s.quit {
        //从redis连接中取出请求参数
        multi, err := s.Conn.DecodeMultiBulk()
        if err != nil {
            return err
        }
        s.incrOpTotal()

        //检测requestChan的data是否超过配置的每个pipeline最大请求长度,默认为
        if tasks.Buffered() > maxPipelineLen {
            return ErrTooManyPipelinedRequests
        }

        start := time.Now()
        s.LastOpUnix = start.Unix()
        s.Ops++

        r := &Request{}
        //这个Multi非常重要,请求的参数就在里面,是一个[]*redis.Resp切片
        r.Multi = multi
        //WaitGroup的作用是,阻塞主线程的执行,一直等到所有的goroutine执行完成。每创建一个goroutine
        //就把任务队列中任务的数量+,任务完成,将任务队列中的任务数量-。有点类似于java里面的CountDownLatch
        //这个Batch用于检测redis请求是否完成(完成的标志是BackendConn调用了setRResponse)
        r.Batch = &sync.WaitGroup{}
        r.Database = s.database
        r.UnixNano = start.UnixNano()

        if err := s.handleRequest(r, d); err != nil {
            r.Resp = redis.NewErrorf("ERR handle request, %s", err)
            tasks.PushBack(r)
            if breakOnFailure {
                return err
            }
        } else {
            tasks.PushBack(r)
        }
    }
    return nil
}
           

先解释一下上面的PushBack方法,这个方法比较简单,就是把request(此时已经处理完毕,将resp设置为request的一个参数)添加到当前Session中之前创建的RequestChan中。loopWriter后面再遍历RequestChan取出所有请求及结果

func (c *RequestChan) lockedPushBack(r *Request) int {
    if c.closed {
        panic("send on closed chan")
    }
    //RequestChan的waits不为0的时候(也就是在RequestChan上等待的request数量不为0时),唤醒一个在cond上等待的goroutine
    //这里的意思是,如果向requestChan中放入了请求,就将一个在cond上等待取出的goroutine唤醒
    if c.waits != {
        c.cond.Signal()
    }
    //将request添加到RequestChan的data []*Request切片中,用于记录处理过的请求。
    c.data = append(c.data, r)
    return len(c.data)
}
           

这个方法不是重点,重点在于handleRequest方法,就是将请求取出,然后根据不同的redis请求调用不同的方法,被调用的就是codis-server

func (s *Session) handleRequest(r *Request, d *Router) error {
    //解析请求。opstr取决于具体的命令,比如说"SET"
    opstr, flag, err := getOpInfo(r.Multi)
    if err != nil {
        return err
    }
    r.OpStr = opstr
    r.OpFlag = flag
    r.Broken = &s.broken

    //有些命令不支持,就会返回错误
    if flag.IsNotAllowed() {
        return fmt.Errorf("command '%s' is not allowed", opstr)
    }

    switch opstr {
    case "QUIT":
        return s.handleQuit(r)
    case "AUTH":
        return s.handleAuth(r)
    }

    if !s.authorized {
        if s.config.SessionAuth != "" {
            r.Resp = redis.NewErrorf("NOAUTH Authentication required")
            return nil
        }
        s.authorized = true
    }

    //根据不同的redis操作调用不同的方法
    switch opstr {
    case "SELECT":
        return s.handleSelect(r)
    case "PING":
        return s.handleRequestPing(r, d)
    case "INFO":
        return s.handleRequestInfo(r, d)
    case "MGET":
        return s.handleRequestMGet(r, d)
    default:
    return d.dispatch(r)
    .
    .
    }
}
           

其实仔细观察上面的switch case语句,就能发现,除了SELECT和default之外,都多传入了一个参数,也就是Router。联想到路由本身的作用,就不难想到,handleRequestPing之类的方法,都是要对底层的redis服务器进行分发的;反之,handleSelect方法,则不需要分发。我们来对比一下。

首先是handleSelect方法,进去看一下。这个方法很简单了,因为只是选择db。从请求参数中取出db号,做了一个号超出范围的异常处理,如果db取得没有问题的话,就返回ok,error为nil

func (s *Session) handleSelect(r *Request) error {
    if len(r.Multi) !=  {
        r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'SELECT' command")
        return nil
    }
    //将字符串转换为十进制整数,从请求参数中取出是对哪个db进行操作
    switch db, err := strconv.Atoi(string(r.Multi[].Value)); {
    case err != nil:
        r.Resp = redis.NewErrorf("ERR invalid DB index")
    case db <  || db >= int(s.config.BackendNumberDatabases):
        r.Resp = redis.NewErrorf("ERR invalid DB index, only accept DB [0,%d)", s.config.BackendNumberDatabases)
    default:
        r.Resp = RespOK
        s.database = int32(db)
    }
    return nil
}
           

其他传入了Router参数的方法中,都调用了Router的dispatch方法,将请求分发给相应的槽进行处理,如果调用Mset,一次设置多个值的话,就还要多做一步,通过Coalesce来合并请求结果

func (s *Session) handleRequestMSet(r *Request, d *Router) error {
    var nblks = len(r.Multi) - 
    switch {
    case nblks ==  || nblks%2 != :
        r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'MSET' command")
        return nil
    case nblks == :
        return d.dispatch(r)
    }

    //将一个Mset请求拆分成多个子set请求,分别dispatch
    var sub = r.MakeSubRequest(nblks / 2)
    for i := range sub {
        sub[i].Multi = []*redis.Resp{
            r.Multi[],
            r.Multi[i*2+],
            r.Multi[i*2+],
        }
        if err := d.dispatch(&sub[i]); err != nil {
            return err
        }
    }
    r.Coalesce = func() error {
        for i := range sub {
            if err := sub[i].Err; err != nil {
                return err
            }
            switch resp := sub[i].Resp; {
            case resp == nil:
                return ErrRespIsRequired
            case resp.IsString():
                r.Resp = resp
            default:
                return fmt.Errorf("bad mset resp: %s value.len = %d", resp.Type, len(resp.Value))
            }
        }
        return nil
    }
    return nil
}
           

将某个request分发给各个具体的slot进行处理的方法主要有三个,都是由Router来完成:

//根据key进行转发
func (s *Router) dispatch(r *Request) error {
    hkey := getHashKey(r.Multi, r.OpStr)
    var id = Hash(hkey) % MaxSlotNum
    slot := &s.slots[id]
    //将请求分发到相应的slot
    return slot.forward(r, hkey)
}
//将request发到指定slot
func (s *Router) dispatchSlot(r *Request, id int) error {
    if id <  || id >= MaxSlotNum {
        return ErrInvalidSlotId
    }
    slot := &s.slots[id]
    return slot.forward(r, nil)
}
//这个是指明了redis服务器地址的请求,如果找不到就返回false
func (s *Router) dispatchAddr(r *Request, addr string) bool {
    s.mu.RLock()
    defer s.mu.RUnlock()
    if bc := s.pool.primary.Get(addr).BackendConn(r.Database, r.Seed16(), false); bc != nil {
        bc.PushBack(r)
        return true
    }
    if bc := s.pool.replica.Get(addr).BackendConn(r.Database, r.Seed16(), false); bc != nil {
        bc.PushBack(r)
        return true
    }
    return false
}
           

我们看一下forward方法,这个方法的具体实现是在/pkg/proxy/foward.go中。每一个slot可能有两种forwardMethod,分别是forwardSync或者forwardSemiSync,原理类似,都是将指定的slot、request、键的哈希值,经过process得到实际处理请求的BackendConn,然后把请求放入BackendConn的chan *Request中,等待处理。这两个的区别就是,如果一个slot在迁移中,proxy分别会使用SLOTSMGRTTAGONE和SLOTSMGRT-EXEC-WRAPPER强制完成其迁移。BackendConn一般最重要的信息就是redis服务器的地址(例如10.0.2.15:6379)以及对应的database编号(0到15,一般就是0)。

//forwardSync的Forward和process
func (d *forwardSync) Forward(s *Slot, r *Request, hkey []byte) error {
    //加slot的读锁
    s.lock.RLock()
    bc, err := d.process(s, r, hkey)   
    s.lock.RUnlock()
    if err != nil {
        return err
    }
    //请求放入BackendConn等待处理
    bc.PushBack(r)                     
    return nil
}
func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) {
    //检查该slot的backend是否为空
    if s.backend.bc == nil {
        log.Debugf("slot-%04d is not ready: hash key = '%s'",
            s.id, hkey)
        return nil, ErrSlotIsNotReady
    }
    //如果这个slot处在迁移过程中,那么其migrate就不为空(从何处迁移),由proxy的slot的fowardMethod强制对其完成迁移
    if s.migrate.bc != nil && len(hkey) != {
        if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
            log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
                s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
            return nil, err
        }
    }
    r.Group = &s.refs
    r.Group.Add)
    return d.forward2(s, r), nil
}
//forwardSemiAsync的Forward和process
func (d *forwardSemiAsync) Forward(s *Slot, r *Request, hkey []byte) error {
    var loop int
    for {
        s.lock.RLock()
        bc, retry, err := d.process(s, r, hkey)
        s.lock.RUnlock()

        switch {
        case err != nil:
            return err
        case !retry:
            if bc != nil {
                bc.PushBack(r)
            }
            return nil
        }

        var delay time.Duration
        switch {
        case loop <:
            delay =
        case loop <:
            delay = time.Millisecond * time.Duration(loop)
        default:
            delay = time.Millisecond *
        }
        time.Sleep(delay)

        if r.IsBroken() {
            return ErrRequestIsBroken
        }
        loop +=
    }
}
           

无论是forwardSync还是forwardSemiAsync,在process的过程中,都要调用下面的forward2方法来从Slot获取真正处理redis请求的BackendConn

//获取真正处理redis请求的BackendConn 
func (d *forwardHelper) forward2(s *Slot, r *Request) *BackendConn {
    var database, seed = r.Database, r.Seed16()
    //如果是读请求,就调用replicaGroup来处理
    if s.migrate.bc == nil && r.IsReadOnly() && len(s.replicaGroups) != {
        for _, group := range s.replicaGroups {
            var i = seed
            for _ = range group {
                i = (i +) % uint(len(group))
                if bc := group[i].BackendConn(database, seed, false); bc != nil {
                    return bc
                }
            }
        }
    }
    //从sharedBackendConn中取出一个BackendConn(sharedBackendConn中储存了BackendConn组成的二维切片)
    return s.backend.bc.BackendConn(database, seed, true)
}
func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool) *BackendConn {
    if s == nil {
        return nil
    }

    if s.single != nil {
        bc := s.single[database]
        if must || bc.IsConnected() {
            return bc
        }
        return nil
    }

    var parallel = s.conns[database]

    var i = seed
    for _ = range parallel {
        i = (i +) % uint(len(parallel))
        if bc := parallel[i]; bc.IsConnected() {
            return bc
        }
    }
    if !must {
        return nil
    }
    return parallel]
}
           

选出了bc之后,就把请求交给bc,等待处理

//请求放入BackendConn等待处理。如果request的sync.WaitGroup不为空,就加一,然后判断加一之后的值,如果加一之后couter为0,那么所有阻塞在counter上的goroutine都会得到释放
//将请求直接存入到BackendConn的chan *Request中,等待后续被取出并进行处理。
func (bc *BackendConn) PushBack(r *Request) {
    if r.Batch != nil {
        //请求处理之后,setResponse的时候,r.Batch会减1,表明请求已经处理完。
        //session的loopWriter里面收集请求结果的时候,会调用wait方法等一次的所有请求处理完成
        r.Batch.Add)
    }
    bc.input <- r
}
           

再回到LoopWriter,如果已经找不到文件位置的话,请转向/pkg/proxy/session.go。LoopWriter的作用就是合并请求的处理结果并返回给客户端。请求结果由BackendConn处理好之后,放在Request这个struct的*redis.Resp属性中,这里只需要把结果取出。可以看到,codis将请求与结果关联起来的方式,就是把结果当做request的一个属性

func (s *Session) loopWriter(tasks *RequestChan) (err error) {
    defer func() {
        s.CloseWithError(err)
        tasks.PopFrontAllVoid(func(r *Request) {
            s.incrOpFails(r, nil)
        })
        s.flushOpStats(true)
    }()

    var (
        breakOnFailure = s.config.SessionBreakOnFailure
        maxPipelineLen = s.config.SessionMaxPipeline
    )

    p := s.Conn.FlushEncoder()
    p.MaxInterval = time.Millisecond
    p.MaxBuffered = maxPipelineLen /

    //前面我们在tasks.PushBack(r)中,将请求放入了data []*Request切片,现在就是取出最早的请求及其处理结果
    //如果当前session的requestChan为空,就调用cond.wait让goroutine等待,直到调用pushback又放入请求为止
    return tasks.PopFrontAll(func(r *Request) error {
        resp, err := s.handleResponse(r)
        if err != nil {
            resp = redis.NewErrorf("ERR handle response, %s", err)
            if breakOnFailure {
                s.Conn.Encode(resp, true)
                return s.incrOpFails(r, err)
            }
        }
        if err := p.Encode(resp); err != nil {
            return s.incrOpFails(r, err)
        }
        fflush := tasks.IsEmpty()
        if err := p.Flush(fflush); err != nil {
            return s.incrOpFails(r, err)
        } else {
            s.incrOpStats(r, resp.Type)
        }
        if fflush {
            s.flushOpStats(false)
        }
        return nil
    })
}
func (s *Session) handleResponse(r *Request) (*redis.Resp, error) {
    //goroutine都会一直阻塞到所有请求处理完之后,前面我们已经看到,向RequestChan中添加一个请求的时候
    //request.Batch会加一,后面BackendConn调用setResponse,也就是完成处理的时候,又会调用Done方法减一
    r.Batch.Wait()
    //如果是单个的请求,例如SET,这里就为空了
    if r.Coalesce != nil {
        //如果是MSET这种请求,就需要调用之前自定义的Coalesce方法,将请求合并之后再返回
        if err := r.Coalesce(); err != nil {
            return nil, err
        }
    }
    if err := r.Err; err != nil {
        return nil, err
    } else if r.Resp == nil {
        return nil, ErrRespIsRequired
    }
    return r.Resp, nil
}
           

至于sharedBackendConn是何时创建以及如何处理请求的,请看笔者的另一篇博客Codis源码解析——sharedBackendConn

总结一下,在启动proxy的时候,会有一个goroutine专门负责监听发送到19000端口的redis请求。每次有redis请求过来,会新建一个session,并启动两个goroutine,loopReader和loopWriter。loopReader的作用是,Router根据请求指派到slot,并找到slot后面真正处理请求的BackendConn,再将请求放入BackendConn的RequestChan中,等待后续取出并进行处理,然后将请求写入session的RequestChan。loopWriter则将请求的结果从session的RequestChan(Request的*redis.Resp属性)中取出并返回,如果是MSET这种批处理的请求,要注意合并结果再返回

说明

如有转载,请注明出处:

http://blog.csdn.net/antony9118/article/details/75293115