天天看點

Redis 和 I/O 多路複用

最近在看 UNIX 網絡程式設計并研究了一下 Redis 的實作,感覺 Redis 的源代碼十分适合閱讀和分析,其中 I/O 多路複用(mutiplexing)部分的實作非常幹淨和優雅,在這裡想對這部分的内容進行簡單的整理。

幾種 I/O 模型

為什麼 Redis 中要使用 I/O 多路複用這種技術呢?

首先,Redis 是跑在單線程中的,所有的操作都是按照順序線性執行的,但是由于讀寫操作等待使用者輸入或輸出都是阻塞的,是以 I/O 操作在一般情況下往往不能直接傳回,這會導緻某一檔案的 I/O 阻塞導緻整個程序無法對其它客戶提供服務,而 I/O 多路複用就是為了解決這個問題而出現的。

Blocking I/O

先來看一下傳統的阻塞 I/O 模型到底是如何工作的:當使用 

read

 或者 

write

 對某一個檔案描述符(File Descriptor 以下簡稱 FD)進行讀寫時,如果目前 FD 不可讀或不可寫,整個 Redis 服務就不會對其它的操作作出響應,導緻整個服務不可用。

這也就是傳統意義上的,也就是我們在程式設計中使用最多的阻塞模型:

阻塞模型雖然開發中非常常見也非常易于了解,但是由于它會影響其他 FD 對應的服務,是以在需要處理多個用戶端任務的時候,往往都不會使用阻塞模型。

I/O 多路複用

雖然還有很多其它的 I/O 模型,但是在這裡都不會具體介紹。

阻塞式的 I/O 模型并不能滿足這裡的需求,我們需要一種效率更高的 I/O 模型來支撐 Redis 的多個客戶(redis-cli),這裡涉及的就是 I/O 多路複用模型了:

在 I/O 多路複用模型中,最重要的函數調用就是 

select

,該方法的能夠同時監控多個檔案描述符的可讀可寫情況,當其中的某些檔案描述符可讀或者可寫時,

select

 方法就會傳回可讀以及可寫的檔案描述符個數。

關于 

select

 的具體使用方法,在網絡上資料很多,這裡就不過多展開介紹了;

與此同時也有其它的 I/O 多路複用函數 

epoll/kqueue/evport

,它們相比 

select

 性能更優秀,同時也能支撐更多的服務。

Reactor 設計模式

Redis 服務采用 Reactor 的方式來實作檔案事件處理器(每一個網絡連接配接其實都對應一個檔案描述符)

檔案事件處理器使用 I/O 多路複用子產品同時監聽多個 FD,當 

accept

read

write

 和 

close

 檔案事件産生時,檔案事件處理器就會回調 FD 綁定的事件處理器。

雖然整個檔案事件處理器是在單線程上運作的,但是通過 I/O 多路複用子產品的引入,實作了同時對多個 FD 讀寫的監控,提高了網絡通信模型的性能,同時也可以保證整個 Redis 服務實作的簡單。

I/O 多路複用子產品

I/O 多路複用子產品封裝了底層的 

select

epoll

avport

 以及 

kqueue

 這些 I/O 多路複用函數,為上層提供了相同的接口。

在這裡我們簡單介紹 Redis 是如何包裝 

select

epoll

 的,簡要了解該子產品的功能,整個 I/O 多路複用子產品抹平了不同平台上 I/O 多路複用函數的差異性,提供了相同的接口:

  • static int aeApiCreate(aeEventLoop *eventLoop)

  • static int aeApiResize(aeEventLoop *eventLoop, int setsize)

  • static void aeApiFree(aeEventLoop *eventLoop)

  • static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)

  • static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)

  • static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)

同時,因為各個函數所需要的參數不同,我們在每一個子子產品内部通過一個 

aeApiState

 來存儲需要的上下文資訊:

// select
typedef struct aeApiState {
    fd_set rfds, wfds;
    fd_set _rfds, _wfds;
} aeApiState;

// epoll
typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;
           

這些上下文資訊會存儲在 

eventLoop

 的 

void *state

 中,不會暴露到上層,隻在目前子子產品中使用。

封裝 select 函數

select

 可以監控 FD 的可讀、可寫以及出現錯誤的情況。

在介紹 I/O 多路複用子產品如何對 

select

 函數封裝之前,先來看一下 

select

 函數使用的大緻流程:

int fd = /* file descriptor */

fd_set rfds;
FD_ZERO(&rfds);
FD_SET(fd, &rfds)

for ( ; ; ) {
    select(fd+1, &rfds, NULL, NULL, NULL);
    if (FD_ISSET(fd, &rfds)) {
        /* file descriptor `fd` becomes readable */
    }
}
           
  1. 初始化一個可讀的 

    fd_set

     集合,儲存需要監控可讀性的 FD;
  2. 使用 

    FD_SET

     将 

    fd

     加入 

    rfds

  3. 調用 

    select

     方法監控 

    rfds

     中的 FD 是否可讀;
  4. 當 

    select

     傳回時,檢查 FD 的狀态并完成對應的操作。

而在 Redis 的 

ae_select

 檔案中代碼的組織順序也是差不多的,首先在 

aeApiCreate

 函數中初始化 

rfds

wfds

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    if (!state) return -1;
    FD_ZERO(&state->rfds);
    FD_ZERO(&state->wfds);
    eventLoop->apidata = state;
    return 0;
}
           

而 

aeApiAddEvent

aeApiDelEvent

 會通過 

FD_SET

FD_CLR

 修改 

fd_set

 中對應 FD 的标志位:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
    if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
    return 0;
}
           

整個 

ae_select

 子子產品中最重要的函數就是 

aeApiPoll

,它是實際調用 

select

 函數的部分,其作用就是在 I/O 多路複用函數傳回時,将對應的 FD 加入 

aeEventLoop

fired

 數組中,并傳回事件的個數:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;

    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];

            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}
           

封裝 epoll 函數

Redis 對 

epoll

 的封裝其實也是類似的,使用 

epoll_create

 建立 

epoll

 中使用的 

epfd

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}
           

在 

aeApiAddEvent

 中使用 

epoll_ctl

 向 

epfd

 中添加需要監控的 FD 以及監聽的事件:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
           

由于 

epoll

 相比 

select

 機制略有不同,在 

epoll_wait

 函數傳回時并不需要周遊所有的 FD 檢視讀寫情況;在 

epoll_wait

 函數傳回時會提供一個 

epoll_event

 數組:

typedef union epoll_data {
    void    *ptr;
    int      fd; /* 檔案描述符 */
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

struct epoll_event {
    uint32_t     events; /* Epoll 事件 */
    epoll_data_t data;
};
           
其中儲存了發生的 

epoll

 事件(

EPOLLIN

EPOLLOUT

EPOLLERR

EPOLLHUP

)以及發生該事件的 FD。

aeApiPoll

 函數隻需要将 

epoll_event

 數組中存儲的資訊加入 

eventLoop

fired

 數組中,将資訊傳遞給上層子產品:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
           

子子產品的選擇

因為 Redis 需要在多個平台上運作,同時為了最大化執行的效率與性能,是以會根據編譯平台的不同選擇不同的 I/O 多路複用函數作為子子產品,提供給上層統一的接口;在 Redis 中,我們通過宏定義的使用,合理的選擇不同的子子產品:

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
           

因為 

select

 函數是作為 POSIX 标準中的系統調用,在不同版本的作業系統上都會實作,是以将其作為保底方案:

Redis 會優先選擇時間複雜度為 $O(1)$ 的 I/O 多路複用函數作為底層實作,包括 Solaries 10 中的 

evport

、Linux 中的 

epoll

 和 macOS/FreeBSD 中的 

kqueue

,上述的這些函數都使用了核心内部的結構,并且能夠服務幾十萬的檔案描述符。

但是如果目前編譯環境沒有上述函數,就會選擇 

select

 作為備選方案,由于其在使用時會掃描全部監聽的描述符,是以其時間複雜度較差 $O(n)$,并且隻能同時服務 1024 個檔案描述符,是以一般并不會以 

select

 作為第一方案使用。

總結

Redis 對于 I/O 多路複用子產品的設計非常簡潔,通過宏保證了 I/O 多路複用子產品在不同平台上都有着優異的性能,将不同的 I/O 多路複用函數封裝成相同的 API 提供給上層使用。

整個子產品使 Redis 能以單程序運作的同時服務成千上萬個檔案描述符,避免了由于多程序應用的引入導緻代碼實作複雜度的提升,減少了出錯的可能性。

熬夜不易,點選請老王喝杯烈酒!!!!!!!