天天看點

遊戲排行榜-跳表實作原理分析

前言

做遊戲的一般都有遊戲排行榜的需求,要查一下某個uid的積分排名第幾,這裡我給大家推薦之前我們使用的一種排序算法,跳表skiplist。

跳表是一個随機化的資料結構。它允許快速查詢一個有序連續元素的資料連結清單。跳躍清單的平均查找和插入時間複雜度都是O(log n),優于普通隊列的O(n)。性能上和紅黑樹,AVL樹不相上下,但跳表的原理非常簡單,目前Redis和LevelDB中都有用到。

跳表是一種可以替代平衡樹的資料結構。跳表追求的是機率性平衡,而不是嚴格平衡。是以,跟平衡二叉樹相比,跳表的插入和删除操作要簡單得多,執行也更快。

跳表和平衡二叉樹

為什麼跳表可以高效的擷取rank呢?隻能說跳表的資料結構設計巧妙。

跳表本身提供的功能類似于平衡二叉樹以及進階變種,可以對目标值進行快速查找,時間複雜度為O(lgN)。但是跳表的實作原理比實作一顆高效的平衡二叉樹(比如紅黑樹)要簡單太多,這是跳表非常大的一個優勢。

關鍵在于,跳表計算某個score的排名次序,與在跳表中找到這個score的時間複雜度是一樣的,仍舊是O(lgN)。反觀二叉樹系列,它們找到一個值也很快,但是要想知道這個值排名第幾,似乎隻能按照先序周遊的方式來統計排在前面的值個數。

其實跳表擷取排名的思路也是數一下前面有多少個值,但因為”跳躍”的關系,統計的過程被加速了,因而rank效率更高。

跳表find原理

因為rank的計算過程,實際是伴随find某個score同時進行的,是以首先得知道find是如何進行的。

跳表本質上就是多層索引的連結清單,上述圖中最下面一排是level1索引,串聯了跳表中所有節點:5,11,20,30,68,99,跳表資料結構保證了插入位置有序。

每個節點的高度是随機确定的,是以有的節點可以串聯到level2或者level3等更高層的連結清單中。跳表實作確定了,如果節點是高度3,那麼會同時串聯在level1,level2,level3的連結清單中。

當然,跳表不是真的随機确定插入的節點高度,而是讓高的節點更少,矮的節點更多,最終産生的效果就是上圖中的效果,即Level3連結清單的節點很少,而level2連結清單的節點多一些,level1連結清單串聯了所有的節點。當我們要查找數字68的時候,我們會先header節點的最高層連結清單level3開始向後查找,發現68>20則走到20節點上;發68<99則降低高度到level2;發現68>30則走到30節點上;發現68<99則降低高度到level1;發現68==68,找到了目标節點。為什麼要從最高層連結清單開始呢?因為高層連結清單串聯的節點之間稀疏,跨度大,是以可以快速推進;一旦發現高層連結清單沒有線索了,則需要下降高度到更稠密的連結清單索引中,繼續向目标推進;直到某一個高度的連結清單索引中找到了目标;或者到最低層連結清單也沒有找到目标,則說明目标值不存在。

相反,如果我們直接從最底層連結清單向後查找,性能就蛻化為一個普通連結清單了,當然最終一定能找到目标/找不到目标,但就缺少了”跳表”的機會了。

跳表insert原理

插入和查找過程類似,但需要多做一點事情。這裡是插入數字80,白色是最終插入的位置,藍色是此前就有的節點。

我們依舊從header節點的level3開始向後推進,每次下降level之前把目前level所處的node記錄下來,也就是圖中紅色圈出來的節點。

然後,我們随機确定了80節點的高度是2,那麼接下來各個level的連結清單該如何建設呢?奇迹就出現了,我們在每一level用紅色圈出來的節點,其實就是每一level剛好小于80的那個節點,可以作為80在該level的連結清單前驅。

因為80節點高度定位了2,是以插入到了level1和level2這兩層連結清單,其中level2對整個跳表做出了突出貢獻,因為80和30之間跳過了68,可以為之後的目标查找貢獻自己的跳闆能力。

跳表delete原理

删除一個節點比較簡單,其實還是先逐級下降找到目标節點,用紅色圈出每一level的前驅。

這裡删除80節點:

需要注意,對于每一level中的紅圈節點,需要判斷其後繼是不是80,如果是才需要在該level連結清單中摘除,否則說明該level沒有串聯80節點。

跳表rank原理

之前說過,跳表計算rank實際是經曆了一次對目标值的查找過程,并在這個過程中累加出來的。

在跳表中,會為每個節點在每一level維護下一跳的距離span值,比如level3中從header節點跳到20節點,實際跨越了5,11,是以header在level3的span=3。

随着對目标值68的查找,我們在不同level向右移動的過程中就隻需要累加span,比如在level2中20跳30就隻需要1步,是以span加1即可,最終我們可以得到68的rank其實就是3+1+1=5,即排名第5,其前方的數字是5,11,20,30,就是這樣一個原理。

那麼問題就是每個節點在不同level的span怎麼維護比較高效?其實在插入/删除的過程中,我們可以順便就把span更新了。

回到這張插入80的圖檔。

我們先圈出了在level1~3的3個前驅節點(20,30,68),它們在整個跳表中的rank我們都可以在推進過程中累加出來。

在level2,80鍊到30的後面,怎麼算出30的span=2呢?首先我們知道68的rank,是以就知道80的rank=68的rank+1;我們也知道30的rank,是以用80的rank – 30的rank,就是30跳到80越過的節點個數,也就是30的span。

在level3,80鍊在68的後面,怎麼算出68的span=1呢?一樣的道理,我們知道68和80的rank,做減法就是68的span。

上述已經把受影響的前驅節點的span更新完成了,但是新插入節點80的span還沒設定。

其實我們在更新30和68的span之前,知道30和68的舊span值(30到99和68到99的跳數)。對于level2來說,隻需要用30的舊span-30的新span就是80在level2的新span值。對于level3來說,隻需要用68的舊span-68的新span就是80在level3的新span值。

這就可以了嗎?

上面的有幾張圖檔是錯誤的,它們在level3的20沒有連到99上,在跳表中這是不可能存在的,一定會有索引鍊過去,這是網上的錯誤圖檔。

我們腦補一下最後一張圖檔中缺失的那根線,然後想一下level3的20的span的值需不需要更新?

答案當然是需要了,因為在20和99之間插入了一個80,這要是level3中20跳躍到99要經過的節點。

是以,對于高于插入節點的level,我們需要對圈紅的節點的span+1處理。

跳表和Redis

跳躍表在redis中主要是有序表的一種底層實作。對于普通連結清單的查找,即使有序,我們也不能使用二分法,需要從頭開始,一個一個找,時間複雜度為O(n)。而對于跳躍表,從名字可以看出跳躍表的優勢就在于可以跳躍。如何做到呢?在于其特殊的層設計。比如我們查找46,普通連結清單隻能從頭開始查找,比對-3,2,17...直到46,要比對7次。但是對于跳躍表,我們可以從最高層開始查找:

第一步:在L4層直接與55比對,發現大了,退回到第3層

第二步:在L3層與21比對,發現小了,繼續往前比對55,發現大了,退回到第二層

第三步:在L2層與37比對,發現小了,往前,與55比對,發現大了,退回到第一層

第四步:在第1層,與46比對,查找成功。

共比對了6次,比普通連結清單隻節省了一次,似乎沒什麼優勢。但如果細想,當連結清單比較長的時候,在高層查找時,跳過的元素數量将相當可觀,提速的效果将非常明顯。比如如果元素在55之後,在L4層,我們直接就跳過了7個元素,這是非常大的進步。

跳躍表在redis中的實作

跳躍表節點定義和跳躍表描述符定義

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
// member 對象
    robj *obj;
// 分值
    double score;
// 後退指針
    struct zskiplistNode *backward;
// 層
    struct zskiplistLevel {
// 前進指針
        struct zskiplistNode *forward;
// 節點在該層和前向節點的距離
        unsigned int span;
    } level[];
} zskiplistNode;


typedef struct zskiplist {
// 頭節點,尾節點
    struct zskiplistNode *header, *tail;
// 節點數量
    unsigned long length;
// 目前表内節點的最大層數
    int level;
} zskiplist;      

建立跳躍表

主要是初始化描述符對象和初始化頭結點。這裡需要注意的是,頭結點預設層數是為32的,但描述符中訓示跳躍表層數初始化為1。

zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;


// 申請記憶體
    zsl = zmalloc(sizeof(*zsl));
// 初始化跳躍表屬性,層數初始化為1,長度初始化為0
    zsl->level = 1;
    zsl->length = 0;


// 建立一個層數為32,分值為0,成員對象為NULL的表頭結點
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
// 設定每層的forward指針指向NULL
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
// 設定backward指向NULL
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}      

插入新節點

分為三步:

第一步,查找每一層的插入點,所謂插入點指新節點插入後作為新節點前繼的節點

redis使用了一個數組來記錄

zskiplistNode *update[ZSKIPLIST_MAXLEVEL]      

另外一個數組來記錄插入點前繼節點排名,所謂排名就是就連結清單中的位置,這個有什麼用呢?主要是用來更新span字段

unsigned int rank[ZSKIPLIST_MAXLEVEL];      

第二步,建立插入點,層數随機

這一步,要注意的是,如果新節點的層數大于跳躍表的層數,需要更新跳躍表的層數外,還要做:

1 将大于原跳躍表層數的更新節點設定為頭結點

2 将大于原跳躍表層數的更新點前前繼節點排名設定為0

3将更新點的前繼節點設定為跳躍表長度,為啥是長度?因為整個層除了頭結點沒有其他節點

 update[i]->level[i].span = zsl->length;

第三步,修改節點指針和跨度之span

span訓示節點與後繼節點的距離,如下圖,o1.L1 和o2.L1距離為1,o1.L3 和o3.L3距離為2

插入新節點必然涉及到插入處前繼和後繼 節點指針的改,這個跟普通連結清單沒有什麼差別。至于span值的修改,因為在兩個節點之間插入新節點,那麼原來兩點的距離就會産生改變,新節點與後繼節點的跨度span也需要設定,這個怎麼了解呢?

比如對于上圖,要在o2和o3之間插入新節點,

update[0]=o2, rank[0]=2

update[1]=o2, rank[1]=2

update[2]=o1,rank[2]=1

update[3]=o1,rank[3]=1

update[4]=header,rank[1]=0

要設定新節點在L4層與後繼節點的距離:

在L4層,在插入新節點之前,update[3]即o1的後繼節點為o3,o1.level[3].span=2,插入後,新節點的後繼節點為o3,怎麼計算新節點在L4層的span呢?

o新.level[3].span 就是o新和o3的距離

我們知道的是插入新節點前:

1) o1和o3的距離(o1.level[3].span),

2) o1和o2的排名(rank[3]和rank[0])

3)間接知道o2和o1的距離(rank[0]-rank[3])

4)間接知道o2和o3的距離(o1.level[3].span-(rank[0]-rank[3]))

看圖可知:插入節點後,o新和o3的距離,實質就是是插入節點前o2和o3的距離

下面是具體的源碼實作:

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
// updata[]數組記錄每一層位于插入節點的前一個節點
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  
// rank[]記錄每一層位于插入節點的前一個節點的排名
//在查找某個節點的過程中,将沿途通路過的所有層的跨度累計起來,得到的結果就是目标節點在跳躍表中的排位
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
  
    int i, level;
 
    serverAssert(!isnan(score));
// 表頭節點
    x = zsl->header;
  
// 從最高層開始查找(最高層節點少,跳越快)
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
 
        //rank[i]用來記錄第i層達到插入位置的所跨越的節點總數,也就是該層最接近(小于)給定score的排名  
        //rank[i]初始化為上一層所跨越的節點總數,因為上一層已經加過
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
 
    //後繼節點不為空,并且後繼節點的score比給定的score小  
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
             //score相同,但節點的obj比給定的obj小  
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            //記錄跨越了多少個節點  
            rank[i] += x->level[i].span;
      
//查找下一個節點
            x = x->level[i].forward;
        }
// 存儲目前層上位于插入節點的前一個節點,找下一層的插入節點
        update[i] = x;
    }
  
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */
 
// 此處假設插入節點的成員對象不存在于目前跳躍表内,即不存在重複的節點
    // 随機生成一個level值
    level = zslRandomLevel();
 
 
// 如果level大于目前存儲的最大level值
// 設定rank數組中大于原level層以上的值為0--為什麼設定為0
// 同時設定update數組大于原level層以上的資料
    if (level > zsl->level) {
 
        for (i = zsl->level; i < level; i++) {
 
//因為這一層沒有節點,是以重置rank[i]為0
            rank[i] = 0;
//因為這一層還沒有節點,是以節點的前一個節點都是頭節點
            update[i] = zsl->header;
    
//在未添加新節點之前,需要更新的節點跨越的節點數目自然就是zsl->length---因為整個層隻有一個頭結點----->言外之意頭結點的span都是連結清單長度
            update[i]->level[i].span = zsl->length;
        }
// 更新level值(max層數)
        zsl->level = level;
    }
 
// 建立插入節點
    x = zslCreateNode(level,score,obj);
    for (i = 0; i < level; i++) {
 
// 針對跳躍表的每一層,改變其forward指針的指向
        x->level[i].forward = update[i]->level[i].forward;
 
//插入位置節點的後繼就是新節點  
        update[i]->level[i].forward = x;
 
        /* update span covered by update[i] as x is inserted here */
 
//rank[i]: 在第i層,update[i]->score的排名 
//rank[0] - rank[i]: update[0]->score與update[i]->score之間間隔了幾個數
 
// A3 ----------------------------- [I] -> F3
// A2 ----------------> D2 -------- [I] -> F2
// A1 ---------> C1 --> D1 -------- [I] -> F1
// A0 --> B0 --> C0 --> D0 --> E0 - [I] -> F0
    
//x->level[i].span = 從x到update[i]->forword的span數目, 
//原來的update[i]->level[i].span = 從update[i]到update[i]->level[i]->forward的span數目 
//是以x->level[i].span = 原來的update[i]->level[i].span - (rank[0] - rank[i]); 
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
 
 
//對于update[i]->level[i].span值的更新由于在update[i]與update[i]->level[i]->forward之間又添加了x, 
//update[i]->level[i].span = 從update[i]到x的span數目, 
//由于update[0]後面肯定是新添加的x,是以自然新的update[i]->level[i].span = (rank[0] - rank[i]) + 1; 
 
//提示:update[i]和x[i]之間肯定沒有節點了
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
 
 
//
//另外需要注意當level > zsl->level時,update[i] = zsl->header的span處理 
 
 
    /* increment span for untouched levels */
 
// 更新高層的span值
    for (i = level; i < zsl->level; i++) {
//因為下層中間插入了x,而高層沒有,是以多了一個跨度
        update[i]->level[i].span++;
    }
 
// 設定插入節點的backward指針
//如果插入節點的前一個節點都是頭節點,則插入節點的後向指針為NULL?
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
 
//如果插入節點的0層存前向節點則前向節點的後向指針為插入節點
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
//否則該節點為跳躍表的尾節點
        zsl->tail = x;
 
// 跳躍表長度+1
    zsl->length++;
 
//傳回插入的節點
    return x;
}      

删除指定節點

删除節點相對簡單一些,提供了根據排名删除節點和根據分數删除節點兩個API,主要涉及如下步驟:

1)找到待删除節點在每一層的前繼節點,存在updatte數組中

2)調用zslDeleteNode處理因為删除節點而引發的指針修改和span修改,以及跳躍表層數和長度修改

3)釋放待删除節點

下面是源碼的實作:

unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long traversed = 0, removed = 0;
    int i;
 
    x = zsl->header;
    //尋找待更新的節點
    for (i = zsl->level-1; i >= 0; i--) {
        //指針前移的必要條件是前繼指針不為空
        while (x->level[i].forward && (traversed + x->level[i].span) < start) {
            //排名累加
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
 
    //下面的節點排名肯定大于等于start
    traversed++;
    x = x->level[0].forward;
    while (x && traversed <= end) {
        //逐個删除後繼節點,直到end為止
        zskiplistNode *next = x->level[0].forward;
        zslDeleteNode(zsl,x,update);
        dictDelete(dict,x->obj);
        zslFreeNode(x);
        removed++;
        //每删除一個節點,排名加1
        traversed++;
        x = next;
    }
    return removed;
}






void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        //如果待更新節點(待删除節點的前一節點)的後繼節點是待删除節點,則需要處理待更新節點的後繼指針
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            
            //這裡有可能為NULL,比如删除最後一個節點
            update[i]->level[i].forward = x->level[i].forward;
    
        //待删除節點沒有出現在此層--跨度減1即可
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    //處理待删除節點的後一節點(如果存在的話)
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    //跳躍表的層數處理,如果表頭層級的前向指針為空,說明這一層已經沒有元素,層數要減一
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    
    //跳躍表長度減一
    zsl->length--;
}
 




int zslDelete(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;
 
    x = zsl->header;
    // 周遊所有層,記錄删除節點後需要被修改的節點到 update 數組  
    for (i = zsl->level-1; i >= 0; i--) {
        //指針前移首要條件是前向節點指針不為空,次要條件是分數小于指定分數,或即使分數相等,節點成員對象也不相等------> 前向指針前移的必要條件:分數小于或等于指定分數
        while (x->level[i].forward && //前向指針不為空
            (x->level[i].forward->score < score || //前向節點分數小于指定分數
                (x->level[i].forward->score == score &&  //前向節點分數等于指定分數
                compareStringObjects(x->level[i].forward->obj,obj) < 0)))//前向節點成員對象不相同
            x = x->level[i].forward;
        //儲存待删除節點的前一節點指針
        update[i] = x;
    }
    // 因為多個不同的 member 可能有相同的 score  
    // 是以要確定 x 的 member 和 score 都比對時,才進行删除  
 
    x = x->level[0].forward;
 
    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
        zslDeleteNode(zsl, x, update);
        zslFreeNode(x);
        return 1;
    }
    return 0; /* not found */
}      

跳躍表的查詢

跳躍表提供了根據排名查詢元素,以及根據分數或群排名的API,間接提供了根據分數擷取元素的API,查詢展現了跳躍表的優勢,但實作相對簡單,主要是判斷在目前層比對的元素是否是否小于給定元素,如果小于,且其後繼指針不為空,則繼續往前查找(這效率是很高的),否則往下一層找(效率相對低一點):

unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;
 
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        //指針前移的必要條件是後繼指針不為空
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
            rank += x->level[i].span;
            //排名累加
            x = x->level[i].forward;
        }
 
        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->obj && equalStringObjects(x->obj,o)) {
            return rank;
        }
    }
    return 0;
}




zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;
 
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        if (traversed == rank) {
            return x;
        }
    }
    return NULL;
}      

好了,跳表的源碼實際上在github上有很多成熟的代碼:https://github.com/search?q=skiplist

我将在下一篇文章給大家介紹使用redis來實作遊戲裡常用的積分權值排行,日排行,周排行,月排行功能。

如果你覺得此文章對你有用,記得點關注,分享給更多的小夥伴

繼續閱讀