天天看點

Redis 技術專題系列之網絡架構和線程模型

作者:網際網路技術學堂

引言

Redis 是一個高性能、記憶體存儲的鍵值資料庫。它支援豐富的資料結構,如字元串、哈希、清單、集合和有序集合。Redis 的性能得益于其網絡架構和線程模型。

本文将介紹 Redis 的網絡架構和線程模型,希望能夠幫助讀者更好地了解 Redis 的性能優化和運維管理。

大家好,這裡是網際網路技術學堂,留下你的點贊、關注、分享,支援一下吧,謝謝。

Redis 技術專題系列之網絡架構和線程模型

網絡架構

Redis 的網絡架構是基于單線程的事件循環模型。它采用了 I/O 多路複用技術,利用單個線程處理所有用戶端請求和網絡 I/O 操作。

網絡程式設計離不開 Socket,網絡 I/O 模型常用的無非是同步阻塞、同步非阻塞、異步阻塞、異步非阻塞,高性能網絡伺服器常見的線程模型也就是基于 EventLoop 模式的單線程模型。

Redis 技術專題系列之網絡架構和線程模型

當用戶端連接配接到 Redis 伺服器時,伺服器會為每個用戶端配置設定一個檔案描述符,并将其加入到事件循環機制中。當有資料可讀或可寫時,Redis 會觸發相應的事件,單線程通過事件循環機制排程事件處理函數來處理用戶端請求和網絡 I/O 操作。

這種單線程的事件循環模型帶來了以下優點:

  1. 簡單:不需要線程切換和同步機制,減少了鎖競争和上下文切換的開銷。
  2. 可擴充性:通過多個 Redis 執行個體和資料分片來實作橫向擴充。
  3. 高可用性:通過主從複制和哨兵機制來實作高可用性。
Redis 技術專題系列之網絡架構和線程模型

線程模型

Redis 采用的是單線程的事件循環模型,但它并不是單程序的。Redis 可以通過配置檔案中的 daemonize 選項将自己變成守護程序,同時可以通過 fork 函數來建立子程序來處理持久化操作和複制操作。

子程序主要用于持久化和複制操作,Redis 采用了寫時複制(Copy-On-Write,簡稱 COW)技術來優化子程序的性能。當子程序需要進行寫操作時,Redis 會将需要修改的資料複制一份,修改完成後再将修改後的資料替換原來的資料。

這種寫時複制技術帶來了以下優點:

  1. 性能:減少了記憶體複制的開銷,提高了寫操作的性能。
  2. 資料:避免了在多個程序之間共享同一塊記憶體帶來的資料問題。

總體來說,Redis 的線程模型和網絡架構都是非常不錯的,可以幫助 Redis 實作高性能、高可用和可擴充的特性。是以,掌握 Redis 的網絡架構和線程模型是 Redis 開發和運維的重要知識點,也是高頻面試題的考點之一。

Redis 技術專題系列之網絡架構和線程模型

源碼層面分析網絡架構

Redis 的網絡架構主要包含以下兩個部分:

1、事件驅動模型

Redis 使用 epoll 等事件驅動模型來處理用戶端的連接配接、讀寫操作。具體實作代碼在 ae.c 檔案中。

首先,Redis 會建立一個事件循環機制,不斷地監聽和處理事件。在初始化 Redis 伺服器時,通過 aeCreateEventLoop() 函數建立一個事件循環機制。

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}           

然後,Redis 會将用戶端的連接配接事件和讀寫事件加入到事件循環機制中。具體實作代碼在 networking.c 檔案中。

static void acceptCommonHandler(int fd, int flags, char *ip) {
int cport, cfd, max = server.maxclients;
char cip[NET_IP_STR_LEN];
listNode *ln;
client *c;
if ((cfd = anetTcpAccept(server.neterr, fd, cip, &cport)) == ANET_ERR) {
if (errno != EWOULDBLOCK) {
serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
}
return;
}
anetNonBlock(NULL,cfd);
anetEnableTcpNoDelay(NULL,cfd);
if (server.tcpkeepalive)
anetKeepAlive(NULL, cfd, server.tcpkeepalive);
if (max && listLength(server.clients) >= max) {
serverLog(LL_VERBOSE,"WARNING: maximum number of clients reached");
close(cfd);
return;
}
ln = listAddNodeTail(server.clients,NULL);
c = zmalloc(sizeof(*c));
c->connfd = cfd;
c->firsttime = time(NULL);
c->lastinteraction = c->lastping = c->lastoutput = server.unixtime;
c->authenticated = 0;
c->fd = cfd;
c->name = NULL;
c->querybuf = sdsempty();
c->reqtype = 0;
c->argc = 0           

在上面的代碼中,anetTcpAccept() 函數用于監聽用戶端的連接配接事件,并傳回連接配接用戶端的檔案描述符。如果有用戶端連接配接,就會執行 listAddNodeTail() 函數将用戶端添加到 server.clients 連結清單中,并為用戶端配置設定一個 client 結構體對象。在 client 結構體對象中,儲存了用戶端的連接配接描述符、讀寫緩沖區、近一次互動時間等資訊。

然後,Redis 會将用戶端的連接配接描述符和讀寫事件加入到事件循環機制中,使得 Redis 能夠及時地處理用戶端的請求。具體實作代碼在 ae.c 檔案中。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}           

在上面的代碼中,aeCreateFileEvent() 函數用于将檔案描述符和事件類型加入到事件循環機制中,實作了對用戶端的讀寫事件監聽。

2、多路複用模型

Redis 的多路複用模型主要使用了 epoll 函數實作,可以同時監聽多個用戶端的讀寫事件。具體實作代碼在 ae_epoll.c 檔案中。

static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,           

在上面的代碼中,aeApiCreate() 函數用于建立 epoll 執行個體,同時初始化事件數組和事件循環機制。aeApiAddEvent() 函數用于将檔案描述符和事件類型加入到 epoll 執行個體中,實作了對用戶端的讀寫事件監聽。

同時,在 ae.c 檔案中,還有以下關于多路複用模型的代碼:

/* Process events in the main loop */
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
/* Wait for milliseconds for events to happen.
* If milliseconds is -1, wait indefinitely.
* Returns the number of events processed. */
int aeWait(int fd, int mask, long long milliseconds) {
struct epoll_event events[1];
int retval, numevents = 0;
if (milliseconds != -1) {
milliseconds /= 1000;
milliseconds++; /* rounding up */
}
retval = epoll_wait(fd, events, 1, milliseconds);
if (retval > 0) {
if (events[0].events & EPOLLIN) numevents |= AE_READABLE;
if (events[0].events & EPOLLOUT) numevents |= AE_WRITABLE;
if (events[0].events & EPOLLERR) numevents |= AE_WRITABLE;
if (events[0].events & EPOLLHUP) numevents |= AE_WRITABLE;
return numevents;
} else {
return retval;
}
}
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed. */
void aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
/* Nothing to do, return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return;
/* Note that we want to sleep if we have no events to process. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tv } else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Note the fe->mask & mask & ... code: maybe an already
* processed event removed an element that fired and we
* still didn't processed, so we check if the event is still
* valid. */
if (fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
if (fe->mask & mask & AE_WRITABLE) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
if (fe->mask & mask & AE_ERROR) {
fe->efileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
if (fe->mask & mask & AE_HUP) {
fe->hupfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
processed++;
}
}           

`aeMain()` 函數是 Redis 事件循環的核心,其中的 `while` 循環不斷處理事件,如果事件循環被設定為停止狀态,那麼就跳出循環。

`aeWait()` 函數主要用于等待事件的發生,如果等待的時間為 -1,表示等待無限長的時間,否則就等待指定的時間後傳回。

`aeProcessEvents()` 函數用于處理檔案事件和時間事件,其中 `aeSearchNearestTimer()` 函數用于尋找近的時間事件,并将其加入到時間事件的連結清單中,然後計算出時間事件距離目前時間的時間差,用于設定 epoll_wait() 函數的逾時時間。

在 `ae.c` 檔案中,還有以下與 Redis 事件循環相關的代碼:

/* Create a new event loop */
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events ==)           

總結

在 Redis 的事件循環中,主要采用了多路複用技術來實作,即使用 epoll 等系統調用來監控檔案描述符,以及使用時間事件來處理一些需要在指定時間後執行的任務。

Redis 的事件循環采用單線程的方式實作,但是在處理檔案事件時,會使用 I/O 多路複用技術來提高性能,同時還會使用時間事件來實作一些定時任務。

在 Redis 的事件循環中,時間事件和檔案事件是分别處理的,時間事件使用連結清單來儲存,檔案事件使用 epoll 或 select 等系統調用來實作。在每次循環中,Redis 事件循環都會處理所有已就緒的檔案事件,并且會尋找并處理近的時間事件,進而實作了良好的事件處理和定時任務的執行。

了解 Redis 事件循環的實作原理,有助于我們更好地了解 Redis 的工作原理,并且能夠更好地調試和優化 Redis 伺服器,也是面試中常被問及的高頻知識點。

下一篇: IPHONE開發

繼續閱讀