在上一篇文章中 《Redis 指令執行過程(上)》 中,我們首先了解 Redis 指令執行的整體流程,然後細緻分析了從 Redis 啟動到建立 socket 連接配接,再到讀取 socket 資料到輸入緩沖區,解析指令,執行指令等過程的原理和實作細節。接下來,我們來具體看一下 set 和 get 指令的實作細節和如何将指令結果通過輸出緩沖區和 socket 發送給 Redis 用戶端。

set 和 get 指令具體實作
前文講到 processCommand 方法會從輸入緩沖區中解析出對應的 redisCommand,然後調用 call 方法執行解析出來的 redisCommand的 proc 方法。不同指令的的 proc 方法是不同的,比如說名為 set 的 redisCommand 的 proc 是 setCommand 方法,而 get 的則是 getCommand 方法。通過這種形式,實際上實作在Java 中特别常見的多态政策。
void call(client *c, int flags) {
....
c->cmd->proc(c);
....
}
// redisCommand結構體
struct redisCommand {
char *name;
// 對應方法的函數範式
redisCommandProc *proc;
.... // 其他定義
};
// 使用 typedef 定義的别名
typedef void redisCommandProc(client *c);
// 不同的指令,調用不同的方法。
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
.... // 所有的 redis 指令都有
}
setCommand 會判斷set指令是否攜帶了nx、xx、ex或者px等可選參數,然後調用setGenericCommand指令。我們直接來看 setGenericCommand 方法。
setGenericCommand 方法的處理邏輯如下所示:
- 首先判斷 set 的類型是 set_nx 還是 set_xx,如果是 nx 并且 key 已經存在則直接傳回;如果是 xx 并且 key 不存在則直接傳回。
- 調用 setKey 方法将鍵值添加到對應的 Redis 資料庫中。
- 如果有過期時間,則調用 setExpire 将設定過期時間
- 進行鍵空間通知
- 傳回對應的值給用戶端。
// t_string.c
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0;
/**
* 設定了過期時間;expire是robj類型,擷取整數值
*/
if (expire) {
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
return;
if (milliseconds <= 0) {
addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
return;
}
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
/**
* NX,key存在時直接傳回;XX,key不存在時直接傳回
* lookupKeyWrite 是在對應的資料庫中尋找鍵值是否存在
*/
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
/**
* 添加到資料字典
*/
setKey(c->db,key,val);
server.dirty++;
/**
* 過期時間添加到過期字典
*/
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
/**
* 鍵空間通知
*/
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
/**
* 傳回值,addReply 在 get 指令時再具體講解
*/
addReply(c, ok_reply ? ok_reply : shared.ok);
}
具體 setKey 和 setExpire 的方法實作我們這裡就不細講,其實就是将鍵值添加到db的 dict 資料哈希表中,将鍵和過期時間添加到 expires 哈希表中,如下圖所示。
接下來看 getCommand 的具體實作,同樣的,它底層會調用 getGenericCommand 方法。
getGenericCommand 方法會調用 lookupKeyReadOrReply 來從 dict 資料哈希表中查找對應的 key值。如果找不到,則直接傳回 C_OK;如果找到了,則根據值的類型,調用 addReply 或者 addReplyBulk 方法将值添加到輸出緩沖區中。
int getGenericCommand(client *c) {
robj *o;
// 調用 lookupKeyReadOrReply 從資料字典中查找對應的鍵
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
return C_OK;
// 如果是string類型,調用 addReply 單行傳回。如果是其他對象類型,則調用 addReplyBulk
if (o->type != OBJ_STRING) {
addReply(c,shared.wrongtypeerr);
return C_ERR;
} else {
addReplyBulk(c,o);
return C_OK;
}
}
lookupKeyReadWithFlags 會從 redisDb 中查找對應的鍵值對,它首先會調用 expireIfNeeded判斷鍵是否過期并且需要删除,如果為過期,則調用 lookupKey 方法從 dict 哈希表中查找并傳回。具體解釋可以看代碼中的詳細注釋
/*
* 查找key的讀操作,如果key找不到或者已經邏輯上過期傳回 NULL,有一些副作用
* 1 如果key到達過期時間,它會被裝置為過期,并且删除
* 2 更新key的最近通路時間
* 3 更新全局緩存擊中機率
* flags 有兩個值: LOOKUP_NONE 一般都是這個;LOOKUP_NOTOUCH 不修改最近通路時間
*/
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { // db.c
robj *val;
// 檢查鍵是否過期
if (expireIfNeeded(db,key) == 1) {
.... // master和 slave 對這種情況的特殊處理
}
// 查找鍵值字典
val = lookupKey(db,key,flags);
// 更新全局緩存命中率
if (val == NULL)
server.stat_keyspace_misses++;
else
server.stat_keyspace_hits++;
return val;
}
Redis 在調用查找鍵值系列方法前都會先調用 expireIfNeeded 來判斷鍵是否過期,然後根據 Redis 是否配置了懶删除來進行同步删除或者異步删除。關于鍵删除的細節可以檢視
《詳解 Redis 記憶體管理機制和實作》一文。
在判斷鍵釋放過期的邏輯中有兩個特殊情況:
- 如果目前 Redis 是主從結構中的從執行個體,則隻判斷鍵是否過期,不直接對鍵進行删除,而是要等待主執行個體發送過來的删除指令後再進行删除。如果目前 Redis 是主執行個體,則調用 propagateExpire 來傳播過期指令。
- 如果目前正在進行 Lua 腳本執行,因為其原子性和事務性,整個執行過期中時間都按照其開始執行的那一刻計算,也就是說lua執行時未過期的鍵,在它整個執行過程中也都不會過期。
/*
* 在調用 lookupKey*系列方法前調用該方法。
* 如果是slave:
* slave 并不主動過期删除key,但是傳回值仍然會傳回鍵已經被删除。
* master 如果key過期了,會主動删除過期鍵,并且觸發 AOF 和同步操作。
* 傳回值為0表示鍵仍然有效,否則傳回1
*/
int expireIfNeeded(redisDb *db, robj *key) { // db.c
// 擷取鍵的過期時間
mstime_t when = getExpire(db,key);
mstime_t now;
if (when < 0) return 0;
/*
* 如果目前是在執行lua腳本,根據其原子性,整個執行過期中時間都按照其開始執行的那一刻計算
* 也就是說lua執行時未過期的鍵,在它整個執行過程中也都不會過期。
*/
now = server.lua_caller ? server.lua_time_start : mstime();
// slave 直接傳回鍵是否過期
if (server.masterhost != NULL) return now > when;
// master時,鍵未過期直接傳回
if (now <= when) return 0;
// 鍵過期,删除鍵
server.stat_expiredkeys++;
// 觸發指令傳播
propagateExpire(db,key,server.lazyfree_lazy_expire);
// 和鍵空間事件
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
// 根據是否懶删除,調用不同的函數
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}
lookupKey 方法則是通過 dictFind 方法從 redisDb 的 dict 哈希表中查找鍵值,如果能找到,則根據 redis 的 maxmemory_policy 政策來判斷是更新 lru 的最近通路時間,還是調用 updateFU 方法更新其他名額,這些名額可以在後續記憶體不足時對鍵值進行回收。
robj *lookupKey(redisDb *db, robj *key, int flags) {
// dictFind 根據 key 擷取字典的entry
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
// 擷取 value
robj *val = dictGetVal(de);
// 當處于 rdb aof 子程序複制階段或者 flags 不是 LOOKUP_NOTOUCH
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
!(flags & LOOKUP_NOTOUCH))
{
// 如果是 MAXMEMORY_FLAG_LFU 則進行相應操作
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
// 更新最近通路時間
val->lru = LRU_CLOCK();
}
}
return val;
} else {
return NULL;
}
}
将指令結果寫入輸出緩沖區
在所有的 redisCommand 執行的最後,一般都會調用 addReply 方法進行結果傳回,我們的分析也來到了 Redis 指令執行的傳回資料階段。
addReply 方法做了兩件事情:
- prepareClientToWrite 判斷是否需要傳回資料,并且将目前 client 添加到等待寫傳回資料隊列中。
- 調用 _addReplyToBuffer 和 _addReplyObjectToList 方法将傳回值寫入到輸出緩沖區中,等待寫入 socekt。
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
// 需要将響應内容添加到output buffer中。總體思路是,先嘗試向固定buffer添加,添加失敗的話,在嘗試添加到響應連結清單
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(c,obj);
} else if (obj->encoding == OBJ_ENCODING_INT) {
.... // 特殊情況的優化
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}
prepareClientToWrite 首先判斷了目前 client是否需要傳回資料:
- Lua 腳本執行的 client 則需要傳回值;
- 如果用戶端發送來 REPLY OFF 或者 SKIP 指令,則不需要傳回值;
- 如果是主從複制時的主執行個體 client,則不需要傳回值;
- 目前是在 AOF loading 狀态的假 client,則不需要傳回值。
接着如果這個 client 還未處于延遲等待寫入 (CLIENT_PENDING_WRITE)的狀态,則将其設定為該狀态,并将其加入到 Redis 的等待寫入傳回值用戶端隊列中,也就是 clients_pending_write隊列。
int prepareClientToWrite(client *c) {
// 如果是 lua client 則直接OK
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
// 用戶端發來過 REPLY OFF 或者 SKIP 指令,不需要發送傳回值
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
// master 作為client 向 slave 發送指令,不需要接收傳回值
if ((c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
// AOF loading 時的假client 不需要傳回值
if (c->fd <= 0) return C_ERR;
// 将client加入到等待寫入傳回值隊列中,下次事件周期會進行傳回值寫入。
if (!clientHasPendingReplies(c) &&
!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
// 設定标志位并且将client加入到 clients_pending_write 隊列中
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
// 表示已經在排隊,進行傳回資料
return C_OK;
}
Redis 将存儲等待傳回的響應資料的空間,也就是輸出緩沖區分成兩部分,一個固定大小的 buffer 和一個響應内容資料的連結清單。在連結清單為空并且 buffer 有足夠空間時,則将響應添加到 buffer 中。如果 buffer 滿了則建立一個節點追加到連結清單上。_addReplyToBuffer 和 _addReplyObjectToList 就是分别向這兩個空間寫資料的方法。
固定buffer和響應連結清單,整體上構成了一個隊列。這麼組織的好處是,既可以節省記憶體,不需一開始預先配置設定大塊記憶體,并且可以避免頻繁配置設定、回收記憶體。
上面就是響應内容寫入輸出緩沖區的過程,下面看一下将資料從輸出緩沖區寫入 socket 的過程。
prepareClientToWrite 函數,将用戶端加入到了Redis 的等待寫入傳回值用戶端隊列中,也就是 clients_pending_write 隊列。請求處理的事件處理邏輯就結束了,等待 Redis 下一次事件循環處理時,将響應從輸出緩沖區寫入到 socket 中。
将指令傳回值從輸出緩沖區寫入 socket
在
《Redis 事件機制詳解》一文中我們知道,Redis 在兩次事件循環之間會調用 beforeSleep 方法處理一些事情,而對 clients_pending_write 清單的處理就在其中。
下面的 aeMain 方法就是 Redis 事件循環的主邏輯,可以看到每次循環時都會調用 beforesleep 方法。
void aeMain(aeEventLoop *eventLoop) { // ae.c
eventLoop->stop = 0;
while (!eventLoop->stop) {
/* 如果有需要在事件處理前執行的函數,那麼執行它 */
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
/* 開始處理事件*/
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
beforeSleep 函數會調用 handleClientsWithPendingWrites 函數來處理 clients_pending_write 清單。
handleClientsWithPendingWrites 方法會周遊 clients_pending_write 清單,對于每個 client 都會先調用 writeToClient 方法來嘗試将傳回資料從輸出緩存區寫入到 socekt中,如果還未寫完,則隻能調用 aeCreateFileEvent 方法來注冊一個寫資料事件處理器 sendReplyToClient,等待 Redis 事件機制的再次調用。
這樣的好處是對于傳回資料較少的用戶端,不需要麻煩的注冊寫資料事件,等待事件觸發再寫資料到 socket,而是在下一次事件循環周期就直接将資料寫到 socket中,加快了資料傳回的響應速度。
但是從這裡也會發現,如果 clients_pending_write 隊列過長,則處理時間也會很久,阻塞正常的事件響應處理,導緻 Redis 後續指令延時增加。
// 直接将傳回值寫到client的輸出緩沖區中,不需要進行系統調用,也不需要注冊寫事件處理器
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
// 擷取系統延遲寫隊列的長度
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
// 依次處理
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
// 将緩沖值寫入client的socket中,如果寫完,則跳過之後的操作。
if (writeToClient(c->fd,c,0) == C_ERR) continue;
// 還有資料未寫入,隻能注冊寫事件處理器了
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
// 注冊寫事件處理器 sendReplyToClient,等待執行
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}
sendReplyToClient 方法其實也會調用 writeToClient 方法,該方法就是将輸出緩沖區中的 buf 和 reply 清單中的資料都盡可能多的寫入到對應的 socket中。
// 将輸出緩沖區中的資料寫入socket,如果還有資料未處理則傳回C_OK
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
sds o;
// 仍然有資料未寫入
while(clientHasPendingReplies(c)) {
// 如果緩沖區有資料
if (c->bufpos > 0) {
// 寫入到 fd 代表的 socket 中
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
// 統計本次一共輸出了多少子節
totwritten += nwritten;
// buffer中的資料已經發送,則重置标志位,讓響應的後續資料寫入buffer
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
// 緩沖區沒有資料,從reply隊列中拿
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o);
if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply));
continue;
}
// 将隊列中的資料寫入 socket
nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
// 如果寫入成功,則删除隊列
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
c->reply_bytes -= objlen;
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
}
// 如果輸出的位元組數量已經超過NET_MAX_WRITES_PER_EVENT限制,break
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
server.stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (errno == EAGAIN) {
nwritten = 0;
} else {
serverLog(LL_VERBOSE,
"Error writing to client: %s", strerror(errno));
freeClient(c);
return C_ERR;
}
}
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
//如果内容已經全部輸出,删除事件處理器
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
// 資料全部傳回,則關閉client和連接配接
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClient(c);
return C_ERR;
}
}
return C_OK;
}
個人部落格位址,歡迎檢視