zadd
用來往有序集合中添加值
> zadd count 1 paxi 2 maokitty
(integer) 2
複制
zadd文法為 zadd key [nx|xx] [ch] [incr] score member ,詳情戳我
通過
zrank
可以找到key對應的排序
> zrank count paxi
0
複制
其中0表示它是目前排序集合裡面分數最小的。 在redis的底層實作中,就用到了skiplist作為實作排序集合的資料結構
SkipList的基本性質
它維護了n個元素的動态集合,有很大的可能性,每個操作的時間花銷是O(lgn)
如何加速一個有序連結清單的搜尋速度

對于上面連結清單的搜尋,最差情況下是要全部搜尋一遍,時間花銷是O(n)。
再造一個有序的連結清單,它有指向第一個清單的指針,并從第一個清單中複制部分的值,詳情如下
對上述的結構搜尋,它的步驟如下:
- 先從頂層開始查找,直到要找的值比目前查到的值要小; 比如要找66,在頂層查到42再往前查是72,比66要大,是以停在了42
- 從第一步中的位置往底層走。 比如第一步中停在了42,往下走仍然在底層的42上
- 從底層往右繼續查找,知道找到元素或者這個元素根本不存在
命名頂層連結清單為L1,底層連結清單為L2,那麼整個查找所需要的時間為
隻會查找L2的一部分,假設均分100個元素為5份,那麼底部最多找20個
要使得查詢時間最小有
,假設|L2|=n,那麼可得查找時間為
。
實際上可以得到的是,2個list,得到的
,3個list得到的是
,k個list得到的是
,那麼lgn層得到的是
skiplist中的搜尋與插入
理想情況下的skiplist它有lgn層
比如說要搜尋 72 ,它的搜尋路徑為圖中紅色所示的部分,從上到下,從左到右
插入元素
在最底層的連結清單中,它維護了所有的元素,關鍵是,那個元素應該插入到上面的一層呢?理想情況下,解決方案是扔一個公平的硬币(正反的機率都是1/2),如果扔到的是正面朝上,那麼就讓目前元素同時存儲到上一層連結清單,那麼平均來說,1/2的元素會存儲到第二層,1/4的元素存儲到第三層等等
證明
n個元素的skiplist有非常大的機率隻有O(lgn)層
假設它會比lgn層要大:
這意味着至少存在一個元素需要往上存儲的次數大于 clgn 次,而總原數是n個,也就是說這個機率必定小于所有元素都往上存儲的并集
BOOLE 不等式:對于任意的事件 E1,E2,...,Ek 有
也就是說
另α=c-1,也就是說,層數超過 lgn的機率小于
,當α很大的時候,機率就很小,是以可以使用 O(lgn)
在skiplist中搜尋元素,有非常大的機率期望的搜尋時間是O(lgn)
考慮已經找到了元素,現在開始回顧整個過程
- 找到的元素從底層開始
- 如果找到元素上面沒有元素,那麼往左回溯
- 如果找到的元素上面有元素,那麼往上回溯
- 直到回溯到了最上一層
- 這個過程中往上需要查找的次數肯定是小于等于總共的層數,而它有很大的可能性就是O(lgn)。
- 對于總的移動的次數來講,它的發生肯定是經過了 clgn 次的向上移動,它就等價于抛硬币的總次數,直到有 clgn 次都是正面朝上再停止,而這個事件發生的機率也就是O(lgn)
根據
Chernoff Bounds
,假設Y是代表在m次獨立随機的抛硬币過程中出現head的次數,其中每次抛硬币獲得head的機率是p,那麼對于任意的r>0有:
E[Y]表示期望值,比如m次,機率p是1/2,那麼E[Y]=m/2
根據
Chernoff Bounds
,可以得出對于任意一個c,一定存在一個d,使得抛硬币 dlgn 次,有很大的機率至少有 clgn 次硬币朝上
m=dlgn,p=1/2
綜上可以得到兩個事實:
- A:層數有很大的機率小于等與 clgn
- B:在獲得 clgn 次向上移動的過程中,需要移動的次數有很大的機率小于等于 dlgn
要這兩個同時發生Pr(A & B)有
redis中zadd過程源碼分析
在執行zadd指令後,實際上根據目前key的數量和member的大小,會有不同的存儲方式
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetZiplistObject();
}
複制
- 使用zset結構
- 使用zsetziplist結構,ziplist請戳我
zset的結構如下
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
複制
在這個結構中使用的zskiplist結構如下
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
複制
連結清單中使用到的 zskiplistNode ,有如下定義
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span;
} level[];
} zskiplistNode;
複制
當需要往skiplist中插入元素時,主要過程如下
Code.SLICE.source("x = zsl->header;")
.interpretation("拿到跳表的頭節點");
Code.SLICE.source("for (i = zsl->level-1; i >= 0; i--) {" +
" /* store rank that is crossed to reach the insert position */" +
" rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];" +
" while (x->level[i].forward &&" +
" (x->level[i].forward->score < score ||" +
" (x->level[i].forward->score == score &&" +
" sdscmp(x->level[i].forward->ele,ele) < 0)))" +
" {" +
" rank[i] += x->level[i].span;" +
" x = x->level[i].forward;" +
" }" +
" update[i] = x;" +
" }")
.interpretation("從跳表的最高層開始,1層1層的找需要插入的score對應的位置")
.interpretation("1: rank用來計算元素在排序清單中排的順序,rank[i]表示新節點的前一個節點與每層距離頭節點的跨度")
.interpretation("2: 隻要目前節點的分數小于要插入節點的分數,并且目前節點的前頭還有,那麼就一直往前周遊,記錄下來層級之間的跨度,和最後需要插入元素的節點的前一個節點")
.interpretation("3: 如果分數一模一樣,則比較key,key值大,仍然往前周遊")
.interpretation("4: 注意最高層的下标是 level-1");
Code.SLICE.source("level = zslRandomLevel();")
.interpretation("随機産生層級");
Code.SLICE.source("if (level > zsl->level) {" +
" for (i = zsl->level; i < level; i++) {" +
" rank[i] = 0;" +
" update[i] = zsl->header;" +
" update[i]->level[i].span = zsl->length;" +
" }" +
" zsl->level = level;" +
" }")
.interpretation("如果産生的層級大于目前提跳表的最大層級,那麼将目前層級置為最高的層級")
.interpretation("1: 在所有新增的層都記下頭節點和跳表的長度");
Code.SLICE.source("x = zslCreateNode(level,score,ele);")
.interpretation("建立一個跳表的節點對象,作為需要新插入的節點");
Code.SLICE.source(" for (i = 0; i < level; i++) {" +
" x->level[i].forward = update[i]->level[i].forward;" +
" update[i]->level[i].forward = x;" +
"" +
" /* update span covered by update[i] as x is inserted here */" +
" x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);" +
" update[i]->level[i].span = (rank[0] - rank[i]) + 1;" +
" }")
.interpretation("周遊新元素所在層以及它下面的所有層級,插入新的元素,由于下層是插入新元素,那麼這些位置的跨度必然會使得原有跨度變成兩半")
.interpretation("1: 在周遊的時候已經記下了下面每一層的插入位置的前一個節點,那麼新的節點的下一個節點就是已經查找位置的下一個節點,而要插入位置的元素它的下一個節點,就是新插入的節點")
.interpretation("2:Rank[0]表示第一層的總共跨度,也就是新元素在跳表中的排序,rank[i]是新節點的前一個節點在每層距離頭節點的跨度,在插入新元素之前,前後的總跨度是 update[i]->level[i].span ")
.interpretation("3: 假設原有節點的跨度是4,原有兩個節點的位置分别是 1和4,假設新插入的位置是 3, rank[0]-rank[i]的值2,那麼新節點的跨度就是 4-2=2(2表示新節點和新節點的下一個節點),位置1的節點的跨度就是 4-2+1=3");
Code.SLICE.source("/* increment span for untouched levels */" +
" for (i = level; i < zsl->level; i++) {" +
" update[i]->level[i].span++;" +
" }")
.interpretation("在新插入層級之上的層級,它們下方由于都新插入了一個節點,那麼跨度均加1即可");
Code.SLICE.source("x->backward = (update[0] == zsl->header) ? NULL : update[0];")
.interpretation("如果新插入節點的前一個接單是頭節點,則不設定後向指針,否則設定後向指針為它的前一個節點")
.interpretation("1 這裡可以看到頭節點其實隻是作為1個指針使用,并不參與存值");
Code.SLICE.source("if (x->level[0].forward)" +
" x->level[0].forward->backward = x;" +
" else" +
" zsl->tail = x;")
.interpretation("如果新節點前面仍然存在節點,那麼新節點的前一個節點的後節點就是新節點本身,否則說明新節點就是尾結點")
.interpretation("1: 這裡可以看出隻有第一層才是雙向的連結清單");
Code.SLICE.source("zsl->length++;" +
" return x;")
.interpretation("插入了新的節點x,那麼跳表總長度加1,傳回建立的節點即可");
複制
更詳細的源碼分析可以看附錄的完整源碼追蹤過程
redis中的skiplist總結
- 如果節點不多,允許使用ziplist會直接使用ziplist,超過門檻值使用跳表;
- 每個跳表節點維護了層級結構,沒有備援的存儲多餘的key和score;隻有最底下一層是雙向連結清單;
- 随機增長層級是通過random實作
附錄
zadd源碼追蹤完整過程
張鐵蕾-Redis内部資料結構詳解(6)——skiplist
書:redis的設計與實作
書:redis開發與運維
MIT算法課