最近在看 UNIX 網絡程式設計并研究了一下 Redis 的實作,感覺 Redis 的源代碼十分适合閱讀和分析,其中 I/O 多路複用(mutiplexing)部分的實作非常幹淨和優雅,在這裡想對這部分的内容進行簡單的整理。
幾種 I/O 模型
為什麼 Redis 中要使用 I/O 多路複用這種技術呢?
首先,Redis 是跑在單線程中的,所有的操作都是按照順序線性執行的,但是由于讀寫操作等待使用者輸入或輸出都是阻塞的,是以 I/O 操作在一般情況下往往不能直接傳回,這會導緻某一檔案的 I/O 阻塞導緻整個程序無法對其它客戶提供服務,而 I/O 多路複用就是為了解決這個問題而出現的。
Blocking I/O
先來看一下傳統的阻塞 I/O 模型到底是如何工作的:當使用
read
或者
write
對某一個檔案描述符(File Descriptor 以下簡稱 FD)進行讀寫時,如果目前 FD 不可讀或不可寫,整個 Redis 服務就不會對其它的操作作出響應,導緻整個服務不可用。
這也就是傳統意義上的,也就是我們在程式設計中使用最多的阻塞模型:
阻塞模型雖然開發中非常常見也非常易于了解,但是由于它會影響其他 FD 對應的服務,是以在需要處理多個用戶端任務的時候,往往都不會使用阻塞模型。
I/O 多路複用
雖然還有很多其它的 I/O 模型,但是在這裡都不會具體介紹。
阻塞式的 I/O 模型并不能滿足這裡的需求,我們需要一種效率更高的 I/O 模型來支撐 Redis 的多個客戶(redis-cli),這裡涉及的就是 I/O 多路複用模型了:
在 I/O 多路複用模型中,最重要的函數調用就是
select
,該方法的能夠同時監控多個檔案描述符的可讀可寫情況,當其中的某些檔案描述符可讀或者可寫時,
select
方法就會傳回可讀以及可寫的檔案描述符個數。
關于
select
的具體使用方法,在網絡上資料很多,這裡就不過多展開介紹了;
與此同時也有其它的 I/O 多路複用函數
,它們相比
epoll/kqueue/evport
性能更優秀,同時也能支撐更多的服務。
select
Reactor 設計模式
Redis 服務采用 Reactor 的方式來實作檔案事件處理器(每一個網絡連接配接其實都對應一個檔案描述符)
檔案事件處理器使用 I/O 多路複用子產品同時監聽多個 FD,當
accept
、
read
write
和
close
檔案事件産生時,檔案事件處理器就會回調 FD 綁定的事件處理器。
雖然整個檔案事件處理器是在單線程上運作的,但是通過 I/O 多路複用子產品的引入,實作了同時對多個 FD 讀寫的監控,提高了網絡通信模型的性能,同時也可以保證整個 Redis 服務實作的簡單。
I/O 多路複用子產品
I/O 多路複用子產品封裝了底層的
select
epoll
avport
以及
kqueue
這些 I/O 多路複用函數,為上層提供了相同的接口。
在這裡我們簡單介紹 Redis 是如何包裝
select
epoll
的,簡要了解該子產品的功能,整個 I/O 多路複用子產品抹平了不同平台上 I/O 多路複用函數的差異性,提供了相同的接口:
-
static int aeApiCreate(aeEventLoop *eventLoop)
-
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
-
static void aeApiFree(aeEventLoop *eventLoop)
-
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
-
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)
-
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
同時,因為各個函數所需要的參數不同,我們在每一個子子產品内部通過一個
aeApiState
來存儲需要的上下文資訊:
// select
typedef struct aeApiState {
fd_set rfds, wfds;
fd_set _rfds, _wfds;
} aeApiState;
// epoll
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
這些上下文資訊會存儲在
eventLoop
的
void *state
中,不會暴露到上層,隻在目前子子產品中使用。
封裝 select 函數
select
可以監控 FD 的可讀、可寫以及出現錯誤的情況。
在介紹 I/O 多路複用子產品如何對
select
函數封裝之前,先來看一下
select
函數使用的大緻流程:
int fd = /* file descriptor */
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(fd, &rfds)
for ( ; ; ) {
select(fd+1, &rfds, NULL, NULL, NULL);
if (FD_ISSET(fd, &rfds)) {
/* file descriptor `fd` becomes readable */
}
}
- 初始化一個可讀的
集合,儲存需要監控可讀性的 FD;fd_set
- 使用
将FD_SET
加入fd
;rfds
- 調用
方法監控select
中的 FD 是否可讀;rfds
- 當
傳回時,檢查 FD 的狀态并完成對應的操作。select
而在 Redis 的
ae_select
檔案中代碼的組織順序也是差不多的,首先在
aeApiCreate
函數中初始化
rfds
wfds
:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
FD_ZERO(&state->rfds);
FD_ZERO(&state->wfds);
eventLoop->apidata = state;
return 0;
}
而
aeApiAddEvent
aeApiDelEvent
會通過
FD_SET
FD_CLR
修改
fd_set
中對應 FD 的标志位:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
return 0;
}
整個
ae_select
子子產品中最重要的函數就是
aeApiPoll
,它是實際調用
select
函數的部分,其作用就是在 I/O 多路複用函數傳回時,将對應的 FD 加入
aeEventLoop
fired
數組中,并傳回事件的個數:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
封裝 epoll 函數
Redis 對
epoll
的封裝其實也是類似的,使用
epoll_create
建立
epoll
中使用的
epfd
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
在
aeApiAddEvent
中使用
epoll_ctl
向
epfd
中添加需要監控的 FD 以及監聽的事件:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
由于
epoll
相比
select
機制略有不同,在
epoll_wait
函數傳回時并不需要周遊所有的 FD 檢視讀寫情況;在
epoll_wait
函數傳回時會提供一個
epoll_event
數組:
typedef union epoll_data {
void *ptr;
int fd; /* 檔案描述符 */
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll 事件 */
epoll_data_t data;
};
其中儲存了發生的事件(
epoll
EPOLLIN
EPOLLOUT
EPOLLERR
)以及發生該事件的 FD。
EPOLLHUP
aeApiPoll
函數隻需要将
epoll_event
數組中存儲的資訊加入
eventLoop
fired
數組中,将資訊傳遞給上層子產品:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
子子產品的選擇
因為 Redis 需要在多個平台上運作,同時為了最大化執行的效率與性能,是以會根據編譯平台的不同選擇不同的 I/O 多路複用函數作為子子產品,提供給上層統一的接口;在 Redis 中,我們通過宏定義的使用,合理的選擇不同的子子產品:
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
因為
select
函數是作為 POSIX 标準中的系統調用,在不同版本的作業系統上都會實作,是以将其作為保底方案:
Redis 會優先選擇時間複雜度為 $O(1)$ 的 I/O 多路複用函數作為底層實作,包括 Solaries 10 中的
evport
、Linux 中的
epoll
和 macOS/FreeBSD 中的
kqueue
,上述的這些函數都使用了核心内部的結構,并且能夠服務幾十萬的檔案描述符。
但是如果目前編譯環境沒有上述函數,就會選擇
select
作為備選方案,由于其在使用時會掃描全部監聽的描述符,是以其時間複雜度較差 $O(n)$,并且隻能同時服務 1024 個檔案描述符,是以一般并不會以
select
作為第一方案使用。
總結
Redis 對于 I/O 多路複用子產品的設計非常簡潔,通過宏保證了 I/O 多路複用子產品在不同平台上都有着優異的性能,将不同的 I/O 多路複用函數封裝成相同的 API 提供給上層使用。
整個子產品使 Redis 能以單程序運作的同時服務成千上萬個檔案描述符,避免了由于多程序應用的引入導緻代碼實作複雜度的提升,減少了出錯的可能性。
熬夜不易,點選請老王喝杯烈酒!!!!!!!