天天看點

Redis内部資料結構詳解之跳躍表(skiplist)一、跳躍表簡介二、Redis中跳躍表的資料結構三、簡單列出跳躍表的API,他們的作用以及算法複雜度 四、上述API源碼的簡單解析五、小結

本文所引用的源碼全部來自Redis2.8.2版本。

Redis中skiplist資料結構與API相關檔案是:redis.h與t_zset.c。

http://blog.csdn.net/acceptedxukai/article/details/8923174 這是我之前寫的關于skiplist最傳統的實作,功能遠不如Redis中跳表的強大,但是代碼簡短,比較容易了解。

轉載請注明,文章來自:http://blog.csdn.net/acceptedxukai/article/details/17333673

一、跳躍表簡介

跳躍表是一種随機化資料結構,基于并聯的連結清單,其效率可以比拟平衡二叉樹,查找、删除、插入等操作都可以在對數期望時間内完成,對比平衡樹,跳躍表的實作要簡單直覺很多。

以下是一個跳躍表的例圖(來自維基百科):

Redis内部資料結構詳解之跳躍表(skiplist)一、跳躍表簡介二、Redis中跳躍表的資料結構三、簡單列出跳躍表的API,他們的作用以及算法複雜度 四、上述API源碼的簡單解析五、小結

從圖中可以看出跳躍表主要有以下幾個部分構成:

1、  表頭head:負責維護跳躍表的節點指針

2、  節點node:實際儲存元素值,每個節點有一層或多層

3、  層level:儲存着指向該層下一個節點的指針

4、  表尾tail:全部由null組成

跳躍表的周遊總是從高層開始,然後随着元素值範圍的縮小,慢慢降低到低層。

二、Redis中跳躍表的資料結構

 Redis作者為了适合自己功能的需要,對原來的跳躍表進行了一下修改:
1、  允許重複的score值:多個不同的元素(member)的score值可以相同
2、  進行元素對比的時候,不僅要檢查score值,還需要檢查member:當score值相等時,需要比較member域進行比較。
3、  結構儲存一個tail指針:跳躍表的表尾指針
4、  每個節點都有一個高度為1層的前驅指針,用于從底層表尾向表頭方向周遊
跳躍表資料結構如下(redis.h):
typedef struct zskiplistNode {
    robj *obj; //節點資料
    double score;
    struct zskiplistNode*backward; //前驅
    struct zskiplistLevel {
        struct zskiplistNode*forward;//後繼
        unsigned int span;//該層跨越的節點數量
    } level[];
} zskiplistNode;
 
typedef struct zskiplist {
    struct zskiplistNode*header, *tail;
    unsigned long length;//節點的數目
    int level;//目前表的最大層數
} zskiplist;
           

資料結構中可能span這個參數最不好了解了,下面簡單解釋一下:

參照上面跳躍表的例圖,head節點的span所有level的值都将是1;node 1在level 0 span值為1,因為跨越1個元素都将走到下一個節點2,在level 1 span值為2,因為需要跨越2個元素(node 2,node 3)才能到達下一個節點3。

關于span的解釋可以詳看: http://stackoverflow.com/questions/10458572/what-does-the-skiplistnode-variable-span-mean-in-redis-h

三、簡單列出跳躍表的API,他們的作用以及算法複雜度

約定O(N)表示對于元素個數的表達,O(L)表示對于跳表層數的表達
函數名 作用 複雜度
zslCreateNode 建立并傳回一個跳表節點 O(1)
zslCreate 建立并初始化一個跳躍表 O(L)
zslFreeNode 釋放給定的節點 O(1)
zslFree 釋放給定的跳躍表 O(N)
zslRandomLevel 得到新節點的層數(抛硬币法的改進) O(1)
zslInsert 将給定的score與member建立節點并添加到跳表中 O(logN)
zslDeleteNode 删除給定的跳表節點 O(L)
zslDelete 删除給定的score與member在跳表中比對的節點 O(logN)
zslIsInRange 檢查跳表中的元素score值是否在給定的範圍内 O(1)
zslFirstInRange 查找第一個符合給定範圍的節點 O(logN)
zslLastInRange 查找最後一個符合給定範圍的節點 O(logN)
zslDeleteRangeByScore 删除score值在給定範圍内的節點 O(logN)+O(M)
zslDeleteRangeByRank 删除排名在給定範圍内的節點 O(logN)+O(M)
zslGetRank 傳回給定score與member在集合中的排名 O(logN)
zslGetElementByRank 根據給定的rank來查找元素 O(logN)

 四、上述API源碼的簡單解析

4.1 zslCreateNode

//建立一個skiplist節點,需要傳入所在的level,score,以及儲存的數值obj
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->obj = obj;
    return zn;
}
           
zmalloc是Redis在系統函數malloc上自己封裝的函數,主要為了友善對記憶體使用情況的計算。

4.2  zslCreate

//建立skiplist,header不存儲任何資料
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//ZSKIPLIST_MAXLEVEL = 32
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;//後繼
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;//前驅
    zsl->tail = NULL;//尾指針
    return zsl;
}
           

4.3 zslRandomLevel

int zslRandomLevel(void) {//為新的skiplist節點生成該節點level數目
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//0.25
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
           

4.4 zslInsert

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    redisAssert(!isnan(score));
    x = zsl->header;//header不存儲資料
    //從高向低
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        //rank[i]用來記錄第i層達到插入位置的所跨越的節點總數,也就是該層最接近(小于)給定score的排名
        //rank[i]初始化為上一層所跨越的節點總數
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        //後繼節點不為空,并且後繼節點的score比給定的score小
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                //score相同,但節點的obj比給定的obj小
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            rank[i] += x->level[i].span;//記錄跨越了多少個節點
            x = x->level[i].forward;//繼續向右走
        }
        update[i] = x;//儲存通路的節點,并且将目前x移動到下一層
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */
    level = zslRandomLevel();//計算新的level
    if (level > zsl->level) {//新的level > zsl->level,需要進行更新
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;//需要更新的節點就是header
            update[i]->level[i].span = zsl->length;
            //在未添加新節點之前,需要更新的節點跨越的節點數目自然就是zsl->length,
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);//建立新節點
    //開始插入節點
    for (i = 0; i < level; i++) {
        //新節點的後繼就是插入位置節點的後繼
        x->level[i].forward = update[i]->level[i].forward;
        //插入位置節點的後繼就是新節點
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        /**
            rank[i]: 在第i層,update[i]->score的排名
            rank[0] - rank[i]: update[0]->score與update[i]->score之間間隔了幾個數,即span數目
            對于update[i]->level[i].span值的更新由于在update[i]與update[i]->level[i]->forward之間又添加了x,
            update[i]->level[i].span = 從update[i]到x的span數目,
            由于update[0]後面肯定是新添加的x,是以自然新的update[i]->level[i].span = (rank[0] - rank[i]) + 1;
            x->level[i].span = 從x到update[i]->forword的span數目,
            原來的update[i]->level[i].span = 從update[i]到update[i]->level[i]->forward的span數目
            是以x->level[i].span = 原來的update[i]->level[i].span - (rank[0] - rank[i]);

            另外需要注意當level > zsl->level時,update[i] = zsl->header的span處理
        */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    //如果新節點的level小于原來skiplist的level,那麼在上層沒有insert新節點的span需要加1
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];//前驅指針
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
           

4.5 zslDeleteNode

/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {//update[i]->level[i]的後繼等于要删除節點x
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {//處理前驅節點
        x->level[0].forward->backward = x->backward;
    } else {//否則,更新tail
        zsl->tail = x->backward;
    }
    //收縮level
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}
           

4.6 zslDelete

/* Delete an element with matching score/object from the skiplist. */
//根據score, obj來删除節點
int zslDelete(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    // 周遊所有層,記錄删除節點後需要被修改的節點到 update 數組
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    // 因為多個不同的 member 可能有相同的 score
    // 是以要確定 x 的 member 和 score 都比對時,才進行删除
    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
        zslDeleteNode(zsl, x, update);
        zslFreeNode(x);
        return 1;
    } else {
        return 0; /* not found */
    }
    return 0; /* not found */
}
           

4.8 zslFirstInRange

/* Find the first node that is contained in the specified range.
 * Returns NULL when no element is contained in the range. */
//找到跳躍表中第一個符合給定範圍的元素
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
    zskiplistNode *x;
    int i;

    /* If everything is out of range, return early. */
    if (!zslIsInRange(zsl,&range)) return NULL;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* Go forward while *OUT* of range. */
        while (x->level[i].forward &&
            !zslValueGteMin(x->level[i].forward->score,&range))
                x = x->level[i].forward;
    }

    /* This is an inner range, so the next node cannot be NULL. */
    x = x->level[0].forward;
    redisAssert(x != NULL);

    /* Check if score <= max. */
    if (!zslValueLteMax(x->score,&range)) return NULL;
    return x;
}
           

4.9 zslLastInRange

/* Find the last node that is contained in the specified range.
 * Returns NULL when no element is contained in the range. */
//找到跳躍表中最後一個符合給定範圍的元素
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
    zskiplistNode *x;
    int i;

    /* If everything is out of range, return early. */
    if (!zslIsInRange(zsl,&range)) return NULL;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* Go forward while *IN* range. */
        while (x->level[i].forward &&
            zslValueLteMax(x->level[i].forward->score,&range))
                x = x->level[i].forward;
    }

    /* This is an inner range, so this node cannot be NULL. */
    redisAssert(x != NULL);

    /* Check if score >= min. */
    if (!zslValueGteMin(x->score,&range)) return NULL;
    return x;
}
           

4.10 zslDeleteRangeByScore

/* Delete all the elements with score between min and max from the skiplist.
 * Min and max are inclusive, so a score >= min || score <= max is deleted.
 * Note that this function takes the reference to the hash table view of the
 * sorted set, in order to remove the elements from the hash table too. */
//删除給定範圍内的 score 的元素。
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long removed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (range.minex ?//是否是閉區間
            x->level[i].forward->score <= range.min :
            x->level[i].forward->score < range.min))
                x = x->level[i].forward;
        update[i] = x;
    }

    /* Current node is the last with score < or <= min. */
    x = x->level[0].forward;

    /* Delete nodes while in range. */
    while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
        zskiplistNode *next = x->level[0].forward;
        zslDeleteNode(zsl,x,update);//删除節點x
        dictDelete(dict,x->obj);//在字典中也删除
        zslFreeNode(x);
        removed++;
        x = next;
    }
    return removed;//删除的節點個數
}
           

4.11 zslDeleteRangeByRank

/* Delete all the elements with rank between start and end from the skiplist.
 * Start and end are inclusive. Note that start and end need to be 1-based */
//删除給定排序範圍内的所有節點[start,end]
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long traversed = 0, removed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) < start) {
            traversed += x->level[i].span;//排名
            x = x->level[i].forward;
        }
        update[i] = x;
    }

    traversed++;
    x = x->level[0].forward;
    while (x && traversed <= end) {
        zskiplistNode *next = x->level[0].forward;
        zslDeleteNode(zsl,x,update);//删除節點x
        dictDelete(dict,x->obj);
        zslFreeNode(x);
        removed++;
        traversed++;
        x = next;
    }
    return removed;
}
           

4.12 zslGetRank

//得到score在skiplist中的排名,如果元素不在skiplist中,傳回0
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
            rank += x->level[i].span;//排名,加上該層跨越的節點數目
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->obj && equalStringObjects(x->obj,o)) {// 找到目标元素
            return rank;
        }
    }
    return 0;
}
           

4.13 zslGetElementByRank

//根據給定的 rank 查找元素
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            traversed += x->level[i].span;//排名
            x = x->level[i].forward;
        }
        if (traversed == rank) {
            return x;
        }
    }
    return NULL;
}
           
上述13個API函數中可能内部調用的一些函數沒有列出,相關代碼請檢視Redis 2.8.2源碼。

五、小結

跳躍表(Skiplist)是一種随機化資料結構,它在查找、插入、删除等操作的期望時間複雜度都能達到對數級,并且編碼相對簡單許多,跳躍表目前是Redis中用于存儲有序集合的底層資料結構,另外可以存儲有序集的資料結構是字典,Redis中還有一種底層資料結構intset可以用來存儲有序整數集。

Redis作者通過對原有的跳躍表進行修改,包括span的設計、score值可以重複、添加tail與backward指針等,進而實作了排序功能,從尾至頭反向周遊的功能等。

最後感謝黃健宏(huangz1990)的Redis設計與實作及其他對Redis2.6源碼的相關注釋對我在研究Redis2.8源碼方面的幫助。

繼續閱讀