天天看點

從源碼看redis的sorted set與skipList詳解

zadd

用來往有序集合中添加值

> zadd count 1 paxi 2 maokitty
(integer) 2
           

複制

zadd文法為 zadd key [nx|xx] [ch] [incr] score member ,詳情戳我

通過

zrank

可以找到key對應的排序

> zrank count paxi
0
           

複制

其中0表示它是目前排序集合裡面分數最小的。 在redis的底層實作中,就用到了skiplist作為實作排序集合的資料結構

SkipList的基本性質

它維護了n個元素的動态集合,有很大的可能性,每個操作的時間花銷是O(lgn)

如何加速一個有序連結清單的搜尋速度

從源碼看redis的sorted set與skipList詳解

對于上面連結清單的搜尋,最差情況下是要全部搜尋一遍,時間花銷是O(n)。

再造一個有序的連結清單,它有指向第一個清單的指針,并從第一個清單中複制部分的值,詳情如下

從源碼看redis的sorted set與skipList詳解

對上述的結構搜尋,它的步驟如下:

  • 先從頂層開始查找,直到要找的值比目前查到的值要小; 比如要找66,在頂層查到42再往前查是72,比66要大,是以停在了42
  • 從第一步中的位置往底層走。 比如第一步中停在了42,往下走仍然在底層的42上
  • 從底層往右繼續查找,知道找到元素或者這個元素根本不存在

命名頂層連結清單為L1,底層連結清單為L2,那麼整個查找所需要的時間為

從源碼看redis的sorted set與skipList詳解
隻會查找L2的一部分,假設均分100個元素為5份,那麼底部最多找20個

要使得查詢時間最小有

從源碼看redis的sorted set與skipList詳解

,假設|L2|=n,那麼可得查找時間為

從源碼看redis的sorted set與skipList詳解

實際上可以得到的是,2個list,得到的
從源碼看redis的sorted set與skipList詳解

,3個list得到的是

從源碼看redis的sorted set與skipList詳解

,k個list得到的是

從源碼看redis的sorted set與skipList詳解

,那麼lgn層得到的是

從源碼看redis的sorted set與skipList詳解

skiplist中的搜尋與插入

理想情況下的skiplist它有lgn層

從源碼看redis的sorted set與skipList詳解

比如說要搜尋 72 ,它的搜尋路徑為圖中紅色所示的部分,從上到下,從左到右

插入元素

在最底層的連結清單中,它維護了所有的元素,關鍵是,那個元素應該插入到上面的一層呢?理想情況下,解決方案是扔一個公平的硬币(正反的機率都是1/2),如果扔到的是正面朝上,那麼就讓目前元素同時存儲到上一層連結清單,那麼平均來說,1/2的元素會存儲到第二層,1/4的元素存儲到第三層等等

證明

n個元素的skiplist有非常大的機率隻有O(lgn)層

假設它會比lgn層要大:

從源碼看redis的sorted set與skipList詳解

這意味着至少存在一個元素需要往上存儲的次數大于 clgn 次,而總原數是n個,也就是說這個機率必定小于所有元素都往上存儲的并集

從源碼看redis的sorted set與skipList詳解
BOOLE 不等式:對于任意的事件 E1,E2,...,Ek 有
從源碼看redis的sorted set與skipList詳解

也就是說

從源碼看redis的sorted set與skipList詳解

另α=c-1,也就是說,層數超過 lgn的機率小于

從源碼看redis的sorted set與skipList詳解

,當α很大的時候,機率就很小,是以可以使用 O(lgn)

在skiplist中搜尋元素,有非常大的機率期望的搜尋時間是O(lgn)

考慮已經找到了元素,現在開始回顧整個過程

  • 找到的元素從底層開始
  • 如果找到元素上面沒有元素,那麼往左回溯
  • 如果找到的元素上面有元素,那麼往上回溯
  • 直到回溯到了最上一層
  1. 這個過程中往上需要查找的次數肯定是小于等于總共的層數,而它有很大的可能性就是O(lgn)。
  2. 對于總的移動的次數來講,它的發生肯定是經過了 clgn 次的向上移動,它就等價于抛硬币的總次數,直到有 clgn 次都是正面朝上再停止,而這個事件發生的機率也就是O(lgn)

根據

Chernoff Bounds

,假設Y是代表在m次獨立随機的抛硬币過程中出現head的次數,其中每次抛硬币獲得head的機率是p,那麼對于任意的r>0有:

從源碼看redis的sorted set與skipList詳解
E[Y]表示期望值,比如m次,機率p是1/2,那麼E[Y]=m/2

根據

Chernoff Bounds

,可以得出對于任意一個c,一定存在一個d,使得抛硬币 dlgn 次,有很大的機率至少有 clgn 次硬币朝上

m=dlgn,p=1/2

綜上可以得到兩個事實:

  1. A:層數有很大的機率小于等與 clgn
  2. B:在獲得 clgn 次向上移動的過程中,需要移動的次數有很大的機率小于等于 dlgn

要這兩個同時發生Pr(A & B)有

從源碼看redis的sorted set與skipList詳解

redis中zadd過程源碼分析

在執行zadd指令後,實際上根據目前key的數量和member的大小,會有不同的存儲方式

if (server.zset_max_ziplist_entries == 0 ||
    server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
    zobj = createZsetObject();
} else {
    zobj = createZsetZiplistObject();
}
           

複制

  1. 使用zset結構
  2. 使用zsetziplist結構,ziplist請戳我

zset的結構如下

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
           

複制

在這個結構中使用的zskiplist結構如下

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;
           

複制

連結清單中使用到的 zskiplistNode ,有如下定義

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;
           

複制

當需要往skiplist中插入元素時,主要過程如下

Code.SLICE.source("x = zsl->header;")
        .interpretation("拿到跳表的頭節點");
Code.SLICE.source("for (i = zsl->level-1; i >= 0; i--) {" +
        "        /* store rank that is crossed to reach the insert position */" +
        "        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];" +
        "        while (x->level[i].forward &&" +
        "                (x->level[i].forward->score < score ||" +
        "                    (x->level[i].forward->score == score &&" +
        "                    sdscmp(x->level[i].forward->ele,ele) < 0)))" +
        "        {" +
        "            rank[i] += x->level[i].span;" +
        "            x = x->level[i].forward;" +
        "        }" +
        "        update[i] = x;" +
        "    }")
        .interpretation("從跳表的最高層開始,1層1層的找需要插入的score對應的位置")
        .interpretation("1: rank用來計算元素在排序清單中排的順序,rank[i]表示新節點的前一個節點與每層距離頭節點的跨度")
        .interpretation("2: 隻要目前節點的分數小于要插入節點的分數,并且目前節點的前頭還有,那麼就一直往前周遊,記錄下來層級之間的跨度,和最後需要插入元素的節點的前一個節點")
        .interpretation("3: 如果分數一模一樣,則比較key,key值大,仍然往前周遊")
        .interpretation("4: 注意最高層的下标是 level-1");
Code.SLICE.source("level = zslRandomLevel();")
        .interpretation("随機産生層級");
Code.SLICE.source("if (level > zsl->level) {" +
        "        for (i = zsl->level; i < level; i++) {" +
        "            rank[i] = 0;" +
        "            update[i] = zsl->header;" +
        "            update[i]->level[i].span = zsl->length;" +
        "        }" +
        "        zsl->level = level;" +
        "    }")
        .interpretation("如果産生的層級大于目前提跳表的最大層級,那麼将目前層級置為最高的層級")
        .interpretation("1: 在所有新增的層都記下頭節點和跳表的長度");
Code.SLICE.source("x = zslCreateNode(level,score,ele);")
        .interpretation("建立一個跳表的節點對象,作為需要新插入的節點");
Code.SLICE.source(" for (i = 0; i < level; i++) {" +
        "        x->level[i].forward = update[i]->level[i].forward;" +
        "        update[i]->level[i].forward = x;" +
        "" +
        "        /* update span covered by update[i] as x is inserted here */" +
        "        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);" +
        "        update[i]->level[i].span = (rank[0] - rank[i]) + 1;" +
        "    }")
        .interpretation("周遊新元素所在層以及它下面的所有層級,插入新的元素,由于下層是插入新元素,那麼這些位置的跨度必然會使得原有跨度變成兩半")
        .interpretation("1: 在周遊的時候已經記下了下面每一層的插入位置的前一個節點,那麼新的節點的下一個節點就是已經查找位置的下一個節點,而要插入位置的元素它的下一個節點,就是新插入的節點")
        .interpretation("2:Rank[0]表示第一層的總共跨度,也就是新元素在跳表中的排序,rank[i]是新節點的前一個節點在每層距離頭節點的跨度,在插入新元素之前,前後的總跨度是 update[i]->level[i].span ")
        .interpretation("3: 假設原有節點的跨度是4,原有兩個節點的位置分别是 1和4,假設新插入的位置是 3, rank[0]-rank[i]的值2,那麼新節點的跨度就是 4-2=2(2表示新節點和新節點的下一個節點),位置1的節點的跨度就是 4-2+1=3");
Code.SLICE.source("/* increment span for untouched levels */" +
        "    for (i = level; i < zsl->level; i++) {" +
        "        update[i]->level[i].span++;" +
        "    }")
        .interpretation("在新插入層級之上的層級,它們下方由于都新插入了一個節點,那麼跨度均加1即可");
Code.SLICE.source("x->backward = (update[0] == zsl->header) ? NULL : update[0];")
        .interpretation("如果新插入節點的前一個接單是頭節點,則不設定後向指針,否則設定後向指針為它的前一個節點")
        .interpretation("1 這裡可以看到頭節點其實隻是作為1個指針使用,并不參與存值");
Code.SLICE.source("if (x->level[0].forward)" +
        "        x->level[0].forward->backward = x;" +
        "    else" +
        "        zsl->tail = x;")
        .interpretation("如果新節點前面仍然存在節點,那麼新節點的前一個節點的後節點就是新節點本身,否則說明新節點就是尾結點")
        .interpretation("1: 這裡可以看出隻有第一層才是雙向的連結清單");
Code.SLICE.source("zsl->length++;" +
        "    return x;")
        .interpretation("插入了新的節點x,那麼跳表總長度加1,傳回建立的節點即可");
           

複制

更詳細的源碼分析可以看附錄的完整源碼追蹤過程

redis中的skiplist總結

  1. 如果節點不多,允許使用ziplist會直接使用ziplist,超過門檻值使用跳表;
  2. 每個跳表節點維護了層級結構,沒有備援的存儲多餘的key和score;隻有最底下一層是雙向連結清單;
  3. 随機增長層級是通過random實作

附錄

zadd源碼追蹤完整過程

張鐵蕾-Redis内部資料結構詳解(6)——skiplist

書:redis的設計與實作

書:redis開發與運維

MIT算法課