一、跳躍表簡介
跳躍表(skiplist)是一種随機化的資料結構,由 William Pugh 在論文《Skip lists: a probabilistic alternative to balanced trees》中提出,是一種可以于平衡樹媲美的階層化連結清單結構——查找、删除、添加等操作都可以在對數期望時間下完成,以下是一個典型的跳躍表例子:
我們在上一篇中提到了 Redis 的五種基本結構中,有一個叫做 有序清單 zset 的資料結構,它類似于 Java 中的 SortedSet 和 HashMap 的結合體,一方面它是一個 set 保證了内部 value 的唯一性,另一方面又可以給每個 value 賦予一個排序的權重值 score,來達到 排序 的目的。
它的内部實作就依賴了一種叫做 「跳躍清單」 的資料結構。
為什麼使用跳躍表
首先,因為 zset 要支援随機的插入和删除,是以它 不宜使用數組來實作,關于排序問題,我們也很容易就想到 紅黑樹/ 平衡樹 這樣的樹形結構,為什麼 Redis 不使用這樣一些結構呢?
- 性能考慮: 在高并發的情況下,樹形結構需要執行一些類似于 rebalance 這樣的可能涉及整棵樹的操作,相對來說跳躍表的變化隻涉及局部 (下面詳細說);
- 實作考慮: 在複雜度與紅黑樹相同的情況下,跳躍表實作起來更簡單,看起來也更加直覺;
基于以上的一些考慮,Redis 基于 William Pugh 的論文做出一些改進後采用了 跳躍表 這樣的結構。
本質是解決查找問題
我們先來看一個普通的連結清單結構:
我們需要這個連結清單按照 score 值進行排序,這也就意味着,當我們需要添加新的元素時,我們需要定位到插入點,這樣才可以繼續保證連結清單是有序的,通常我們會使用 二分查找法,但二分查找是有序數組的,連結清單沒辦法進行位置定位,我們除了周遊整個找到第一個比給定資料大的節點為止 (時間複雜度 O(n)) 似乎沒有更好的辦法。
但假如我們每相鄰兩個節點之間就增加一個指針,讓指針指向下一個節點,如下圖:
這樣所有新增的指針連成了一個新的連結清單,但它包含的資料卻隻有原來的一半 (圖中的為 3,11)。
現在假設我們想要查找資料時,可以根據這條新的連結清單查找,如果碰到比待查找資料大的節點時,再回到原來的連結清單中進行查找,比如,我們想要查找 7,查找的路徑則是沿着下圖中标注出的紅色指針所指向的方向進行的:
這是一個略微極端的例子,但我們仍然可以看到,通過新增加的指針查找,我們不再需要與連結清單上的每一個節點逐一進行比較,這樣改進之後需要比較的節點數大概隻有原來的一半。
利用同樣的方式,我們可以在新産生的連結清單上,繼續為每兩個相鄰的節點增加一個指針,進而産生第三層連結清單:
在這個新的三層連結清單結構中,我們試着 查找 13,那麼沿着最上層連結清單首先比較的是 11,發現 11 比 13 小,于是我們就知道隻需要到 11 後面繼續查找,進而一下子跳過了 11 前面的所有節點。
可以想象,當連結清單足夠長,這樣的多層連結清單結構可以幫助我們跳過很多下層節點,進而加快查找的效率。
更進一步的跳躍表
跳躍表 skiplist 就是受到這種多層連結清單結構的啟發而設計出來的。按照上面生成連結清單的方式,上面每一層連結清單的節點個數,是下面一層的節點個數的一半,這樣查找過程就非常類似于一個二分查找,使得查找的時間複雜度可以降低到 O(logn)。
但是,這種方法在插入資料的時候有很大的問題。新插入一個節點之後,就會打亂上下相鄰兩層連結清單上節點個數嚴格的 2:1 的對應關系。如果要維持這種對應關系,就必須把新插入的節點後面的所有節點 (也包括新插入的節點) 重新進行調整,這會讓時間複雜度重新蛻化成 O(n)。删除資料也有同樣的問題。
skiplist 為了避免這一問題,它不要求上下相鄰兩層連結清單之間的節點個數有嚴格的對應關系,而是 為每個節點随機出一個層數(level)。比如,一個節點随機出的層數是 3,那麼就把它鍊入到第 1 層到第 3 層這三層連結清單中。為了表達清楚,下圖展示了如何通過一步步的插入操作進而形成一個 skiplist 的過程:
從上面的建立和插入的過程中可以看出,每一個節點的層數(level)是随機出來的,而且新插入一個節點并不會影響到其他節點的層數,是以,插入操作隻需要修改節點前後的指針,而不需要對多個節點都進行調整,這就降低了插入操作的複雜度。
現在我們假設從我們剛才建立的這個結構中查找 23 這個不存在的數,那麼查找路徑會如下圖:
二、跳躍表的實作
Redis 中的跳躍表由
server.h/zskiplistNode
和
server.h/zskiplist
兩個結構定義,前者為跳躍表節點,後者則儲存了跳躍節點的相關資訊,同之前的
集合 list
結構類似,其實隻有
zskiplistNode
就可以實作了,但是引入後者是為了更加友善的操作:
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
// value
sds ele;
// 分值
double score;
// 後退指針
struct zskiplistNode *backward;
// 層
struct zskiplistLevel {
// 前進指針
struct zskiplistNode *forward;
// 跨度
unsigned long span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
// 跳躍表頭指針
struct zskiplistNode *header, *tail;
// 表中節點的數量
unsigned long length;
// 表中層數最大的節點的層數
int level;
} zskiplist;
正如文章開頭畫出來的那張标準的跳躍表那樣。
随機層數
對于每一個新插入的節點,都需要調用一個随機算法給它配置設定一個合理的層數,源碼在
t_zset.c/zslRandomLevel(void)
中被定義:
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
直覺上期望的目标是 50% 的機率被配置設定到
Level 1
,25% 的機率被配置設定到
Level 2
,12.5% 的機率被配置設定到
Level 3
,以此類推...有 2-63 的機率被配置設定到最頂層,因為這裡每一層的晉升率都是 50%。
Redis 跳躍表預設允許最大的層數是 32,被源碼中
ZSKIPLIST_MAXLEVEL
定義,當
Level[0]
有 264 個元素時,才能達到 32 層,是以定義 32 完全夠用了。
建立跳躍表
這個過程比較簡單,在源碼中的
t_zset.c/zslCreate
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
// 申請記憶體空間
zsl = zmalloc(sizeof(*zsl));
// 初始化層數為 1
zsl->level = 1;
// 初始化長度為 0
zsl->length = 0;
// 建立一個層數為 32,分數為 0,沒有 value 值的跳躍表頭節點
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
// 跳躍表頭節點初始化
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
// 将跳躍表頭節點的所有前進指針 forward 設定為 NULL
zsl->header->level[j].forward = NULL;
// 将跳躍表頭節點的所有跨度 span 設定為 0
zsl->header->level[j].span = 0;
}
// 跳躍表頭節點的後退指針 backward 置為 NULL
zsl->header->backward = NULL;
// 表頭指向跳躍表尾節點的指針置為 NULL
zsl->tail = NULL;
return zsl;
}
即執行完之後建立了如下結構的初始化跳躍表:
插入節點實作
這幾乎是最重要的一段代碼了,但總體思路也比較清晰簡單,如果了解了上面所說的跳躍表的原理,那麼很容易理清楚插入節點時發生的幾個動作 (幾乎跟連結清單類似):
- 找到目前我需要插入的位置 (其中包括相同 score 時的處理);
- 建立新節點,調整前後的指針指向,完成插入;
為了友善閱讀,我把源碼
t_zset.c/zslInsert
定義的插入函數拆成了幾個部分
第一部分:聲明需要存儲的變量
// 存儲搜尋路徑
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// 存儲經過的節點跨度
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
第二部分:搜尋目前節點插入位置
serverAssert(!isnan(score));
x = zsl->header;
// 逐漸降級尋找目标節點,得到 "搜尋路徑"
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 如果 score 相等,還需要比較 value 值
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 記錄 "搜尋路徑"
update[i] = x;
}
讨論: 有一種極端的情況,就是跳躍表中的所有 score 值都是一樣,zset 的查找性能會不會退化為 O(n) 呢?
從上面的源碼中我們可以發現 zset 的排序元素不隻是看 score 值,也會比較 value 值 (字元串比較)
第三部分:生成插入節點
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();
// 如果随機生成的 level 超過了目前最大 level 需要更新跳躍表的資訊
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 建立新節點
x = zslCreateNode(level,score,ele);
第四部分:重排前向指針
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
第五部分:重排後向指針并傳回
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
節點删除實作
删除過程由源碼中的
t_zset.c/zslDeleteNode
定義,和插入過程類似,都需要先把這個 "搜尋路徑" 找出來,然後對于每個層的相關節點重排一下前向後向指針,同時還要注意更新一下最高層數
maxLevel
,直接放源碼 (如果了解了插入這裡還是很容易了解的):
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
節點更新實作
當我們調用
ZADD
方法時,如果對應的 value 不存在,那就是插入過程,如果這個 value 已經存在,隻是調整一下 score 的值,那就需要走一個更新流程。
假設這個新的 score 值并不會帶來排序上的變化,那麼就不需要調整位置,直接修改元素的 score 值就可以了,但是如果排序位置改變了,那就需要調整位置,該如何調整呢?
從源碼
t_zset.c/zsetAdd
函數
1350
行左右可以看到,Redis 采用了一個非常簡單的政策:
/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}
把這個元素删除再插入這個,需要經過兩次路徑搜尋,從這一點上來看,Redis 的
ZADD
代碼似乎還有進一步優化的空間。
元素排名的實作
跳躍表本身是有序的,Redis 在 skiplist 的 forward 指針上進行了優化,給每一個 forward 指針都增加了
span
屬性,用來 表示從前一個節點沿着目前層的 forward 指針跳到目前這個節點中間會跳過多少個節點。在上面的源碼中我們也可以看到 Redis 在插入、删除操作時都會小心翼翼地更新
span
值的大小。
是以,沿着 "搜尋路徑",把所有經過節點的跨度
span
值進行累加就可以算出目前元素的最終 rank 值了:
/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
// span 累加
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
return 0;
}
擴充閱讀
- 跳躍表 Skip List 的原理和實作(Java) - https://blog.csdn.net/DERRANTCM/article/details/79063312
- 【算法導論33】跳躍表(Skip list)原理與java實作 - https://blog.csdn.net/brillianteagle/article/details/52206261
參考資料
- 《Redis 設計與實作》 - http://redisbook.com/
- 【官方文檔】Redis 資料類型介紹 - http://www.redis.cn/topics/data-types-intro.html
- 《Redis 深度曆險》 - https://book.douban.com/subject/30386804/
- Redis 源碼 - https://github.com/antirez/redis
- Redis 快速入門 - 易百教程 - https://www.yiibai.com/redis/redis_quick_guide.html
- Redis【入門】就這一篇! - https://www.wmyskxz.com/2018/05/31/redis-ru-men-jiu-zhe-yi-pian/
- Redis為什麼用跳表而不用平衡樹? - https://mp.weixin.qq.com/s?__biz=MzA4NTg1MjM0Mg==&mid=2657261425&idx=1&sn=d840079ea35875a8c8e02d9b3e44cf95&scene=21#wechat_redirect
- 為啥 redis 使用跳表(skiplist)而不是使用 red-black? - 知乎@于康 - https://www.zhihu.com/question/20202931
- 本文已收錄至我的 Github 程式員成長系列 【More Than Java】,學習,不止 Code,歡迎 star:https://github.com/wmyskxz/MoreThanJava
- 個人公衆号 :wmyskxz,個人獨立域名部落格:wmyskxz.com,堅持原創輸出,下方掃碼關注,2020,與您共同成長!
非常感謝各位人才能 看到這裡,如果覺得本篇文章寫得不錯,覺得 「我沒有三顆心髒」有點東西 的話,求點贊,求關注,求分享,求留言!
創作不易,各位的支援和認可,就是我創作的最大動力,我們下篇文章見!