天天看點

Redis源碼剖析和注釋(十九)--- Redis 事件處理實作Redis 事件處理實作

Redis 事件處理實作

1. Redis事件介紹

Redis

伺服器是一個

事件驅動程式

。下面先來簡單介紹什麼是事件驅動。

所謂事件驅動,就是當你輸入一條指令并且按下回車,然後消息被組裝成

Redis

協定的格式發送給

Redis

伺服器,這就會産生一個事件,

Redis

伺服器會接收該指令,處理該指令和發送回複,而當你沒有與伺服器進行互動時,那麼伺服器就會處于阻塞等待狀态,會讓出CPU進而進入睡眠狀态,當事件觸發時,就會被作業系統喚醒。事件驅動使CPU更高效的利用。

事件驅動是一種概括和抽象,也可以稱為I/O多路複用(I/O multiplexing),它的實作方式各個系統都不同,一會會說到Redis的方式。

redis

伺服器中,處理了兩類事件:

  • 檔案事件(file event):

    Redis

    伺服器通過套接字于用戶端(或其他Redis伺服器)進行連接配接,而檔案事件就是伺服器對套接字操作的抽象。
  • 時間事件(time event):Redis伺服器的一些操作需要在給定的事件點執行,而時間事件就是伺服器對這類定時操作的抽象。

2. 事件的抽象

Redis

将這兩個事件分别抽象成一個資料結構來管理。redis 所有源碼注釋

2.1 檔案事件結構

/* File event structure */
typedef struct aeFileEvent {
    // 檔案時間類型:AE_NONE,AE_READABLE,AE_WRITABLE
    int mask; /* one of AE_(READABLE|WRITABLE) */
    // 可讀處理函數
    aeFileProc *rfileProc;
    // 可寫處理函數
    aeFileProc *wfileProc;
    // 用戶端傳入的資料
    void *clientData;
} aeFileEvent;  //檔案事件
           

其中

rfileProc

wfileProc

成員分别為兩個函數指針,他們的原型為

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
           

這個函數是

回調函數

,如果目前檔案事件所指定的事件類型發生時,則會調用對應的

回調函數

處理該事件。函數指針與回調函數詳解

當事件就緒時,我們需要知道檔案事件的檔案描述符還有事件類型才能對于鎖定該事件,是以定義了

aeFiredEvent

結構統一管理:

/* A fired event */
typedef struct aeFiredEvent {
    // 就緒事件的檔案描述符
    int fd;
    // 就緒事件類型:AE_NONE,AE_READABLE,AE_WRITABLE
    int mask;
} aeFiredEvent; //就緒事件
           

2.2 時間事件結構

/* Time event structure */
typedef struct aeTimeEvent {
    // 時間事件的id
    long long id; /* time event identifier. */
    // 時間事件到達的時間的秒數
    long when_sec; /* seconds */
    // 時間事件到達的時間的毫秒數
    long when_ms; /* milliseconds */
    // 時間事件處理函數
    aeTimeProc *timeProc;
    // 時間事件終結函數
    aeEventFinalizerProc *finalizerProc;
    // 用戶端傳入的資料
    void *clientData;
    // 指向下一個時間事件
    struct aeTimeEvent *next;
} aeTimeEvent;  //時間事件
           

從這個結構中可以看出,時間事件表是一個連結清單,因為它有一個

next

指針域,指向下一個時間事件。

和檔案事件一樣,當時間事件所指定的事件發生時,也會調用對應的

回調函數

,結構成員

timeProc

finalizerProc

都是回調函數,函數原型如下:函數指針與回調函數詳解

typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
           

雖然對檔案事件和時間事件都做了抽象,

Redis

仍然需要對事件做整體抽象,于是定義了

aeEventLoop

結構。

2.3 事件狀态結構

/* State of an event based program */
typedef struct aeEventLoop {
    // 目前已注冊的最大的檔案描述符
    int maxfd;   /* highest file descriptor currently registered */
    // 檔案描述符監聽集合的大小
    int setsize; /* max number of file descriptors tracked */
    // 下一個時間事件的ID
    long long timeEventNextId;
    // 最後一次執行事件的時間
    time_t lastTime;     /* Used to detect system clock skew */
    // 注冊的檔案事件表
    aeFileEvent *events; /* Registered events */
    // 已就緒的檔案事件表
    aeFiredEvent *fired; /* Fired events */
    // 時間事件的頭節點指針
    aeTimeEvent *timeEventHead;
    // 事件處理開關
    int stop;
    // 多路複用庫的事件狀态資料
    void *apidata; /* This is used for polling API specific data */
    // 執行處理事件之前的函數
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;  //事件輪詢的狀态結構
           

aeEventLoop

結構儲存了一個

void *

類型的萬能指針

apidata

,是用來儲存輪詢事件的狀态的,也就是儲存底層調用的多路複用庫的事件狀态,關于Redis的多路複用庫的選擇,Redis包裝了常見的

select

epoll

evport

kqueue

,他們在編譯階段,根據不同的系統選擇性能最高的一個多路複用庫作為Redis的多路複用程式的實作,而且所有庫實作的接口名稱都是相同的,是以Redis多路複用程式底層實作是可以互換的。具體選擇庫的源碼為

// IO複用的選擇,性能依次下降,Linux支援 "ae_epoll.c" 和 "ae_select.c"
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
           

也可以通過Redis用戶端的指令來檢視目前選擇的多路複用庫,

INFO server

:> INFO server
# Server
……
multiplexing_api:epoll
……
           

那麼,既然知道了多路複用庫的選擇,那麼我們來檢視一下

apidata

儲存的

epoll

模型的事件狀态結構:ae_epoll.c檔案中

typedef struct aeApiState {
    // epoll事件的檔案描述符
    int epfd;
    // 事件表
    struct epoll_event *events;
} aeApiState;   //事件的狀态
           

關于epoll的I/O多路複用模型可以檢視:Linux網絡程式設計—I/O複用模型之epoll

epoll模型的

struct epoll_event

的結構中定義這自己的事件類型,例如

EPOLLIN

POLLOUT

等等,但是Redis的檔案事件結構

aeFileEvent

中也在

mask

中定義了自己的事件類型,例如:

AE_READABLE

AE_WRITABLE

等,于是,就需要實作一個中間層将兩者的事件類型相聯系起來,這也就是之前提到的

ae_epoll.c

檔案中實作的相同的API,我們列出來:

// 建立一個epoll執行個體,儲存到eventLoop中
static int aeApiCreate(aeEventLoop *eventLoop)
// 調整事件表的大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize)  
// 釋放epoll執行個體和事件表空間
static void aeApiFree(aeEventLoop *eventLoop)
// 在epfd辨別的事件表上注冊fd的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// 在epfd辨別的事件表上注删除fd的事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
// 等待所監聽檔案描述符上有事件發生
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// 傳回正在使用的IO多路複用庫的名字
static char *aeApiName(void)
           

這些API都是調用相應的底層多路複用庫來将Redis事件狀态結構

aeEventLoop

所關聯,就是将

epoll

的底層函數封裝起來,Redis實作事件時,隻需調用這些接口即可。我們檢視兩個重要的函數的源碼,看看是如何實作的

redis 所有源碼注釋

  • 向Redis事件狀态結構

    aeEventLoop

    的事件表

    event

    注冊一個事件,對應的是

    epoll_ctl

    函數
// 在epfd辨別的事件表上注冊fd的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    // EPOLL_CTL_ADD,向epfd注冊fd的上的event
    // EPOLL_CTL_MOD,修改fd已注冊的event
    // #define AE_NONE 0           //未設定
    // #define AE_READABLE 1       //事件可讀
    // #define AE_WRITABLE 2       //事件可寫
    // 判斷fd事件的操作,如果沒有設定事件,則進行關聯mask類型事件,否則進行修改
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    // struct epoll_event {
    //      uint32_t     events;      /* Epoll events */
    //      epoll_data_t data;        /* User data variable */
    // };
    ee.events = ;
    // 如果是修改事件,合并之前的事件類型
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    // 根據mask映射epoll的事件類型
    if (mask & AE_READABLE) ee.events |= EPOLLIN;   //讀事件
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  //寫事件
    ee.data.fd = fd;    //設定事件所從屬的目标檔案描述符
    // 将ee事件注冊到epoll中
    if (epoll_ctl(state->epfd,op,fd,&ee) == -) return -;
    return ;
}
           
  • 等待所監聽檔案描述符上有事件發生,對應着底層

    epoll_wait

    函數
// 等待所監聽檔案描述符上有事件發生
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = ;

    // 監聽事件表上是否有事件發生
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec* + tvp->tv_usec/) : -);
    // 至少有一個就緒的事件
    if (retval > ) {
        int j;

        numevents = retval;
        // 周遊就緒的事件表,将其加入到eventLoop的就緒事件表中
        for (j = ; j < numevents; j++) {
            int mask = ;
            struct epoll_event *e = state->events+j;

            // 根據就緒的事件類型,設定mask
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            // 添加到就緒事件表中
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    // 傳回就緒的事件個數
    return numevents;
}
           

3. 事件的源碼實作

redis 所有源碼注釋

Redis

事件的源碼全部定義在

ae.c

檔案中,我們從事件的主函數

aeMain

說起,一步一步深入剖析。

// 事件輪詢的主函數
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = ;
    // 一直處理事件
    while (!eventLoop->stop) {
        // 執行處理事件之前的函數
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        //處理到時的時間事件和就緒的檔案事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
           

這個事件的主函數

aeMain

很清楚的可以看到,如果伺服器一直處理事件,那麼就是一個死循環,而一個最典型的事件驅動,就是一個死循環。調用處理事件的函數

aeProcessEvents

,他們參數是一個事件狀态結構

aeEventLoop

AE_ALL_EVENTS

,源碼如下:

// 處理到時的時間事件和就緒的檔案事件
// 函數傳回執行的事件個數
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = , numevents;

    /* Nothing to do? return ASAP */
    // 如果什麼事件都沒有設定則直接傳回
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return ;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    // 請注意,既然我們要處理時間事件,即使沒有要處理的檔案事件,我們仍要調用select(),以便在下一次事件準備啟動之前進行休眠

    // 目前還沒有要處理的檔案事件,或者設定了時間時間但是沒有設定不阻塞辨別
    if (eventLoop->maxfd != - ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // 如果設定了時間事件而沒有設定不阻塞辨別
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 擷取最近到時的時間事件
            shortest = aeSearchNearestTimer(eventLoop);
        // 擷取到了最早到時的時間事件
        if (shortest) {
            long now_sec, now_ms;
            // 擷取目前時間
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            // 等待該時間事件到時所需要的時長
            long long ms =
                (shortest->when_sec - now_sec)* +
                shortest->when_ms - now_ms;

            // 如果沒到時
            if (ms > ) {
                // 儲存時長到tvp中
                tvp->tv_sec = ms/;
                tvp->tv_usec = (ms % )*;
            // 如果已經到時,則将tvp的時間設定為0
            } else {
                tvp->tv_sec = ;
                tvp->tv_usec = ;
            }

        // 沒有擷取到了最早到時的時間事件,時間事件連結清單為空
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            // 如果設定了不阻塞辨別
            if (flags & AE_DONT_WAIT) {
                // 将tvp的時間設定為0,就不會阻塞
                tv.tv_sec = tv.tv_usec = ;
                tvp = &tv;
            } else {
                // 阻塞到第一個時間事件的到來
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        // 等待所監聽檔案描述符上有事件發生
        // 如果tvp為NULL,則阻塞在此,否則等待tvp設定阻塞的時間,就會有時間事件到時
        // 傳回了就緒檔案事件的個數
        numevents = aeApiPoll(eventLoop, tvp);
        // 周遊就緒檔案事件表
        for (j = ; j < numevents; j++) {
            // 擷取就緒檔案事件的位址
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            // 擷取就緒檔案事件的類型,檔案描述符
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = ;

        /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 如果是檔案可讀事件發生
            if (fe->mask & mask & AE_READABLE) {
                // 設定讀事件辨別 且 調用讀事件方法處理讀事件
                rfired = ;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 如果是檔案可寫事件發生
            if (fe->mask & mask & AE_WRITABLE) {
                // 讀寫事件的執行發法不同,則執行寫事件,避免重複執行相同的方法
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;    //執行的事件次數加1
        }
    }
    /* Check time events */
    // 執行時間事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
           

剛才提到該函數的一個參數是

AE_ALL_EVENTS

,他的定義在

ae.h

中,定義如下:

#define AE_FILE_EVENTS 1                                //檔案事件
#define AE_TIME_EVENTS 2                                //時間事件
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)   //檔案和時間事件
#define AE_DONT_WAIT 4                                  //不阻塞等待辨別
           

很明顯,

flags

AE_FILE_EVENTS

AE_TIME_EVENTS

或的結果,他們的含義如下:

  • 如果flags = 0,函數什麼都不做,直接傳回
  • 如果flags設定了 AE_ALL_EVENTS ,則執行所有類型的事件
  • 如果flags設定了 AE_FILE_EVENTS ,則執行檔案事件
  • 如果flags設定了 AE_TIME_EVENTS ,則執行時間事件
  • 如果flags設定了 AE_DONT_WAIT ,那麼函數處理完事件後直接傳回,不阻塞等待

Redis伺服器在沒有被事件觸發時,就會阻塞等待,因為沒有設定

AE_DONT_WAIT

辨別。但是他不會一直的死等待,等待檔案事件的到來,因為他還要處理時間時間,是以,在調用

aeApiPoll

進行監聽之前,先從時間事件表中擷取一個最近到達的時間時間,根據要等待的時間建構一個

struct timeval tv, *tvp

結構的變量,這個變量儲存着伺服器阻塞等待檔案事件的最長時間,一旦時間到達而沒有觸發檔案事件,

aeApiPoll

函數就會停止阻塞,進而調用

processTimeEvents

處理時間事件,因為Redis伺服器設定一個對自身資源和狀态進行檢查的周期性檢查的時間事件,而該函數就是

timeProc

所指向的回調函數

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
           

如果在阻塞等待的最長時間之間,觸發了檔案事件,就會先執行檔案事件,後執行時間事件,是以處理時間事件通常比預設的會晚一點。

而執行檔案事件

rfileProc

wfileProc

也是調用了回調函數,Redis将檔案事件的處理分為了好幾種,用于處理不同的網絡通信需求,下面列出回調函數的原型:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
           
  • acceptTcpHandler:用于

    accept

    client的

    connect

  • acceptUnixHandler:用于

    accept

    client的本地

    connect

  • sendReplyToClient:用于向client發送指令回複。
  • readQueryFromClient:用于讀入client發送的請求。

接下來,我們檢視擷取最快達到的時間事件的函數

aeSearchNearestTimer

實作

// 尋找第一個快到時的時間事件
// 這個操作是有用的知道有多少時間可以選擇該事件設定為不用推遲任何事件的睡眠中。
// 如果事件連結清單沒有時間将傳回NULL。
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    // 時間事件頭節點位址
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;

    // 周遊所有的時間事件
    while(te) {
        // 尋找第一個快到時的時間事件,儲存到nearest中
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}
           

這個函數沒什麼,就是周遊連結清單,找到最小值。我們重點看執行時間事件的函數

processTimeEvents

實作

/* Process time events */
// 執行時間事件
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = ;
    aeTimeEvent *te, *prev;
    long long maxId;
    time_t now = time(NULL);

    // 這裡嘗試發現時間混亂的情況,上一次處理事件的時間比目前時間還要大
    // 重置最近一次處理事件的時間
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = ;
            te = te->next;
        }
    }
    // 設定上一次時間事件處理的時間為目前時間
    eventLoop->lastTime = now;

    prev = NULL;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-;   //目前時間事件表中的最大ID
    // 周遊時間事件連結清單
    while(te) {
        long now_sec, now_ms;
        long long id;

        /* Remove events scheduled for deletion. */
        // 如果時間事件已被删除了
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            // 從事件連結清單中删除事件的節點
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            // 調用時間事件終結方法清楚該事件
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        // 確定我們不處理在此疊代中由時間事件建立的時間事件。 請注意,此檢查目前無效:我們總是在頭節點添加新的計時器,但是如果我們更改實施細節,則該檢查可能會再次有用:我們将其保留在未來的防禦
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        // 擷取目前時間
        aeGetTime(&now_sec, &now_ms);
        // 找到已經到時的時間事件
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            // 調用時間事件處理方法
            retval = te->timeProc(eventLoop, id, te->clientData);
            // 時間事件次數加1
            processed++;
            // 如果不是定時事件,則繼續設定它的到時時間
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            // 如果是定時時間,則retval為-1,則将其時間事件删除,惰性删除
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        // 更新前驅節點指針和後繼節點指針
        prev = te;
        te = te->next;
    }
    return processed;   //傳回執行事件的次數
}
           

如果時間事件不存在,則就調用

finalizerProc

指向的回調函數,删除目前的時間事件。如果存在,就調用

timeProc

指向的回調函數處理時間事件。Redis的時間事件分為兩類

  • 定時事件:讓一段程式在指定的時間後執行一次。
  • 周期性事件:讓一段程式每隔指定的時間後執行一次。

如果目前的時間事件是周期性,那麼就會在将時間周期添加到周期事件的到時時間中。如果是定時事件,則将該時間事件删除。

至此,Redis事件的實作就剖析完畢,但是事件的其他API,例如:建立事件,删除事件,調整事件表的大小等等都沒有列出,所有源碼的剖析,可以上github上檢視:redis 所有源碼注釋