redis是用C語言寫的緩存伺服器,有高性能和多種資料類型支援的特性,廣受網際網路公司喜愛。
我們要分析其啟動過程,首先就要先找到其入口。
當然我們應該是要先分析 Makefile 檔案,然後找到最終編譯成的檔案,然後再順勢找到C語言入口 main(); 這裡咱們就不費那事了,一是這事很枯燥,二是我也不知道找不找得到。是以,就直接找到入口吧: 在 src/server.c 中,main() 函數就是了。
引用網上大牛的話歸納一下,main 函數執行的過程分以下幾步:
1. Redis 會設定一些回調函數,目前時間,随機數的種子。回調函數實際上是什麼?舉個例子,比如 Q/3 要給 Redis 發送一個關閉的指令,讓它去做一些優雅的關閉,做一些掃尾清理的工作,這個工作如果不設計回調函數,它其實什麼都不會幹。其實 C 語言的程式跑在作業系統之上,Linux 作業系統本身就是提供給我們事件機制的回調注冊功能,是以它會設計這個回調函數,讓你注冊上,關閉的時候優雅的關閉,然後它在後面可以做一些業務邏輯。
2. 不管任何軟體,肯定有一份配置檔案需要配置。首先在伺服器端會把它預設的一份配置做一個初始化。
3. Redis 在 3.0 版本正式釋出之前其實已經有哨兵(Sentinel)這個模式了,但是這個模式,我很少在生産環境使用。Redis 可以初始化這個模式,比較複雜。
4. 解析啟動的參數。其實不管什麼軟體,它在初始化的過程當中,配置都是由兩部分組成的。第一部分,靜态的配置檔案;第二部分,動态啟動的時候,main,就是參數給它的時候進去配置。
5. 把服務端的東西拿過來,裝載 Config 配置檔案,loadServerConfig。
6. 初始化伺服器,initServer。
7. 從磁盤裝載資料。
8. 有一個主循環程式開始幹活,用來處理用戶端的請求,并且把這個請求轉到後端的業務邏輯,幫你完成指令執行,然後吐資料,這麼一個過程。
我們以源碼浏覽形式,來看看具體實作。
main 函數入口:
注意 server 是一個全局變量,各函數進行操作時,都直接對其操作。
// struct redisServer server;
// src/server.c
/* Entry point of redis-server.
 *
 * Startup sequence, in order: optional built-in unit tests (REDIS_TEST),
 * library/locale/OOM-handler/random-seed setup, default configuration
 * (initServerConfig), Sentinel setup when in sentinel mode, command line
 * and config file parsing (loadServerConfig), optional daemonization,
 * server initialization (initServer), data loading from disk, and finally
 * the event loop (aeMain).
 *
 * Note: `server` is a global struct that every step mutates directly.
 *
 * Returns 0 when the event loop terminates; several fatal conditions call
 * exit() directly instead of returning. */
int main(int argc, char **argv) {
    struct timeval tv;
    int j;

    /* Built-in unit tests, only compiled in when REDIS_TEST is defined:
     * "<binary> test <name>" runs the named test and returns its result. */
#ifdef REDIS_TEST
    if (argc == 3 && !strcasecmp(argv[1], "test")) {
        if (!strcasecmp(argv[2], "ziplist")) {
            return ziplistTest(argc, argv);
        } else if (!strcasecmp(argv[2], "quicklist")) {
            /* Return the test result like every sibling branch does;
             * falling through would report "test not found" (-1). */
            return quicklistTest(argc, argv);
        } else if (!strcasecmp(argv[2], "intset")) {
            return intsetTest(argc, argv);
        } else if (!strcasecmp(argv[2], "zipmap")) {
            return zipmapTest(argc, argv);
        } else if (!strcasecmp(argv[2], "sha1test")) {
            return sha1Test(argc, argv);
        } else if (!strcasecmp(argv[2], "util")) {
            return utilTest(argc, argv);
        } else if (!strcasecmp(argv[2], "sds")) {
            return sdsTest(argc, argv);
        } else if (!strcasecmp(argv[2], "endianconv")) {
            return endianconvTest(argc, argv);
        } else if (!strcasecmp(argv[2], "crc64")) {
            return crc64Test(argc, argv);
        }

        return -1; /* test not found */
    }
#endif

    /* We need to initialize our libraries, and the server configuration. */
#ifdef INIT_SETPROCTITLE_REPLACEMENT
    spt_init(argc, argv);
#endif
    /* Process-wide defaults: locale, allocator settings, random seeds. */
    setlocale(LC_COLLATE,"");
    zmalloc_enable_thread_safeness();
    /* Callback invoked by the allocator on out-of-memory. */
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
    srand(time(NULL)^getpid());
    gettimeofday(&tv,NULL);
    dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
    server.sentinel_mode = checkForSentinelMode(argc,argv);
    /* Fill the global `server` struct with the default configuration. */
    initServerConfig();

    /* Store the executable path and arguments in a safe place in order
     * to be able to restart the server later. */
    server.executable = getAbsolutePath(argv[0]);
    server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
    server.exec_argv[argc] = NULL;
    for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);

    /* We need to init sentinel right now as parsing the configuration file
     * in sentinel mode will have the effect of populating the sentinel
     * data structures with master nodes to monitor. */
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }

    /* From here on: special run modes, then config file and option parsing. */

    /* Check if we need to start in redis-check-rdb mode. We just execute
     * the program main. However the program is part of the Redis executable
     * so that we can easily execute an RDB check on loading errors. */
    if (strstr(argv[0],"redis-check-rdb") != NULL)
        exit(redis_check_rdb_main(argv,argc));

    if (argc >= 2) {
        j = 1; /* First option to parse in argv[] */
        sds options = sdsempty();
        char *configfile = NULL;

        /* Handle special options --help and --version */
        if (strcmp(argv[1], "-v") == 0 ||
            strcmp(argv[1], "--version") == 0) version();
        if (strcmp(argv[1], "--help") == 0 ||
            strcmp(argv[1], "-h") == 0) usage();
        if (strcmp(argv[1], "--test-memory") == 0) {
            if (argc == 3) {
                memtest(atoi(argv[2]),50);
                exit(0);
            } else {
                fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
                fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
                exit(1);
            }
        }

        /* First argument is the config file name? */
        if (argv[j][0] != '-' || argv[j][1] != '-') {
            configfile = argv[j];
            server.configfile = getAbsolutePath(configfile);
            /* Replace the config file in server.exec_argv with
             * its absoulte path. */
            zfree(server.exec_argv[j]);
            server.exec_argv[j] = zstrdup(server.configfile);
            j++;
        }

        /* All the other options are parsed and conceptually appended to the
         * configuration file. For instance --port 6380 will generate the
         * string "port 6380\n" to be parsed after the actual file name
         * is parsed, if any. */
        while(j != argc) {
            if (argv[j][0] == '-' && argv[j][1] == '-') {
                /* Option name */
                if (!strcmp(argv[j], "--check-rdb")) {
                    /* Argument has no options, need to skip for parsing. */
                    j++;
                    continue;
                }
                if (sdslen(options)) options = sdscat(options,"\n");
                options = sdscat(options,argv[j]+2);
                options = sdscat(options," ");
            } else {
                /* Option argument */
                options = sdscatrepr(options,argv[j],strlen(argv[j]));
                options = sdscat(options," ");
            }
            j++;
        }
        if (server.sentinel_mode && configfile && *configfile == '-') {
            serverLog(LL_WARNING,
                "Sentinel config from STDIN not allowed.");
            serverLog(LL_WARNING,
                "Sentinel needs config file on disk to save state.  Exiting...");
            exit(1);
        }
        resetServerSaveParams();
        loadServerConfig(configfile,options);
        sdsfree(options);
    } else {
        serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
    }

    server.supervised = redisIsSupervised(server.supervised_mode);
    int background = server.daemonize && !server.supervised;
    if (background) daemonize();

    /* Initialize the server: among other things this opens the listening
     * sockets and registers acceptTcpHandler for them. */
    initServer();
    if (background || server.pidfile) createPidFile();
    redisSetProcTitle(argv[0]);
    redisAsciiArt();
    checkTcpBacklogSettings();

    if (!server.sentinel_mode) {
        /* Things not needed when running in Sentinel mode. */
        serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__
        linuxMemoryWarnings();
#endif
        /* Restore the dataset from disk. */
        loadDataFromDisk();
        if (server.cluster_enabled) {
            if (verifyClusterConfigWithData() == C_ERR) {
                serverLog(LL_WARNING,
                    "You can't have keys in a DB different than DB 0 when in "
                    "Cluster mode. Exiting.");
                exit(1);
            }
        }
        if (server.ipfd_count > 0)
            serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port);
        if (server.sofd > 0)
            serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
    } else {
        sentinelIsRunning();
    }

    /* Warning the user about suspicious maxmemory setting. */
    if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
        serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
    }

    /* Main loop: aeMain() keeps running until the loop's stop flag is set. */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeMain(server.el);
    /* The loop ended: release the event loop and exit. */
    aeDeleteEventLoop(server.el);
    return 0;
}
如上,即是redis的整個main方法了,整個啟動流程也算是一目了然了。大概流程也不出預料,環境設定、預設參數、配置檔案加載、初始化服務、恢複資料、死循環。
配置參數什麼的都不用瞅了,但是對于哨兵、叢集什麼的,又太深入了。咱們還是先蜻蜓點水下,主要看看初始化伺服器的時候做了些啥事!
初始化伺服器:
// src/server.c, 在main中調用
/* Initialize the global `server` state once the configuration is known:
 * signal handling, client bookkeeping lists, shared objects, the event
 * loop, listening sockets, the databases, pub/sub and persistence state,
 * the serverCron timer, the accept handlers, the AOF file and the remaining
 * subsystems (cluster, scripting, slowlog, latency monitor, background I/O).
 * Exits the process on unrecoverable setup errors. */
void initServer(void) {
int j;
/* Ignore SIGHUP and SIGPIPE, then install the remaining handlers through
 * setupSignalHandlers(). */
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}
/* Client-related bookkeeping stored on the global server struct. */
server.pid = getpid();
server.current_client = NULL;
server.clients = listCreate();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.clients_pending_write = listCreate();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
server.system_memory_size = zmalloc_get_memory_size();
/* Create the globally shared objects.
 * NOTE(review): presumably common replies and small integers preallocated
 * to avoid recreating identical objects -- confirm in createSharedObjects(). */
createSharedObjects();
adjustOpenFilesLimit();
/* Create the event loop object (aeEventLoop, implemented in ae.c), sized
 * as maxclients plus CONFIG_FDSET_INCR. */
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
/* Allocate the array of dbnum databases; they are populated further below. */
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR) {
serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL,server.sofd);
}
/* Abort if there are no listening sockets at all. */
if (server.ipfd_count == 0 && server.sofd < 0) {
serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
/* Create the Redis databases, and initialize other internal state. */
/* Each database is a group of dictionaries (keyspace, expires, blocking
 * keys, ready keys, watched keys) plus an eviction pool, id and avg_ttl. */
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].eviction_pool = evictionPoolAlloc();
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
/* Pub/Sub state. */
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate();
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
server.cronloops = 0;
/* RDB / AOF persistence state. */
server.rdb_child_pid = -1;
server.aof_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
server.resident_set_size = 0;
server.lastbgsave_status = C_OK;
server.aof_last_write_status = C_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
updateCachedTime();
/* Create out timers, that's our main way to process background
 * operations. */
/* serverCron is the callback that carries the periodic background work.
 * NOTE(review): the second argument (1) is presumably the delay in
 * milliseconds before the first run, not a one-second period -- confirm
 * against aeCreateTimeEvent() and serverCron's return value. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
/* Every listening TCP socket gets a readable-event handler:
 * acceptTcpHandler. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Open the AOF file if needed. */
if (server.aof_state == AOF_ON) {
server.aof_fd = open(server.aof_filename,
O_WRONLY|O_APPEND|O_CREAT,0644);
if (server.aof_fd == -1) {
serverLog(LL_WARNING, "Can't open the append-only file: %s",
strerror(errno));
exit(1);
}
}
/* 32 bit instances are limited to 4GB of address space, so if there is
 * no explicit limit in the user provided configuration we set a limit
 * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
 * useless crashes of the Redis instance for out of memory. */
if (server.arch_bits == 32 && server.maxmemory == 0) {
serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
}
if (server.cluster_enabled) clusterInit();
replicationScriptCacheInit();
/* Lua scripting initialization. */
scriptingInit(1);
/* Slow log initialization. */
slowlogInit();
/* Latency monitor initialization. */
latencyMonitorInit();
/* Start the background I/O subsystem; bioInit() spawns its worker threads. */
bioInit();
}
通過以上,我們可以清楚明白,在初始化伺服器時,高大上的C都幹了啥。總體來說就是: 設定系統回調、開啟端口監聽、開啟socket監聽、開啟背景任務、開啟AOF、腳本初始化、線程池初始化。。。 (做這些事是容易的,難的是設計之初如何架構其功能)
下面我們來看幾個初始伺服器時的關鍵函數方法。
1. aeEventLoop 的建立
aeEventLoop 是後續進行任務處理的重要資料結構。
// ae.c, 建立 aeEventLoop 對象,封裝底層的 事件模式,統一對外服務
/* Allocate and initialize an event loop able to track `setsize` file
 * descriptors. The registered-event and fired-event tables are allocated
 * with `setsize` slots each, the backend state is created through
 * aeApiCreate(), and every slot starts with mask AE_NONE (not registered).
 *
 * Returns the new loop, or NULL if an allocation or the backend setup
 * fails (everything allocated so far is released). */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *loop = zmalloc(sizeof(*loop));
    int slot;

    if (loop == NULL) return NULL;

    loop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    loop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (loop->events == NULL || loop->fired == NULL) goto fail;

    loop->setsize = setsize;
    loop->lastTime = time(NULL);
    loop->timeEventHead = NULL;
    loop->timeEventNextId = 0;
    loop->stop = 0;
    loop->maxfd = -1;
    loop->beforesleep = NULL;

    /* The backend implementation is chosen at compile time. */
    if (aeApiCreate(loop) == -1) goto fail;

    /* Events with mask == AE_NONE are not set, so start every slot there. */
    for (slot = 0; slot < setsize; slot++) {
        loop->events[slot].mask = AE_NONE;
    }
    return loop;

fail:
    zfree(loop->events);
    zfree(loop->fired);
    zfree(loop);
    return NULL;
}
// 選擇不同的io模型, 優先級: evport > epoll > kqueue > select
/* Compile in exactly one I/O multiplexing backend, first match wins:
 * evport, then epoll, then kqueue, with select as the fallback. */
#if defined(HAVE_EVPORT)
#include "ae_evport.c"
#elif defined(HAVE_EPOLL)
#include "ae_epoll.c"
#elif defined(HAVE_KQUEUE)
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
// epoll 實作
/* epoll backend: allocate the backend state (an epoll_event array with
 * eventLoop->setsize entries plus an epoll descriptor) and attach it to
 * eventLoop->apidata.
 *
 * Returns 0 on success, -1 on failure; on failure nothing stays allocated. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *st = zmalloc(sizeof(aeApiState));

    if (st == NULL) return -1;

    st->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (st->events == NULL) goto free_state;

    st->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (st->epfd == -1) goto free_events;

    eventLoop->apidata = st;
    return 0;

free_events:
    zfree(st->events);
free_state:
    zfree(st);
    return -1;
}
// ae_epoll.c, linux 建立epoll句柄
/* epoll implementation of the backend constructor: creates the epoll
 * descriptor and the buffer that receives ready events, then stores both
 * in eventLoop->apidata. Returns 0 on success, -1 on failure. */
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
/* One epoll_event slot per descriptor the loop can track (setsize). */
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
/* Undo both allocations before reporting the failure. */
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
2. acceptTcpHandler, 對于網絡請求的接入處理
// networking.c, acceptTcpHandler
/* Readable-event callback for a listening TCP socket `fd`.
 *
 * Accepts up to MAX_ACCEPTS_PER_CALL connections per invocation and hands
 * each accepted descriptor, with the peer IP, to acceptCommonHandler().
 * Stops as soon as accepting fails; the failure is logged unless errno is
 * EWOULDBLOCK. The el, privdata and mask arguments are unused. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char peer_ip[NET_IP_STR_LEN];
    int peer_port, client_fd;
    int budget = MAX_ACCEPTS_PER_CALL;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while (budget--) {
        /* Obtain the client descriptor together with the peer ip/port. */
        client_fd = anetTcpAccept(server.neterr, fd, peer_ip, sizeof(peer_ip), &peer_port);
        if (client_fd != ANET_ERR) {
            serverLog(LL_VERBOSE,"Accepted %s:%d", peer_ip, peer_port);
            /* Hand the new connection over for client setup. */
            acceptCommonHandler(client_fd,0,peer_ip);
            continue;
        }

        if (errno != EWOULDBLOCK) {
            serverLog(LL_WARNING,
                "Accepting client connection: %s", server.neterr);
        }
        return;
    }
}
// anet.c, 解析 ip, port, fd
/* Accept a connection on listening socket `s` and report the peer address.
 *
 * When `ip` is non-NULL the textual peer address is written into it (at
 * most ip_len bytes); when `port` is non-NULL it receives the peer port in
 * host byte order. An AF_INET peer is decoded as IPv4, anything else as
 * IPv6.
 *
 * Returns the accepted descriptor, or ANET_ERR if the accept failed. */
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    struct sockaddr_storage peer;
    socklen_t peer_len = sizeof(peer);
    int fd = anetGenericAccept(err,s,(struct sockaddr*)&peer,&peer_len);

    if (fd == -1) return ANET_ERR;

    if (peer.ss_family == AF_INET) {
        struct sockaddr_in *in4 = (struct sockaddr_in *)&peer;

        if (ip) inet_ntop(AF_INET,(void*)&(in4->sin_addr),ip,ip_len);
        if (port) *port = ntohs(in4->sin_port);
    } else {
        struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&peer;

        if (ip) inet_ntop(AF_INET6,(void*)&(in6->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(in6->sin6_port);
    }
    return fd;
}
// anet.c, 調用系統函數擷取 socket 資料
/* Thin wrapper around accept(2) that retries while the call is
 * interrupted (EINTR).
 *
 * Returns the accepted descriptor; on any other failure the message is
 * stored through anetSetError() and ANET_ERR is returned. */
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;

    do {
        fd = accept(s,sa,len);
    } while (fd == -1 && errno == EINTR);

    if (fd == -1) {
        anetSetError(err, "accept: %s", strerror(errno));
        return ANET_ERR;
    }
    return fd;
}
3. bioInit 線程建立
// bio.c
/* Set up the background job system and spawn its worker threads.
 *
 * For each of the BIO_NUM_OPS job types this initializes a mutex, two
 * condition variables, an empty job list and a pending counter, then
 * starts one thread running bioProcessBackgroundJobs() with the job type
 * passed as its argument. Exits the process if a thread can't be created. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t tid;
    size_t stack_bytes;
    int type;

    /* Per-type synchronization primitives and job queues. */
    for (type = 0; type < BIO_NUM_OPS; type++) {
        pthread_mutex_init(&bio_mutex[type],NULL);
        pthread_cond_init(&bio_newjob_cond[type],NULL);
        pthread_cond_init(&bio_step_cond[type],NULL);
        bio_jobs[type] = listCreate();
        bio_pending[type] = 0;
    }

    /* The default stack may be small on some systems: double it until it
     * reaches REDIS_THREAD_STACK_SIZE. */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stack_bytes);
    if (stack_bytes == 0) stack_bytes = 1; /* The world is full of Solaris Fixes */
    for (; stack_bytes < REDIS_THREAD_STACK_SIZE; stack_bytes *= 2) {
        /* nothing: the doubling happens in the loop header */
    }
    pthread_attr_setstacksize(&attr, stack_bytes);

    /* The single argument accepted by the thread function carries the job
     * type the thread is responsible of. */
    for (type = 0; type < BIO_NUM_OPS; type++) {
        void *job_type = (void*)(unsigned long) type;

        if (pthread_create(&tid,&attr,bioProcessBackgroundJobs,job_type) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[type] = tid;
    }
}
二、主循環服務
接下來我們看看另一個重要的流程,主循環服務。 redis作為一個存儲服務,必定需要一直運作等待,這就是while死循環的應用了。在前面各種環境初始化完成後,進入while循環服務。
// src/ae.c 主循環服務
/* Run the event loop until eventLoop->stop becomes non-zero.
 *
 * Each iteration first invokes the beforesleep hook, when one is installed,
 * and then processes all kinds of events via
 * aeProcessEvents(eventLoop, AE_ALL_EVENTS). */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    for (;;) {
        if (eventLoop->stop) break;

        /* Optional hook, set from outside the loop. */
        if (eventLoop->beforesleep != NULL) {
            eventLoop->beforesleep(eventLoop);
        }
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
很簡單,就做兩件事: beforesleep, aeProcessEvents, 看起來 aeProcessEvents() 是個核心服務。我們可以先觀察其行為。
1. aeProcessEvents, 處理各種事件(資料準備)
// ae.c
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
 * file events to process as long as we want to process time
 * events, in order to sleep until the next time event is ready
 * to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
/* Look up the nearest timer so that the poll timeout below can be
 * bounded by the time left until it fires. */
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
/* Calculate the time missing for the nearest
 * timer to fire. */
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
/* Borrow one second so the millisecond difference stays positive. */
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
/* A timer already overdue yields a negative delta: clamp it to zero. */
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
 * ASAP because of AE_DONT_WAIT we need to set the timeout
 * to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
/* Wait for ready file events; the backend fills eventLoop->fired. */
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
 * event removed an element that fired and we still didn't
 * processed, so we check if the event is still valid. */
/* rfileProc is whatever read handler was registered for this fd, for
 * example acceptTcpHandler on a listening TCP socket. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
/* Skip the write handler when it is the same function that was just
 * invoked as the read handler. */
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
// ae_epoll.c, 調用系統底層, 擷取網絡就緒事件, 放入 eventLoop->fired 中
/* epoll backend: wait for ready descriptors and publish them in
 * eventLoop->fired.
 *
 * `tvp` is the maximum wait time; NULL means block indefinitely (timeout
 * -1). Each epoll event is translated into the loop's own mask: EPOLLIN
 * maps to AE_READABLE, while EPOLLOUT, EPOLLERR and EPOLLHUP all map to
 * AE_WRITABLE.
 *
 * Returns the number of entries written to eventLoop->fired, which is 0
 * when epoll_wait() times out or fails. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int nready, idx;

    nready = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (nready <= 0) return 0;

    for (idx = 0; idx < nready; idx++) {
        struct epoll_event *ev = &state->events[idx];
        int mask = 0;

        if (ev->events & EPOLLIN) mask |= AE_READABLE;
        if (ev->events & (EPOLLOUT|EPOLLERR|EPOLLHUP)) mask |= AE_WRITABLE;
        eventLoop->fired[idx].fd = ev->data.fd;
        eventLoop->fired[idx].mask = mask;
    }
    return nready;
}
2. 主循環服務之 beforeSleep
beforeSleep是在進入 aeMain之前,直接綁定在 el 上的。 是在主循環中進行檢測的條件,但其承擔了重要的作用,比如客戶請求的指令解析和處理!
// server.c, beforeSleep
/* Hook executed by the event loop on every iteration, right before it
 * goes waiting for ready file descriptors.
 *
 * In order: cluster housekeeping, a fast expire cycle (masters only),
 * a REPLCONF GETACK broadcast when requested, unblocking of clients
 * waiting for replica acks, processing of just-unblocked clients, the AOF
 * flush and finally the pending client writes. The eventLoop argument is
 * unused. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* The cluster before-sleep function may change the cluster state (from
     * ok to fail or vice versa), so it runs before the unblocked clients
     * are served later in this function. */
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Fast expire cycle; the callee returns ASAP when it is not needed. */
    if (server.active_expire_enabled && server.masterhost == NULL) {
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
    }

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. */
    if (server.get_ack_from_slaves) {
        robj *getack[3];
        int i;

        getack[0] = createStringObject("REPLCONF",8);
        getack[1] = createStringObject("GETACK",6);
        getack[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, getack, 3);
        for (i = 0; i < 3; i++) decrRefCount(getack[i]);
        server.get_ack_from_slaves = 0;
    }

    /* Unblock all the clients blocked for synchronous replication in WAIT. */
    if (listLength(server.clients_waiting_acks) != 0) {
        processClientsWaitingReplicas();
    }

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients) != 0) {
        processUnblockedClients();
    }

    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
}
// blocking.c, 處理被解阻塞的用戶端連接配接, 順便處理用戶端請求
/* Called from beforeSleep(): drain server.unblocked_clients and, for each
 * client, process whatever is still waiting in its input buffer after a
 * blocking operation completed. */
void processUnblockedClients(void) {
    while (listLength(server.unblocked_clients)) {
        listNode *ln = listFirst(server.unblocked_clients);
        client *cl;

        serverAssert(ln != NULL);
        cl = ln->value;
        listDelNode(server.unblocked_clients,ln);
        cl->flags &= ~CLIENT_UNBLOCKED;

        /* processInputBuffer() checks the blocked state on its own, but
         * skipping a client that blocked again is conceptually more
         * correct here. */
        if (cl->flags & CLIENT_BLOCKED) continue;

        if (cl->querybuf && sdslen(cl->querybuf) > 0) {
            processInputBuffer(cl);
        }
    }
}
// networking.c, 處理接收到的資料, 調起下遊處理服務
/* Parse and execute as many commands as possible from the client's query
 * buffer.
 *
 * server.current_client is set to `c` for the duration of the call and
 * reset to NULL afterwards. The loop stops when the buffer is empty, when
 * clients are paused (slaves excepted), when the client is blocked or
 * flagged CLIENT_CLOSE_AFTER_REPLY, or when the protocol parser does not
 * return C_OK. */
void processInputBuffer(client *c) {
    server.current_client = c;

    /* Keep processing while there is something in the input buffer */
    while (sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Abort if the client is in the middle of something (blocked), or
         * if the connection is going to be closed once the reply is
         * written: in the latter case the reply must not grow further. */
        if (c->flags & (CLIENT_BLOCKED|CLIENT_CLOSE_AFTER_REPLY)) break;

        /* Determine request type when unknown: a leading '*' selects the
         * multibulk protocol, anything else the inline one. */
        if (!c->reqtype) {
            c->reqtype = (c->querybuf[0] == '*') ? PROTO_REQ_MULTIBULK
                                                 : PROTO_REQ_INLINE;
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length: nothing to run,
         * just reset. Otherwise execute the parsed command and only reset
         * the client when it was executed. */
        if (c->argc == 0 || processCommand(c) == C_OK) {
            resetClient(c);
        }
    }
    server.current_client = NULL;
}
// server.c, 根據網絡子產品解析好的用戶端指令,進行相應的業務處理
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
/* The QUIT command is handled separately. Normal command procs will
 * go through checking for replication and QUIT will cause trouble
 * when FORCE_REPLICATION is enabled and would be implemented in
 * a regular command proc. */
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
/* Now lookup the command and check ASAP about trivial error conditions
 * such as wrong arity, bad command name and so forth. */
/* The command is looked up by its name, i.e. the first argument. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
/* A positive arity must match argc exactly; a negative arity -N is
 * rejected when argc is below N. */
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
/* What follows is a series of precondition checks; each one replies with
 * an error and returns without executing the command. */
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c);
addReply(c,shared.noautherr);
return C_OK;
}
/* If cluster is enabled perform the cluster redirection here.
 * However we don't perform the redirection if:
 * 1) The sender of this command is our master.
 * 2) The command has no key arguments. */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;
if (server.cluster->state != CLUSTER_OK) {
flagTransaction(c);
clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
return C_OK;
} else {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
/* Redirect unless the node resolved for the query is this very node. */
if (n == NULL || n != server.cluster->myself) {
flagTransaction(c);
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
}
/* Handle the maxmemory directive.
 *
 * First we try to free some memory if possible (if there are volatile
 * keys in the dataset). If there are not the only thing we can do
 * is returning an error. */
if (server.maxmemory) {
int retval = freeMemoryIfNeeded();
/* freeMemoryIfNeeded may flush slave output buffers. This may result
 * into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;
/* It was impossible to free enough memory, and the command the client
 * is trying to execute is denied during OOM conditions? Error. */
if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
flagTransaction(c);
addReply(c, shared.oomerr);
return C_OK;
}
}
/* Don't accept write commands if there are problems persisting on disk
 * and if this is a master instance. */
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == C_ERR) ||
server.aof_last_write_status == C_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand))
{
flagTransaction(c);
/* AOF status still OK means the failing side is the background save. */
if (server.aof_last_write_status == C_OK)
addReply(c, shared.bgsaveerr);
else
addReplySds(c,
sdscatprintf(sdsempty(),
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno)));
return C_OK;
}
/* Don't accept write commands if there are not enough good slaves and
 * user configured the min-slaves-to-write option. */
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
flagTransaction(c);
addReply(c, shared.noreplicaserr);
return C_OK;
}
/* Don't accept write commands if this is a read only slave. But
 * accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
{
addReply(c, shared.roslaveerr);
return C_OK;
}
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
/* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
 * we are a slave with a broken link with master. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE))
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
return C_OK;
}
/* Loading DB? Return an error if the command has not the
 * CMD_LOADING flag. */
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr);
return C_OK;
}
/* Lua script too slow? Only allow a limited number of commands. */
/* The two exceptions below match on the first letter of the only
 * argument: 'n' for the shutdown command, 'k' for the script command. */
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
return C_OK;
}
/* Exec the command */
/* Inside a MULTI context every command except EXEC, DISCARD, MULTI and
 * WATCH is queued instead of being executed right away. */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
/* call() dispatches to the command's own implementation. */
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;
}
到此,整個redis的啟動及簡要的請求處理流程就完成了。
下面以兩個UML來重新審視整個流程。
1. redisServer 初始化時序圖

2. 主循環服務時序圖
總體來說,就單個指令的執行流程來說,簡單到 就是一個 指令表的查找,到資料處理響應。
不要害怕今日的苦,你要相信明天,更苦!