天天看點

Redis是如何建立連接配接和處理指令的一:前言二:基礎架構三:源碼探究四:總結

一:前言

前幾天寫了如何調試Redis:《快速編譯調試 Redis》,對于閱讀源碼來說,調試是基本功,是以如果想快速上手調試的話,建議先看看上面這篇文章。

今天要說的是 Redis 的請求監聽,通俗點說,就是Redis是如何監聽用戶端發出的set、get等指令的。

二:基礎架構

衆所周知,Redis 是單程序單線程架構,雖然是單程序單線程,但是Redis的性能卻毫不遜色,能輕松應對一般的高并發場景,那麼Redis究竟是施了什麼魔法呢?

其實 Redis 的原理和 Nginx 差不多,都利用了 IO 多路複用來提高處理能力,所謂多路複用,就是一個線程可以同時處理多個IO操作,當 某個 IO 操作 Ready 時,作業系統會主動通知程序。使用 IO 多路複用,我們可以使用 epoll、kqueue、select,API 都差不多。

與Redis 不同的是,Nginx 并不是單程序架構,而是采用了多程序來處理請求。Nginx跑起來後會先啟動Maste程序,Master程序接着啟動多個 Worker 程序,每個Worker 程序都會參與請求的監聽和處理。這樣可以充分發揮CPU的多核特性。

想對 Nginx 多程序架構有更深了解的同學,可以看下我之前的一篇文章:《動手打造Nginx多程序架構》。

雖然 Redis 是單程序單線程,不能利用多核,但同樣也避免了多程序的并發問題,也就沒有了鎖帶來的開銷。

三:源碼探究

Redis 入口是 server.c 中的 main()方法,main()中會調用 initServer()初始化 Redis 服務。

3.1 初始化 Redis 服務

Redis 為了進行事件監聽,特地封裝了一個 struct:aeEventLoop,定義在 ae.c 中。

為什麼需要 aeEventLoop 呢?因為 Redis 針對于 epoll、evport、select 還有 kqueue 的 API做了封裝,分别為 ae_epoll、ae_evport、ae_select 和 ae_kqueue。但是 Redis 在使用監聽的時候,統一調用的都是 aeEventLoop 中的方法。ae.c 在預處理階段使用了條件編譯,這樣便可以選擇平台支援的 API 來使用多路複用。

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif           

因為筆者用的是 Mac ,Redis 選擇了 Mac 平台支援的 kqueue 使用多路複用,是以後面源碼分析的時候,我都會基于 kqueue 來講解。

(1)建立核心事件隊列

initServer()第一件做的事情就是為目前服務建立了一個 aeEventLoop,後續的所有指令監聽,都是基于 aeEventLoop 來做的。

server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);           

aeCreateEventLoop()内部會調用 aeApiCreate(),選擇性的調用epoll、select 或者 kqueue 的 API,建立事件監聽。這裡拿 kqueue 來舉例子,我們可以看下 ae_kqueue.c 中的實作。

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    ......
    state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize);
    state->kqfd = kqueue();
    ......
    eventLoop->apidata = state;
    return 0;
}           

上面建立了一個 aeApiState,并且調用了 kqueue 的API:kqueue():

// 建立一個核心消息隊列,傳回隊列描述符
int  kqueue(void);           

這裡建立了一個核心事件隊列,并将該隊列的檔案描述符存入了 state->kqfd 中。

(2)建立套接字,監聽端口

要想監聽用戶端的指令,首先要做的就是監聽端口,initServer()在建立好 aeEventLoop 後,就會開始執行端口監聽操作:

listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)           

其中 server.ipfd 是個數組,用來存放端口監聽後建立的描述符。為什麼是個數組?因為Redis 可以同時 bind 多個 IP 位址,是以listenToPort()内部會周遊配置檔案中配置的多個IP位址,依次進行監聽,并将建立的描述符存入 server.ipfd [] 中。

(3)注冊監聽到核心隊列

監聽好端口後,initServer()會周遊剛剛建立好的套接字描述符 ipfd [],依次注冊到事件監聽中。

aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)

我們進入aeCreateFileEvent()中看看。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    ......
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    ......
    return AE_OK;
}           

上面主要是初始化了 aeFileEvent,存放在 eventLoop 中的events [] 内。并且調用 aeApiAddEvent(),将監聽端口的描述符注冊到(1)中建立的核心隊列中。對于 kqueue來說,這裡調用的是 kqueue 的 EV_SET():

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct kevent ke;

    if (mask & AE_READABLE) {
        EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
        if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
    }
    if (mask & AE_WRITABLE) {
        EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
        if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
    }
    return 0;
}           

3.2 啟動事件監聽循環,監聽請求

經過了上面的一系列初始化操作,Redis 就會正式進入事件監聽的循環中,即 aeMain()。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}           

這段代碼看起來很簡單,隻要事件監聽沒有停止,Redis 就會一直循環調用aeProcessEvents()處理事件。

aeProcessEvents()中的核心邏輯就是兩步:

(1)調用 aeApiPoll ()來等待接收事件。

numevents = aeApiPoll(eventLoop, tvp);           

我們看下使用kqueue時,ae_kqueue.c 中的實作。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    ......
    retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        NULL);
    ......
    numevents = retval;
    ......
    return numevents;
}           

這裡調用的是 kevent()來接收事件。kevent()描述如下:

// 用途:注冊\反注冊 監聽事件,等待事件通知
// kq,上面建立的消息隊列描述符
// changelist,需要注冊的事件
// changelist,changelist數組大小
// eventlist,核心會把傳回的事件放在該數組中
// nevents,eventlist數組大小
// timeout,等待核心傳回事件的逾時事件,NULL 即為無限等待
int  kevent(int kq, 
           const struct kevent *changelist, int nchanges,
           struct kevent *eventlist, int nevents,
           const struct timespec *timeout);           

(2)周遊傳回的就緒事件數組,處理事件。

for (j = 0; j < numevents; j++) {
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int fired = 0; /* Number of events fired for current fd. */

    // 處理可讀事件  
    if (!invert && fe->mask & mask & AE_READABLE) {
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        fired++;
    }

    // 處理可寫事件
    if (fe->mask & mask & AE_WRITABLE) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
    }

    if (invert && fe->mask & mask & AE_READABLE) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
    }

    processed++;
}           

每次周遊,都會先從eventLoop 的 events[] 中,根據描述符取出對應的 aeFileEvent,接着調用 aeFileEvent 的 rfileProc()或者 wfileProc()來處理事件,那麼 aeFileEvent 的 fileProc()時從哪來的呢?

記得 initServer()時,Redis 調用 aeCreateFileEvent()将監聽端口後的套接字描述符注冊到核心事件隊列中嗎?

aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)           

此時傳入的 acceptTcpHandler()就作為 rFileProc ,設定到此次建立的 aeFileEvent 之中 了。 acceptTcpHandler()的作用就是處理新用戶端的連接配接請求,與新用戶端建立 TCP 連接配接。

在 Redis 服務端調用 accept()與用戶端建立連接配接後,會建立一個 client 對象,用來描述該連接配接。接着 Redis 會将accept()傳回的新連接配接的描述符再次調用aeCreateFileEvent(),注冊到核心事件隊列中,用來接收該連接配接後續的事件,說白了,就是監聽後續該用戶端的 set、get 等請求:

aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR           

到現在讀者應該知道aeCreateFileEvent()的第四個參數時用來幹嘛的了,就是監聽到事件後用來處理事件的。可見用來處理用戶端指令請求的函數是:readQueryFromClient()。

3.3 處理用戶端指令

現在我們知道了用來處理用戶端指令請求的函數是 readQueryFromClient(),由函數名就能看出它的意義:讀取用戶端的查詢請求。

readQueryFromClient()調用了 processInputBuffer()從緩沖區讀取并解析請求内容,之後 processInputBuffer()便會調用 processCommand()處理指令。

(1)查詢指令

processCommand()第一步要做的就是根據請求找到對應的指令。Redis 的所有指令都預先定義在 server.c 的 redisCommandTable [] 中:

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    ......
}           

redisCommandTable [] 中定義了每個指令對應的處理函數,如set:setCommand, get:getCommand。

Redis 在啟動後會調用 server.c 的 initServerConfig()方法用來初始化配置,其中就調用了 populateCommandTable()。populateCommandTable()的作用就是周遊redisCommandTable [],将所有的指令以 name 作為 key,以自身 redisCommand 作為value,存入到一個字典中:server.commands。

processCommand()調用了lookupCommand()查詢指令。

c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);           

那麼該去哪找對應的指令呢?當然就是上面所說這個字典中。

struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}           

(2)執行指令

找到請求對應的指令後,該幹嘛?想都不用想,當然是去執行指令了。

if (c->flags & CLIENT_MULTI &&
    c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
    c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
    queueMultiCommand(c);
    addReply(c,shared.queued);
} else {
    call(c,CMD_CALL_FULL);
    c->woff = server.master_repl_offset;
    if (listLength(server.ready_keys))
        handleClientsBlockedOnKeys();
}           

上面這段代碼的門道就在于:

如果處于事務中,就将指令先放入隊列中,不執行。

如果不在事務中,就直接調用call()執行指令。

call()中會調用指令對應的處理函數:

c->cmd->proc(c);           

如 set 指令對應的處理函數就是 t_string.c 中的 setCommand()。

四:總結

一步一步跟下來,對于網絡請求這塊,Redis 和 Nginx 确實太像了,都是用了IO多路複用。

Redis 監聽指令主要就是下面幾個步驟。

(1)建立套接字,監聽端口,也就是監聽新用戶端的建立連接配接請求。

(2)建立核心事件隊列,并注冊上述的套接字描述符到隊列中。

(3)開啟循環,監聽隊列中的就緒事件。

(4)當端口有新事件時,調用 accept()與新用戶端建立連接配接,并再次将新連接配接的描述符注冊到核心事件隊列中,監聽該TCP連接配接上的事件。

(5)當與用戶端的TCP連接配接上有新事件時,調用對應的事件處理函數,從該連接配接上讀取并解析請求,并執行請求對應的指令。

繼續閱讀