天天看點

Redis:del/unlink 指令源碼解析define dictFreeKey(d, entry) \define dictFreeVal(d, entry) \

Redis(四):del/unlink 指令源碼解析

本篇我們來看一下删除過程。

對于用戶端來說,删除操作無需區分何種資料類型,隻管進行 del 操作即可。

零、删除指令 del 的定義

主要有兩個: del/unlink, 差别是 unlink 速度會更快, 因為其使用了異步删除優化模式, 其定義如下:

// 辨別隻有一個 w, 說明就是一個普通的寫操作,沒啥好說的
{"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}
// 辨別為 wF, 說明它是一個快速寫的操作,其實就是有一個異步優化的過程,稍後詳解
{"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0}
           

一、delCommand

delCommand 的作用就是直接删除某個 key 的資料,釋放記憶體即可。

// db.c, del 指令處理

void delCommand(client *c) {

// 同步删除
delGenericCommand(c,0);           

}

/ This command implements DEL and LAZYDEL. /

void delGenericCommand(client *c, int lazy) {

int numdel = 0, j;

for (j = 1; j < c->argc; j++) {
    // 自動過期資料清理
    expireIfNeeded(c->db,c->argv[j]);
    // 此處分同步删除和異步删除, 主要差别在于對于複雜資料類型的删除方面,如hash,list,set...
    // 針對 string 的删除是完全一樣的
    int deleted  = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                          dbSyncDelete(c->db,c->argv[j]);
    // 寫指令的傳播問題
    if (deleted) {
        signalModifiedKey(c->db,c->argv[j]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,
            "del",c->argv[j],c->db->id);
        server.dirty++;
        numdel++;
    }
}
// 響應删除資料量, 粒度到 key 級别
addReplyLongLong(c,numdel);           

架構代碼一看即明,隻是相比于我們普通的删除是多了不少事情。否則也不存在設計了。

二、unlinkCommand

如下,其實和del是一毛一樣的,僅是變化了一個 lazy 辨別而已。

// db.c, unlink 删除處理

void unlinkCommand(client *c) {

// 與 del 一緻,隻是 lazy 辨別不一樣
delGenericCommand(c,1);           

三、删除資料過程詳解

删除資料分同步和異步兩種實作方式,道理都差不多,隻是一個是背景删一個是前台删。我們分别來看看。

  1. 同步删除 dbSyncDelete

同步删除很簡單,隻要把對應的key删除,val删除就行了,如果有内層引用,則進行遞歸删除即可。

// db.c, 同步删除資料

/ Delete a key, value, and associated expiration entry if any, from the DB /

int dbSyncDelete(redisDb db, robj key) {

/* Deleting an entry from the expires dict will not free the sds of
 * the key, because it is shared with the main dictionary. */
// 首先從 expires 隊列删除,然後再從 db->dict 中删除
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
    if (server.cluster_enabled) slotToKeyDel(key);
    return 1;
} else {
    return 0;
}           

// dict.c, 如上, 僅僅是 dictDelete() 就可以了,是以真正的删除動作是在 dict 中實作的。

int dictDelete(dict ht, const void key) {

// nofree: 0, 即要求釋放記憶體
return dictGenericDelete(ht,key,0);           

// dict.c, nofree: 0:要釋放相應的val記憶體, 1:不釋放相應val記憶體隻删除key

/ Search and remove an element /

static int dictGenericDelete(dict d, const void key, int nofree)

{

unsigned int h, idx;
dictEntry *he, *prevHe;
int table;

if (d->ht[0].size == 0) return DICT_ERR; /* d->ht[0].table is NULL */
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
// ht[0] 和 ht[1] 如有可能都進行掃描
for (table = 0; table <= 1; table++) {
    idx = h & d->ht[table].sizemask;
    he = d->ht[table].table[idx];
    prevHe = NULL;
    while(he) {
        if (dictCompareKeys(d, key, he->key)) {
            /* Unlink the element from the list */
            if (prevHe)
                prevHe->next = he->next;
            else
                d->ht[table].table[idx] = he->next;
            // no nofree, 就是要 free 記憶體咯
            if (!nofree) {
                // 看起來 key/value 需要單獨釋放記憶體哦
                dictFreeKey(d, he);
                dictFreeVal(d, he);
            }
            zfree(he);
            d->ht[table].used--;
            return DICT_OK;
        }
        prevHe = he;
        he = he->next;
    }
    // 如果沒有進行 rehashing, 隻需掃描0就行了
    if (!dictIsRehashing(d)) break;
}
return DICT_ERR; /* not found */           

其實對于有GC收集器的語言來說,根本不用關注記憶體的釋放問題,自有背景工具處理,然而對于 c 語言這種級别語言,則是需要自行關注記憶體的。這也是本文存在的意義,不然對于一個 hash 表的元素删除操作,如上很難嗎?并沒有。

下面,我們就來看看 redis 是如何具體釋放記憶體的吧。

// dict.h, 釋放key, value 的邏輯也是非常簡單,用一個宏就定義好了

// 釋放依賴于 keyDestructor, valDestructor

define dictFreeKey(d, entry) \

if ((d)->type->keyDestructor) \
    (d)->type->keyDestructor((d)->privdata, (entry)->key)           

define dictFreeVal(d, entry) \

if ((d)->type->valDestructor) \
    (d)->type->valDestructor((d)->privdata, (entry)->v.val)           

// 是以,我們有必要回去看看 key,value 的析構方法

// 而這,又依賴于具體的資料類型,也就是你在 setXXX 的時候用到的資料類型

// 我們看一下這個 keyDestructor,valDestructor 初始化的樣子

// server.c kv的析構函數定義

/ Db->dict, keys are sds strings, vals are Redis objects. /

dictType dbDictType = {

dictSdsHash,                /* hash function */
NULL,                       /* key dup */
NULL,                       /* val dup */
dictSdsKeyCompare,          /* key compare */
dictSdsDestructor,          /* key destructor */
dictObjectDestructor   /* val destructor */           

};

// 1. 先看看 key destructor, key 的釋放

// server.c, 直接調用 sds 提供的服務即可

void dictSdsDestructor(void privdata, void val)

DICT_NOTUSED(privdata);
// sds 直接釋放key就行了
sdsfree(val);           

// sds.c, 真正釋放 value 記憶體

/ Free an sds string. No operation is performed if 's' is NULL. /

void sdsfree(sds s) {

if (s == NULL) return;
// zfree, 确實很簡單嘛, 因為 sds 是連續的記憶體空間,直接使用系統提供的方法即可删除
s_free((char*)s-sdsHdrSize(s[-1]));           

// 2. value destructor 對value的釋放, 如果說 key 一定是string格式的話,value可主不一定了,因為 redis提供豐富的資料類型呢

// server.c

void dictObjectDestructor(void privdata, void val)

DICT_NOTUSED(privdata);

if (val == NULL) return; /* Lazy freeing will set value to NULL. */
decrRefCount(val);           

// 減少 value 的引用計數

void decrRefCount(robj *o) {

if (o->refcount == 1) {
    switch(o->type) {
        // string 類型
        case OBJ_STRING: freeStringObject(o); break;
        // list 類型
        case OBJ_LIST: freeListObject(o); break;
        // set 類型
        case OBJ_SET: freeSetObject(o); break;
        // zset 類型
        case OBJ_ZSET: freeZsetObject(o); break;
        // hash 類型
        case OBJ_HASH: freeHashObject(o); break;
        default: serverPanic("Unknown object type"); break;
    }
    zfree(o);
} else {
    if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
    if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
}           

額,可以看出,對key的釋放自然是簡單之極。而對 value 則謹慎許多,首先它表面上隻對引用做減操作。隻有發隻剩下1個引用即隻有目前引用的情況下,本次釋放就是最後一次釋放,是以才會回收記憶體。

// 在介紹不同資料類型的記憶體釋放前,我們可以先來看下每個元素的資料結構

// dict.h

typedef struct dictEntry {

// 存儲 key 字段内容
void *key;
// 用一個聯合體存儲value
union {
    // 存儲資料時使用 *val 存儲
    void *val;
    uint64_t u64;
    // 存儲過期時間時使用該字段
    int64_t s64;
    // 存儲 score 時使用
    double d;
} v;
// 存在hash沖突時,作連結清單使用
struct dictEntry *next;           

} dictEntry;

// 1. string 類型的釋放

// object.c

void freeStringObject(robj *o) {

// 直接調用 sds服務釋放
if (o->encoding == OBJ_ENCODING_RAW) {
    sdsfree(o->ptr);
}           

// 2. list 類型的釋放

void freeListObject(robj *o) {

switch (o->encoding) {
case OBJ_ENCODING_QUICKLIST:
    quicklistRelease(o->ptr);
    break;
default:
    serverPanic("Unknown list encoding type");
}           

// quicklist.c

/ Free entire quicklist. /

void quicklistRelease(quicklist *quicklist) {

unsigned long len;
quicklistNode *current, *next;

current = quicklist->head;
len = quicklist->len;
// 連結清單依次疊代就可以釋放完成了
while (len--) {
    next = current->next;
    // 釋放list具體值
    zfree(current->zl);
    quicklist->count -= current->count;
    // 釋放list對象
    zfree(current);

    quicklist->len--;
    current = next;
}
zfree(quicklist);           

// 3. set 類型的釋放

// object.c, set 分兩種類型, ht, intset

void freeSetObject(robj *o) {

switch (o->encoding) {
case OBJ_ENCODING_HT:
    // hash 類型則需要删除每個 hash 的 kv
    dictRelease((dict*) o->ptr);
    break;
case OBJ_ENCODING_INTSET:
    // intset 直接釋放
    zfree(o->ptr);
    break;
default:
    serverPanic("Unknown set encoding type");
}           

// dict.c,

/ Clear & Release the hash table /

void dictRelease(dict *d)

// ht[0],ht[1] 依次清理
_dictClear(d,&d->ht[0],NULL);
_dictClear(d,&d->ht[1],NULL);
zfree(d);           

/ Destroy an entire dictionary /

int _dictClear(dict d, dictht ht, void(callback)(void *)) {

unsigned long i;

/* Free all the elements */
for (i = 0; i < ht->size && ht->used > 0; i++) {
    dictEntry *he, *nextHe;

    if (callback && (i & 65535) == 0) callback(d->privdata);
    // 元素為空,hash未命中,但隻要 used > 0, 代表就還有需要删除的元素存在
    // 其實對于隻有少數幾個元素的情況下,這個效率就呵呵了
    if ((he = ht->table[i]) == NULL) continue;
    while(he) {
        nextHe = he->next;
        // 這裡的釋放 kv 邏輯和前面是一緻的
        // 看起來像是遞歸,其實不然,因為redis不存在資料類型嵌套問題,比如 hash下存儲hash, 是以不會存在遞歸
        // 具體結構會在後續解讀到
        dictFreeKey(d, he);
        dictFreeVal(d, he);
        zfree(he);
        ht->used--;
        he = nextHe;
    }
}
/* Free the table and the allocated cache structure */
zfree(ht->table);
/* Re-initialize the table */
_dictReset(ht);
return DICT_OK; /* never fails */           

// 4. hash 類型的釋放

// object.c, hash和set其實是很相似的,代碼也做了大量的複用

void freeHashObject(robj *o) {

switch (o->encoding) {
case OBJ_ENCODING_HT:
    // ht 形式與set一緻
    dictRelease((dict*) o->ptr);
    break;
case OBJ_ENCODING_ZIPLIST:
    // ziplist 直接釋放
    zfree(o->ptr);
    break;
default:
    serverPanic("Unknown hash encoding type");
    break;
}           

// 5. zset 類型的釋放

// object.c, zset 的存儲形式與其他幾個

void freeZsetObject(robj *o) {

zset *zs;
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST:
    zs = o->ptr;
    // 釋放dict 資料, ht 0,1 的釋放
    dictRelease(zs->dict);
    // 釋放skiplist 資料, 主要看下這個
    zslFree(zs->zsl);
    zfree(zs);
    break;
case OBJ_ENCODING_ZIPLIST:
    zfree(o->ptr);
    break;
default:
    serverPanic("Unknown sorted set encoding");
}           

// t_zset.c, 釋放跳表資料

/ Free a whole skiplist. /

void zslFree(zskiplist *zsl) {

zskiplistNode *node = zsl->header->level[0].forward, *next;

zfree(zsl->header);
while(node) {
    // 基于第0層資料釋放,也基于第0層做疊代,直到删除完成
    // 因為其他層資料都是引用的第0層的資料,是以釋放時無需關注
    next = node->level[0].forward;
    zslFreeNode(node);
    node = next;
}
zfree(zsl);           

// t_zset 也很簡單,隻是把 node.ele 釋放掉,再把自身釋放到即可

// 這樣的删除方式依賴于其存儲結構,咱們後續再聊

/* Free the specified skiplist node. The referenced SDS string representation

  • of the element is freed too, unless node->ele is set to NULL before calling
  • this function. */
  1. zslFreeNode(zskiplistNode *node) {

    sdsfree(node->ele);

    zfree(node);

  1. 異步删除過程

異步删除按理說會更複雜,更有意思些。隻不過我們前面已經把核心的東西撸了個遍,這剩下的也不多了。

// lazyfree.c,

int dbAsyncDelete(redisDb db, robj key) {

/* Deleting an entry from the expires dict will not free the sds of
 * the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

/* If the value is composed of a few allocations, to free in a lazy way
 * is actually just slower... So under a certain limit we just free
 * the object synchronously. */
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
    robj *val = dictGetVal(de);
    size_t free_effort = lazyfreeGetFreeEffort(val);

    /* If releasing the object is too much work, let's put it into the
     * lazy free list. */
    // 其實異步方法與同步方法的差别在這,即要求 删除的元素影響須大于某閥值(64)
    // 否則按照同步方式直接删除,因為那樣代價更小
    if (free_effort > LAZYFREE_THRESHOLD) {
        // 異步釋放+1,原子操作
        atomicIncr(lazyfree_objects,1,&lazyfree_objects_mutex);
        // 将 value 的釋放添加到異步線程隊列中去,背景處理, 任務類型為 異步釋放記憶體
        bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
        // 設定val為NULL, 以便在外部進行删除時忽略釋放value相關記憶體
        dictSetVal(db->dict,de,NULL);
    }
}

/* Release the key-val pair, or just the key if we set the val
 * field to NULL in order to lazy free it later. */
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
    if (server.cluster_enabled) slotToKeyDel(key);
    return 1;
} else {
    return 0;
}           

// bio.c, 添加異步任務到線程中, 類型由type決定,線程安全地添加

// 然後嘛,背景線程就不會停地運作了任務了

void bioCreateBackgroundJob(int type, void arg1, void arg2, void *arg3) {

struct bio_job *job = zmalloc(sizeof(*job));

job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
// 上鎖操作
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
// 喚醒任務線程
pthread_cond_signal(&bio_newjob_cond[type]);
pthread_mutex_unlock(&bio_mutex[type]);           

// bio.c, 背景線程任務架構,總之還是有事情可做了。

void bioProcessBackgroundJobs(void arg) {

struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;

/* Check that the type is within the right interval. */
if (type >= BIO_NUM_OPS) {
    serverLog(LL_WARNING,
        "Warning: bio thread started with wrong type %lu",type);
    return NULL;
}

/* Make the thread killable at any time, so that bioKillThreads()
 * can work reliably. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
 * receive the watchdog signal. */
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
    serverLog(LL_WARNING,
        "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
// 任務一直運作
while(1) {
    listNode *ln;

    /* The loop always starts with the lock hold. */
    if (listLength(bio_jobs[type]) == 0) {
        // 注意此處将會釋放鎖喲,以便外部可以添加任務進來
        pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
        continue;
    }
    /* Pop the job from the queue. */
    ln = listFirst(bio_jobs[type]);
    job = ln->value;
    /* It is now possible to unlock the background system as we know have
     * a stand alone job structure to process.*/
    pthread_mutex_unlock(&bio_mutex[type]);

    /* Process the job accordingly to its type. */
    if (type == BIO_CLOSE_FILE) {
        close((long)job->arg1);
    } else if (type == BIO_AOF_FSYNC) {
        aof_fsync((long)job->arg1);
    } 
    // 也就是這玩意了,會去處理送出過來的任務
    else if (type == BIO_LAZY_FREE) {
        /* What we free changes depending on what arguments are set:
         * arg1 -> free the object at pointer.
         * arg2 & arg3 -> free two dictionaries (a Redis DB).
         * only arg3 -> free the skiplist. */
        // 本文介紹的删除value形式,用第一種情況
        if (job->arg1)
            lazyfreeFreeObjectFromBioThread(job->arg1);
        else if (job->arg2 && job->arg3)
            lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
        else if (job->arg3)
            lazyfreeFreeSlotsMapFromBioThread(job->arg3);
    } else {
        serverPanic("Wrong job type in bioProcessBackgroundJobs().");
    }
    zfree(job);

    /* Unblock threads blocked on bioWaitStepOfType() if any. */
    // 喚醒所有相關等待線程
    pthread_cond_broadcast(&bio_step_cond[type]);

    /* Lock again before reiterating the loop, if there are no longer
     * jobs to process we'll block again in pthread_cond_wait(). */
    pthread_mutex_lock(&bio_mutex[type]);
    listDelNode(bio_jobs[type],ln);
    bio_pending[type]--;
}           

// lazyfree.c, 和同步删除一緻了

/* Release objects from the lazyfree thread. It's just decrRefCount()

  • updating the count of objects to release. */
  1. lazyfreeFreeObjectFromBioThread(robj *o) {

    decrRefCount(o);

    atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex);

從此處redis異步處理過程,我們可以看到,redis并不是每次進入異步時都進行異步操作,而是在必要的時候才會進行。這也提示我們,不要為了異步而異步,而是應該計算利弊。

如此,整個 del/unlink 的過程就完成了。總體來說,删除都是基于hash的簡單remove而已,唯一有點難度是對記憶體的回收問題,這其實就是一個簡單的使用引用計數器算法實作的垃圾回收器應該做的事而已。勿須多言。具體過程需依賴于資料的存儲結構,主要目的自然是遞歸釋放空間,避免記憶體洩漏了。

原文位址

https://www.cnblogs.com/yougewe/p/12231419.html