天天看點

Redis代碼閱讀3--Redis網絡監聽(2)



這篇文章接上一篇,主要介紹Redis網絡監聽流程的各個步驟。

  1. aeCreateEventLoop :建立用于循環監聽的 eventLoop

    , Redis 支援主流的三種事件觸發機制: select ,epoll, kqueue,

    可以通過在 config.h 裡面配置 HAVE_EPOLL/ HAVE_KQUEUE

    來根據不同的作業系統選擇合适的機制:調用 ae_epoll.c/ae_select.c/ae_kqueue.c中的 aeApiCreate;建立

    eventLoop 的時候沒有指定 beforesleep

    ,在開始循環監聽前将函數 beforeSleep 綁定到 eventLoop

    上,該函數也放在後面介紹

    C代碼

    Redis代碼閱讀3--Redis網絡監聽(2)
    Redis代碼閱讀3--Redis網絡監聽(2)
    Redis代碼閱讀3--Redis網絡監聽(2)
    1. /* test for polling API */  
    2. #ifdef __linux__  
    3. #define HAVE_EPOLL 1  
    4. #endif  
    5. #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)  
    6. #define HAVE_KQUEUE 1  
    /* test for polling API */
    #ifdef __linux__
    #define HAVE_EPOLL 1
    #endif
    
    #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
    #define HAVE_KQUEUE 1
    #endif
          
  2. aeCreateTimeEvent/:建立定時事件,注冊了定時時間函數 serverCron,作用放到後面介紹;
  3. aeCreateFileEvent:注冊了一個讀 I/O事件,綁定了函數 acceptTcpHandler(同樣也放到後面介紹 ),如果多路複用采用epoll機制的話,這采用LT模式進行觸發;
  4. 建立好server.ae後,通過aeMain這個方法開始網絡監聽;此處的代碼是:
    Redis代碼閱讀3--Redis網絡監聽(2)
    Redis代碼閱讀3--Redis網絡監聽(2)
    Redis代碼閱讀3--Redis網絡監聽(2)
    1. void aeMain(aeEventLoop *eventLoop) {  
    2.     eventLoop->stop = 0;  
    3.     while (!eventLoop->stop) {  
    4.         if (eventLoop->beforesleep != NULL)  
    5.             eventLoop->beforesleep(eventLoop);  
    6.         aeProcessEvents(eventLoop, AE_ALL_EVENTS);  
    7.     }  
    8. }  
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }      
     在每次處理事件之前,都要執行一遍server.ae中設定的beforesleep方法,下面就介紹下beforesleep;
  5. Beforesleep:顧名思義,這個方法在Redis每次進入sleep/wait去等待監聽的端口發生I/O事件之前被調用(這話太拗口了。。。。),還是來看代碼:
    Redis代碼閱讀3--Redis網絡監聽(2)
    Redis代碼閱讀3--Redis網絡監聽(2)
    Redis代碼閱讀3--Redis網絡監聽(2)
    1. void beforeSleep(struct aeEventLoop *eventLoop) {  
    2.     REDIS_NOTUSED(eventLoop);  
    3.     listNode *ln;  
    4.     redisClient *c;  
    5.     /* Awake clients that got all the swapped keys they requested */  
    6.     if (server.vm_enabled && listLength(server.io_ready_clients)) {  
    7.         listIter li;  
    8.         listRewind(server.io_ready_clients,&li);  
    9.         while((ln = listNext(&li))) {  
    10.             c = ln->value;  
    11.             struct redisCommand *cmd;  
    12.             /* Resume the client. */  
    13.             listDelNode(server.io_ready_clients,ln);  
    14.             c->flags &= (~REDIS_IO_WAIT);  
    15.             server.vm_blocked_clients--;  
    16.             aeCreateFileEvent(server.el, c->fd, AE_READABLE,  
    17.                 readQueryFromClient, c);  
    18.             cmd = lookupCommand(c->argv[0]->ptr);  
    19.             redisAssert(cmd != NULL);  
    20.             call(c,cmd);  
    21.             resetClient(c);  
    22.             /* There may be more data to process in the input buffer. */  
    23.             if (c->querybuf && sdslen(c->querybuf) > 0)  
    24.                 processInputBuffer(c);  
    25.         }  
    26.     /* Try to process pending commands for clients that were just unblocked. */  
    27.     while (listLength(server.unblocked_clients)) {  
    28.         ln = listFirst(server.unblocked_clients);  
    29.         redisAssert(ln != NULL);  
    30.         c = ln->value;  
    31.         listDelNode(server.unblocked_clients,ln);  
    32.         c->flags &= ~REDIS_UNBLOCKED;  
    33.         /* Process remaining data in the input buffer. */  
    34.         if (c->querybuf && sdslen(c->querybuf) > 0)  
    35.             processInputBuffer(c);  
    36.     /* Write the AOF buffer on disk */  
    37.     flushAppendOnlyFile();  
    void beforeSleep(struct aeEventLoop *eventLoop) {
        REDIS_NOTUSED(eventLoop);
        listNode *ln;
        redisClient *c;
    
        /* Awake clients that got all the swapped keys they requested */
        if (server.vm_enabled && listLength(server.io_ready_clients)) {
            listIter li;
    
            listRewind(server.io_ready_clients,&li);
            while((ln = listNext(&li))) {
                c = ln->value;
                struct redisCommand *cmd;
    
                /* Resume the client. */
                listDelNode(server.io_ready_clients,ln);
                c->flags &= (~REDIS_IO_WAIT);
                server.vm_blocked_clients--;
                aeCreateFileEvent(server.el, c->fd, AE_READABLE,
                    readQueryFromClient, c);
                cmd = lookupCommand(c->argv[0]->ptr);
                redisAssert(cmd != NULL);
                call(c,cmd);
                resetClient(c);
                /* There may be more data to process in the input buffer. */
                if (c->querybuf && sdslen(c->querybuf) > 0)
                    processInputBuffer(c);
            }
        }
    
        /* Try to process pending commands for clients that were just unblocked. */
        while (listLength(server.unblocked_clients)) {
            ln = listFirst(server.unblocked_clients);
            redisAssert(ln != NULL);
            c = ln->value;
            listDelNode(server.unblocked_clients,ln);
            c->flags &= ~REDIS_UNBLOCKED;
    
            /* Process remaining data in the input buffer. */
            if (c->querybuf && sdslen(c->querybuf) > 0)
                processInputBuffer(c);
        }
    
        /* Write the AOF buffer on disk */
        flushAppendOnlyFile();
    }      

     這個方法做了三件事情:

    I.    如果Redi開啟了Virtual memory,那麼某些clients請求的keys可能因為被swap了,是以這些client會被block住,當這些clients請求的keys又被swap到記憶體中時,則這些被block住的clients應該unblock,然後被處理;io_ready_clients就是用來維護這些clients的,為了盡快響應client的請求,是以在每次sleep前都先處理這些請求

    II.    某些Redis操作是blocking的,如BLPOP,那麼執行這些操作的clients可能會被block住,unblocked_clients這個list就是用來維護那些剛被unblock的clients,如果這個list不為空,則也要盡快響應這些clients

    III.    flushAppendOnlyFile;因為clients的Socket的write隻能在eventLoop裡面進行,而flushAppendOnlyFile又是在每次sleep之前被調用,是以在eventLoop裡面的所有AOF writes都是先寫到記憶體裡的一塊buffer裡面,flushAppendOnlyFile則負責把這個buffer内容flush到disk;

  6. 執行完beforesleep後aeprocessEvents,該方法主要是處理各種監聽到的檔案讀寫事件和到期響應的定時事件,因為這個方法的代碼比較長,而且邏輯簡單,就不貼過來了,簡單介紹下過程:                                      a)    首先通過周遊eventLoop中注冊的timeEvent找出離目前最近timeEvent(即shortest)。

    b)    調用epoll_wait()方法,等待I/O事件的發生, 為了盡快響應時間事件,epoll_wait()方法的等待時間為shortest與目前時間的內插補點,如果該內插補點小于零,則epoll_wait()輪詢至有I/O事件發生;

    c)    響應eventLoop中fired的aeFileEvent,這裡調用的就是之前設定的檔案處理函數acceptTcpHandler。

    d)    響應完I/O事件後,則通過timeEventHead周遊timeEvent,逐一響應timeProc--serverCron。在響應定時事件的時候 需要注意幾點點:

    Redis代碼閱讀3--Redis網絡監聽(2)
    Redis代碼閱讀3--Redis網絡監聽(2)
    Redis代碼閱讀3--Redis網絡監聽(2)
    1. static int processTimeEvents(aeEventLoop *eventLoop) {  
    2.     int processed = 0;  
    3.     aeTimeEvent *te;  
    4.     long long maxId;  
    5.     te = eventLoop->timeEventHead;  
    6.    <span style="color: rgb(255, 0, 0);"> maxId = eventLoop->timeEventNextId-1;</span>  
    7.     while(te) {  
    8.         long now_sec, now_ms;  
    9.         long long id;  
    10.       <span style="color: rgb(255, 0, 0);">  if (te->id > maxId) </span>  
    11. {  
    12.             te = te->next;  
    13.             continue;  
    14.         aeGetTime(&now_sec, &now_ms);  
    15.         if (now_sec > te->when_sec ||  
    16.             (now_sec == te->when_sec && now_ms >= te->when_ms))  
    17.         {  
    18.             int retval;  
    19.             id = te->id;  
    20.             retval = te->timeProc(eventLoop, id, te->clientData);  
    21.             processed++;  
    22.             /* After an event is processed our time event list may 
    23.              * no longer be the same, so we restart from head. 
    24.              * Still we make sure to don't process events registered 
    25.              * by event handlers itself in order to don't loop forever. 
    26.              * To do so we saved the max ID we want to handle. 
    27.              * 
    28.              * FUTURE OPTIMIZATIONS: 
    29.              * Note that this is NOT great algorithmically. Redis uses 
    30.              * a single time event so it's not a problem but the right 
    31.              * way to do this is to add the new elements on head, and 
    32.              * to flag deleted elements in a special way for later 
    33.              * deletion (putting references to the nodes to delete into 
    34.              * another linked list). */  
    35.           <span style="color: rgb(255, 0, 0);">  if (retval != AE_NOMORE) {  
    36.                 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);  
    37.             } else {  
    38.                 aeDeleteTimeEvent(eventLoop, id);  
    39.             }</span>  
    40.            <span style="color: rgb(255, 0, 0);"> </span>  
    41. <span style="color: rgb(255, 0, 0);"><span style="background-color: rgb(255, 255, 255);">te = eventLoop->timeEventHead;</span>  
    42. </span>  
    43.         } else {  
    44.     return processed;  
    static int processTimeEvents(aeEventLoop *eventLoop) {
        int processed = 0;
        aeTimeEvent *te;
        long long maxId;
    
        te = eventLoop->timeEventHead;
       <span style="color: rgb(255, 0, 0);"> maxId = eventLoop->timeEventNextId-1;</span>
    
    
    
    
        while(te) {
            long now_sec, now_ms;
            long long id;
    
          <span style="color: rgb(255, 0, 0);">  if (te->id > maxId) </span>
    
    
    
    {
                te = te->next;
                continue;
            }
            aeGetTime(&now_sec, &now_ms);
            if (now_sec > te->when_sec ||
                (now_sec == te->when_sec && now_ms >= te->when_ms))
            {
                int retval;
    
                id = te->id;
                retval = te->timeProc(eventLoop, id, te->clientData);
                processed++;
                /* After an event is processed our time event list may
                 * no longer be the same, so we restart from head.
                 * Still we make sure to don't process events registered
                 * by event handlers itself in order to don't loop forever.
                 * To do so we saved the max ID we want to handle.
                 *
                 * FUTURE OPTIMIZATIONS:
                 * Note that this is NOT great algorithmically. Redis uses
                 * a single time event so it's not a problem but the right
                 * way to do this is to add the new elements on head, and
                 * to flag deleted elements in a special way for later
                 * deletion (putting references to the nodes to delete into
                 * another linked list). */
              <span style="color: rgb(255, 0, 0);">  if (retval != AE_NOMORE) {
                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                } else {
                    aeDeleteTimeEvent(eventLoop, id);
                }</span>
    
    
    
    
               <span style="color: rgb(255, 0, 0);"> </span>
    
    
    
    <span style="color: rgb(255, 0, 0);"><span style="background-color: rgb(255, 255, 255);">te = eventLoop->timeEventHead;</span>
    
    
    
    </span>
    
    
    
    
            } else {
                te = te->next;
            }
        }
        return processed;
    }      
    •  因為響應一個定時事件後,eventLoop裡面的定時事件連結清單可能會改變了,是以又要從頭結點開始周遊定時事件連結清單;
    • 因為每次都要從頭結點開始周遊定時事件連結清單,是以要考慮如何避免響應循環調用,即在響應定時事件a時,如果a的處理函數timeProc中又register了新的定時事件b,如果響應完事件a後,又響應b的話,那麼就會造成循環響應。為了解決這個情況,redis在eventLoop裡維護了一個timeEventNextId,即下一個定時事件的id,比如目前eventLoop的隻有一個timeEvent  a,那麼timeEventNextId=2,a->id = 1當a的timeProc方法又注冊了timeEvent 

      b,那麼timeEventNextId = 3,b->id = 2.那麼在redis在周遊定時事件開始的時候将周遊前的eventLoop裡面的maxId= timeEventNextId-1儲存起來,在周遊定時事件的時候,如果某個timeEvent->id >maxId,則跳過這個事件。

    • 作者也意識到了每次都從頭結點開始周遊定時事件不是一個好的算法,但是由于目前Redis裡面隻有一個定時事件,是以目前對redis來說不是個問題,但是作者也提到在未來的版本會對此進行改進   
  7. acceptTcpHandler:這個方法主要是監聽網絡端口:                                                                             i.    通過調用anetTcpAccept方法獲得監聽端口上的client connection;

    ii.    然後調用acceptCommonHandler建立redisClient對象,如果目前連接配接的client的數量大于配置的最大client數量,則拒絕目前連接配接,并傳回” max number of clients reached”提示資訊;

    iii.    調用createClient方法建立redisClient,同時注冊新的fileEvent(AE_READABLE),并綁定處理函數為readQueryFromClient;

    Redis代碼閱讀3--Redis網絡監聽(2)
    Redis代碼閱讀3--Redis網絡監聽(2)
    Redis代碼閱讀3--Redis網絡監聽(2)
    1. redisClient *createClient(int fd) {  
    2.     redisClient *c = zmalloc(sizeof(redisClient));  
    3.     c->bufpos = 0;  
    4.     anetNonBlock(NULL,fd);  
    5.     anetTcpNoDelay(NULL,fd);  
    6.     if (!c) return NULL;  
    7.     <span style="color: rgb(255, 0, 0);">if (aeCreateFileEvent(server.el,fd,AE_READABLE,  
    8.         readQueryFromClient, c) == AE_ERR)</span>  
    9.     {  
    10.         close(fd);  
    11.         zfree(c);  
    12.         return NULL;  
    13.     .........  
    redisClient *createClient(int fd) {
        redisClient *c = zmalloc(sizeof(redisClient));
        c->bufpos = 0;
    
        anetNonBlock(NULL,fd);
        anetTcpNoDelay(NULL,fd);
        if (!c) return NULL;
        <span style="color: rgb(255, 0, 0);">if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)</span>
    
    
    
    
        {
            close(fd);
            zfree(c);
            return NULL;
        }
        .........
    }      
      iv.    readQueryFromClient:從指定的Socket中讀取client發送過來的資料,并按照Redis的協定(後面将單獨介紹)進行解析組裝成Redis的各個command,然後通過查找commandTable,執行command
  8.  _installWriteEvent:上面介紹的檔案事件都是AE_READABLE事件,但Redis在執行完client請求後的指令後,向Client端return資料,就是往Socket寫入資料,這使一個AE_ AE_WRITABLE事件。Redis執行完command後,調用addReply方法,然後在這個方法裡面調用installWriteEvent來注冊一個AE_WRITABLE事件,并綁定事件處理函數sendReplyToClient,用來把資料發送到client。
  9. serverCron: 介紹完fileEvent的處理函數後,最後我們來介紹timeEvent的處理函數。顧名思義,serverCron就是Redis Server的定時計劃任務。這個方法比較複雜,處理的事情也很多,主要集中在記錄Redis的運作情況(memory,clients等),AOF write, VM Swap和BGSAVE等和Redis正常運作息息相關的事項。這個方法的代碼很多, 是以單獨介紹

繼續閱讀