本文所引用的源碼全部來自Redis2.8.2版本。
Redis中skiplist資料結構與API相關檔案是:redis.h與t_zset.c。
http://blog.csdn.net/acceptedxukai/article/details/8923174 這是我之前寫的關于skiplist最傳統的實作,功能遠不如Redis中跳表的強大,但是代碼簡短,比較容易了解。
轉載請注明,文章來自:http://blog.csdn.net/acceptedxukai/article/details/17333673
一、跳躍表簡介
跳躍表是一種随機化資料結構,基于并聯的連結清單,其效率可以比拟平衡二叉樹,查找、删除、插入等操作都可以在對數期望時間内完成,對比平衡樹,跳躍表的實作要簡單直覺很多。
以下是一個跳躍表的例圖(來自維基百科):
從圖中可以看出跳躍表主要有以下幾個部分構成:
1、 表頭head:負責維護跳躍表的節點指針
2、 節點node:實際儲存元素值,每個節點有一層或多層
3、 層level:儲存着指向該層下一個節點的指針
4、 表尾tail:全部由null組成
跳躍表的周遊總是從高層開始,然後随着元素值範圍的縮小,慢慢降低到低層。
二、Redis中跳躍表的資料結構
Redis作者為了适合自己功能的需要,對原來的跳躍表進行了一下修改:
1、 允許重複的score值:多個不同的元素(member)的score值可以相同2、 進行元素對比的時候,不僅要檢查score值,還需要檢查member:當score值相等時,需要比較member域進行比較。3、 結構儲存一個tail指針:跳躍表的表尾指針4、 每個節點都有一個高度為1層的前驅指針,用于從底層表尾向表頭方向周遊
跳躍表資料結構如下(redis.h):
typedef struct zskiplistNode {
robj *obj; //節點資料
double score;
struct zskiplistNode*backward; //前驅
struct zskiplistLevel {
struct zskiplistNode*forward;//後繼
unsigned int span;//該層跨越的節點數量
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode*header, *tail;
unsigned long length;//節點的數目
int level;//目前表的最大層數
} zskiplist;
資料結構中可能span這個參數最不好了解了,下面簡單解釋一下:
參照上面跳躍表的例圖,head節點的span所有level的值都将是1;node 1在level 0 span值為1,因為跨越1個元素都将走到下一個節點2,在level 1 span值為2,因為需要跨越2個元素(node 2,node 3)才能到達下一個節點3。
關于span的解釋可以詳看: http://stackoverflow.com/questions/10458572/what-does-the-skiplistnode-variable-span-mean-in-redis-h
三、簡單列出跳躍表的API,他們的作用以及算法複雜度
約定O(N)表示對于元素個數的表達,O(L)表示對于跳表層數的表達
函數名 | 作用 | 複雜度 |
zslCreateNode | 建立并傳回一個跳表節點 | O(1) |
zslCreate | 建立并初始化一個跳躍表 | O(L) |
zslFreeNode | 釋放給定的節點 | O(1) |
zslFree | 釋放給定的跳躍表 | O(N) |
zslRandomLevel | 得到新節點的層數(抛硬币法的改進) | O(1) |
zslInsert | 将給定的score與member建立節點并添加到跳表中 | O(logN) |
zslDeleteNode | 删除給定的跳表節點 | O(L) |
zslDelete | 删除給定的score與member在跳表中比對的節點 | O(logN) |
zslIsInRange | 檢查跳表中的元素score值是否在給定的範圍内 | O(1) |
zslFirstInRange | 查找第一個符合給定範圍的節點 | O(logN) |
zslLastInRange | 查找最後一個符合給定範圍的節點 | O(logN) |
zslDeleteRangeByScore | 删除score值在給定範圍内的節點 | O(logN)+O(M) |
zslDeleteRangeByRank | 删除排名在給定範圍内的節點 | O(logN)+O(M) |
zslGetRank | 傳回給定score與member在集合中的排名 | O(logN) |
zslGetElementByRank | 根據給定的rank來查找元素 | O(logN) |
四、上述API源碼的簡單解析
4.1 zslCreateNode
//建立一個skiplist節點,需要傳入所在的level,score,以及儲存的數值obj
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->obj = obj;
return zn;
}
zmalloc是Redis在系統函數malloc上自己封裝的函數,主要為了友善對記憶體使用情況的計算。
4.2 zslCreate
//建立skiplist,header不存儲任何資料
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//ZSKIPLIST_MAXLEVEL = 32
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;//後繼
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;//前驅
zsl->tail = NULL;//尾指針
return zsl;
}
4.3 zslRandomLevel
int zslRandomLevel(void) {//為新的skiplist節點生成該節點level數目
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//0.25
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
4.4 zslInsert
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
x = zsl->header;//header不存儲資料
//從高向低
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
//rank[i]用來記錄第i層達到插入位置的所跨越的節點總數,也就是該層最接近(小于)給定score的排名
//rank[i]初始化為上一層所跨越的節點總數
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
//後繼節點不為空,并且後繼節點的score比給定的score小
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
//score相同,但節點的obj比給定的obj小
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
rank[i] += x->level[i].span;//記錄跨越了多少個節點
x = x->level[i].forward;//繼續向右走
}
update[i] = x;//儲存通路的節點,并且将目前x移動到下一層
}
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not. */
level = zslRandomLevel();//計算新的level
if (level > zsl->level) {//新的level > zsl->level,需要進行更新
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;//需要更新的節點就是header
update[i]->level[i].span = zsl->length;
//在未添加新節點之前,需要更新的節點跨越的節點數目自然就是zsl->length,
}
zsl->level = level;
}
x = zslCreateNode(level,score,obj);//建立新節點
//開始插入節點
for (i = 0; i < level; i++) {
//新節點的後繼就是插入位置節點的後繼
x->level[i].forward = update[i]->level[i].forward;
//插入位置節點的後繼就是新節點
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
/**
rank[i]: 在第i層,update[i]->score的排名
rank[0] - rank[i]: update[0]->score與update[i]->score之間間隔了幾個數,即span數目
對于update[i]->level[i].span值的更新由于在update[i]與update[i]->level[i]->forward之間又添加了x,
update[i]->level[i].span = 從update[i]到x的span數目,
由于update[0]後面肯定是新添加的x,是以自然新的update[i]->level[i].span = (rank[0] - rank[i]) + 1;
x->level[i].span = 從x到update[i]->forword的span數目,
原來的update[i]->level[i].span = 從update[i]到update[i]->level[i]->forward的span數目
是以x->level[i].span = 原來的update[i]->level[i].span - (rank[0] - rank[i]);
另外需要注意當level > zsl->level時,update[i] = zsl->header的span處理
*/
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
//如果新節點的level小于原來skiplist的level,那麼在上層沒有insert新節點的span需要加1
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];//前驅指針
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
4.5 zslDeleteNode
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {//update[i]->level[i]的後繼等于要删除節點x
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {//處理前驅節點
x->level[0].forward->backward = x->backward;
} else {//否則,更新tail
zsl->tail = x->backward;
}
//收縮level
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
4.6 zslDelete
/* Delete an element with matching score/object from the skiplist. */
//根據score, obj來删除節點
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
// 周遊所有層,記錄删除節點後需要被修改的節點到 update 數組
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
// 因為多個不同的 member 可能有相同的 score
// 是以要確定 x 的 member 和 score 都比對時,才進行删除
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
} else {
return 0; /* not found */
}
return 0; /* not found */
}
4.8 zslFirstInRange
/* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
//找到跳躍表中第一個符合給定範圍的元素
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,&range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *OUT* of range. */
while (x->level[i].forward &&
!zslValueGteMin(x->level[i].forward->score,&range))
x = x->level[i].forward;
}
/* This is an inner range, so the next node cannot be NULL. */
x = x->level[0].forward;
redisAssert(x != NULL);
/* Check if score <= max. */
if (!zslValueLteMax(x->score,&range)) return NULL;
return x;
}
4.9 zslLastInRange
/* Find the last node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
//找到跳躍表中最後一個符合給定範圍的元素
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,&range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *IN* range. */
while (x->level[i].forward &&
zslValueLteMax(x->level[i].forward->score,&range))
x = x->level[i].forward;
}
/* This is an inner range, so this node cannot be NULL. */
redisAssert(x != NULL);
/* Check if score >= min. */
if (!zslValueGteMin(x->score,&range)) return NULL;
return x;
}
4.10 zslDeleteRangeByScore
/* Delete all the elements with score between min and max from the skiplist.
* Min and max are inclusive, so a score >= min || score <= max is deleted.
* Note that this function takes the reference to the hash table view of the
* sorted set, in order to remove the elements from the hash table too. */
//删除給定範圍内的 score 的元素。
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (range.minex ?//是否是閉區間
x->level[i].forward->score <= range.min :
x->level[i].forward->score < range.min))
x = x->level[i].forward;
update[i] = x;
}
/* Current node is the last with score < or <= min. */
x = x->level[0].forward;
/* Delete nodes while in range. */
while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);//删除節點x
dictDelete(dict,x->obj);//在字典中也删除
zslFreeNode(x);
removed++;
x = next;
}
return removed;//删除的節點個數
}
4.11 zslDeleteRangeByRank
/* Delete all the elements with rank between start and end from the skiplist.
* Start and end are inclusive. Note that start and end need to be 1-based */
//删除給定排序範圍内的所有節點[start,end]
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long traversed = 0, removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) < start) {
traversed += x->level[i].span;//排名
x = x->level[i].forward;
}
update[i] = x;
}
traversed++;
x = x->level[0].forward;
while (x && traversed <= end) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);//删除節點x
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
traversed++;
x = next;
}
return removed;
}
4.12 zslGetRank
//得到score在skiplist中的排名,如果元素不在skiplist中,傳回0
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;//排名,加上該層跨越的節點數目
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->obj && equalStringObjects(x->obj,o)) {// 找到目标元素
return rank;
}
}
return 0;
}
4.13 zslGetElementByRank
//根據給定的 rank 查找元素
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;//排名
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}
上述13個API函數中可能内部調用的一些函數沒有列出,相關代碼請檢視Redis 2.8.2源碼。
五、小結
跳躍表(Skiplist)是一種随機化資料結構,它在查找、插入、删除等操作的期望時間複雜度都能達到對數級,并且編碼相對簡單許多,跳躍表目前是Redis中用于存儲有序集合的底層資料結構,另外可以存儲有序集的資料結構是字典,Redis中還有一種底層資料結構intset可以用來存儲有序整數集。
Redis作者通過對原有的跳躍表進行修改,包括span的設計、score值可以重複、添加tail與backward指針等,進而實作了排序功能,從尾至頭反向周遊的功能等。
最後感謝黃健宏(huangz1990)的Redis設計與實作及其他對Redis2.6源碼的相關注釋對我在研究Redis2.8源碼方面的幫助。