天天看点

游戏排行榜-跳表实现原理分析

前言

做游戏的一般都有游戏排行榜的需求,要查一下某个uid的积分排名第几,这里我给大家推荐之前我们使用的一种排序算法,跳表skiplist。

跳表是一个随机化的数据结构。它允许快速查询一个有序连续元素的数据链表。跳跃列表的平均查找和插入时间复杂度都是O(log n),优于普通队列的O(n)。性能上和红黑树,AVL树不相上下,但跳表的原理非常简单,目前Redis和LevelDB中都有用到。

跳表是一种可以替代平衡树的数据结构。跳表追求的是概率性平衡,而不是严格平衡。因此,跟平衡二叉树相比,跳表的插入和删除操作要简单得多,执行也更快。

跳表和平衡二叉树

为什么跳表可以高效的获取rank呢?只能说跳表的数据结构设计巧妙。

跳表本身提供的功能类似于平衡二叉树以及高级变种,可以对目标值进行快速查找,时间复杂度为O(lgN)。但是跳表的实现原理比实现一颗高效的平衡二叉树(比如红黑树)要简单太多,这是跳表非常大的一个优势。

关键在于,跳表计算某个score的排名次序,与在跳表中找到这个score的时间复杂度是一样的,仍旧是O(lgN)。反观二叉树系列,它们找到一个值也很快,但是要想知道这个值排名第几,似乎只能按照先序遍历的方式来统计排在前面的值个数。

其实跳表获取排名的思路也是数一下前面有多少个值,但因为”跳跃”的关系,统计的过程被加速了,因而rank效率更高。

跳表find原理

因为rank的计算过程,实际是伴随find某个score同时进行的,所以首先得知道find是如何进行的。

跳表本质上就是多层索引的链表,上述图中最下面一排是level1索引,串联了跳表中所有节点:5,11,20,30,68,99,跳表数据结构保证了插入位置有序。

每个节点的高度是随机确定的,所以有的节点可以串联到level2或者level3等更高层的链表中。跳表实现确保了,如果节点是高度3,那么会同时串联在level1,level2,level3的链表中。

当然,跳表不是真的随机确定插入的节点高度,而是让高的节点更少,矮的节点更多,最终产生的效果就是上图中的效果,即Level3链表的节点很少,而level2链表的节点多一些,level1链表串联了所有的节点。当我们要查找数字68的时候,我们会先header节点的最高层链表level3开始向后查找,发现68>20则走到20节点上;发68<99则降低高度到level2;发现68>30则走到30节点上;发现68<99则降低高度到level1;发现68==68,找到了目标节点。为什么要从最高层链表开始呢?因为高层链表串联的节点之间稀疏,跨度大,所以可以快速推进;一旦发现高层链表没有线索了,则需要下降高度到更稠密的链表索引中,继续向目标推进;直到某一个高度的链表索引中找到了目标;或者到最低层链表也没有找到目标,则说明目标值不存在。

相反,如果我们直接从最底层链表向后查找,性能就蜕化为一个普通链表了,当然最终一定能找到目标/找不到目标,但就缺少了”跳表”的机会了。

跳表insert原理

插入和查找过程类似,但需要多做一点事情。这里是插入数字80,白色是最终插入的位置,蓝色是此前就有的节点。

我们依旧从header节点的level3开始向后推进,每次下降level之前把当前level所处的node记录下来,也就是图中红色圈出来的节点。

然后,我们随机确定了80节点的高度是2,那么接下来各个level的链表该如何建设呢?奇迹就出现了,我们在每一level用红色圈出来的节点,其实就是每一level刚好小于80的那个节点,可以作为80在该level的链表前驱。

因为80节点高度定位了2,所以插入到了level1和level2这两层链表,其中level2对整个跳表做出了突出贡献,因为80和30之间跳过了68,可以为之后的目标查找贡献自己的跳板能力。

跳表delete原理

删除一个节点比较简单,其实还是先逐级下降找到目标节点,用红色圈出每一level的前驱。

这里删除80节点:

需要注意,对于每一level中的红圈节点,需要判断其后继是不是80,如果是才需要在该level链表中摘除,否则说明该level没有串联80节点。

跳表rank原理

之前说过,跳表计算rank实际是经历了一次对目标值的查找过程,并在这个过程中累加出来的。

在跳表中,会为每个节点在每一level维护下一跳的距离span值,比如level3中从header节点跳到20节点,实际跨越了5,11,所以header在level3的span=3。

随着对目标值68的查找,我们在不同level向右移动的过程中就只需要累加span,比如在level2中20跳30就只需要1步,所以span加1即可,最终我们可以得到68的rank其实就是3+1+1=5,即排名第5,其前方的数字是5,11,20,30,就是这样一个原理。

那么问题就是每个节点在不同level的span怎么维护比较高效?其实在插入/删除的过程中,我们可以顺便就把span更新了。

回到这张插入80的图片。

我们先圈出了在level1~3的3个前驱节点(20,30,68),它们在整个跳表中的rank我们都可以在推进过程中累加出来。

在level2,80链到30的后面,怎么算出30的span=2呢?首先我们知道68的rank,所以就知道80的rank=68的rank+1;我们也知道30的rank,所以用80的rank – 30的rank,就是30跳到80越过的节点个数,也就是30的span。

在level3,80链在68的后面,怎么算出68的span=1呢?一样的道理,我们知道68和80的rank,做减法就是68的span。

上述已经把受影响的前驱节点的span更新完成了,但是新插入节点80的span还没设置。

其实我们在更新30和68的span之前,知道30和68的旧span值(30到99和68到99的跳数)。对于level2来说,只需要用30的旧span-30的新span就是80在level2的新span值。对于level3来说,只需要用68的旧span-68的新span就是80在level3的新span值。

这就可以了吗?

上面的有几张图片是错误的,它们在level3的20没有连到99上,在跳表中这是不可能存在的,一定会有索引链过去,这是网上的错误图片。

我们脑补一下最后一张图片中缺失的那根线,然后想一下level3的20的span的值需不需要更新?

答案当然是需要了,因为在20和99之间插入了一个80,这要是level3中20跳跃到99要经过的节点。

所以,对于高于插入节点的level,我们需要对圈红的节点的span+1处理。

跳表和Redis

跳跃表在redis中主要是有序表的一种底层实现。对于普通链表的查找,即使有序,我们也不能使用二分法,需要从头开始,一个一个找,时间复杂度为O(n)。而对于跳跃表,从名字可以看出跳跃表的优势就在于可以跳跃。如何做到呢?在于其特殊的层设计。比如我们查找46,普通链表只能从头开始查找,比对-3,2,17...直到46,要比对7次。但是对于跳跃表,我们可以从最高层开始查找:

第一步:在L4层直接与55比对,发现大了,退回到第3层

第二步:在L3层与21比对,发现小了,继续往前比对55,发现大了,退回到第二层

第三步:在L2层与37比对,发现小了,往前,与55比对,发现大了,退回到第一层

第四步:在第1层,与46比对,查找成功。

共比对了6次,比普通链表只节省了一次,似乎没什么优势。但如果细想,当链表比较长的时候,在高层查找时,跳过的元素数量将相当可观,提速的效果将非常明显。比如如果元素在55之后,在L4层,我们直接就跳过了7个元素,这是非常大的进步。

跳跃表在redis中的实现

跳跃表节点定义和跳跃表描述符定义

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
// member 对象
    robj *obj;
// 分值
    double score;
// 后退指针
    struct zskiplistNode *backward;
// 层
    struct zskiplistLevel {
// 前进指针
        struct zskiplistNode *forward;
// 节点在该层和前向节点的距离
        unsigned int span;
    } level[];
} zskiplistNode;


typedef struct zskiplist {
// 头节点,尾节点
    struct zskiplistNode *header, *tail;
// 节点数量
    unsigned long length;
// 目前表内节点的最大层数
    int level;
} zskiplist;      

新建跳跃表

主要是初始化描述符对象和初始化头结点。这里需要注意的是,头结点默认层数是为32的,但描述符中指示跳跃表层数初始化为1。

zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;


// 申请内存
    zsl = zmalloc(sizeof(*zsl));
// 初始化跳跃表属性,层数初始化为1,长度初始化为0
    zsl->level = 1;
    zsl->length = 0;


// 创建一个层数为32,分值为0,成员对象为NULL的表头结点
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
// 设定每层的forward指针指向NULL
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
// 设定backward指向NULL
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}      

插入新节点

分为三步:

第一步,查找每一层的插入点,所谓插入点指新节点插入后作为新节点前继的节点

redis使用了一个数组来记录

zskiplistNode *update[ZSKIPLIST_MAXLEVEL]      

另外一个数组来记录插入点前继节点排名,所谓排名就是就链表中的位置,这个有什么用呢?主要是用来更新span字段

unsigned int rank[ZSKIPLIST_MAXLEVEL];      

第二步,新建插入点,层数随机

这一步,要注意的是,如果新节点的层数大于跳跃表的层数,需要更新跳跃表的层数外,还要做:

1 将大于原跳跃表层数的更新节点设置为头结点

2 将大于原跳跃表层数的更新点前前继节点排名设置为0

3将更新点的前继节点设置为跳跃表长度,为啥是长度?因为整个层除了头结点没有其他节点

 update[i]->level[i].span = zsl->length;

第三步,修改节点指针和跨度之span

span指示节点与后继节点的距离,如下图,o1.L1 和o2.L1距离为1,o1.L3 和o3.L3距离为2

插入新节点必然涉及到插入处前继和后继 节点指针的改,这个跟普通链表没有什么区别。至于span值的修改,因为在两个节点之间插入新节点,那么原来两点的距离就会产生改变,新节点与后继节点的跨度span也需要设置,这个怎么理解呢?

比如对于上图,要在o2和o3之间插入新节点,

update[0]=o2, rank[0]=2

update[1]=o2, rank[1]=2

update[2]=o1,rank[2]=1

update[3]=o1,rank[3]=1

update[4]=header,rank[1]=0

要设置新节点在L4层与后继节点的距离:

在L4层,在插入新节点之前,update[3]即o1的后继节点为o3,o1.level[3].span=2,插入后,新节点的后继节点为o3,怎么计算新节点在L4层的span呢?

o新.level[3].span 就是o新和o3的距离

我们知道的是插入新节点前:

1) o1和o3的距离(o1.level[3].span),

2) o1和o2的排名(rank[3]和rank[0])

3)间接知道o2和o1的距离(rank[0]-rank[3])

4)间接知道o2和o3的距离(o1.level[3].span-(rank[0]-rank[3]))

看图可知:插入节点后,o新和o3的距离,实质就是是插入节点前o2和o3的距离

下面是具体的源码实现:

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
// updata[]数组记录每一层位于插入节点的前一个节点
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  
// rank[]记录每一层位于插入节点的前一个节点的排名
//在查找某个节点的过程中,将沿途访问过的所有层的跨度累计起来,得到的结果就是目标节点在跳跃表中的排位
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
  
    int i, level;
 
    serverAssert(!isnan(score));
// 表头节点
    x = zsl->header;
  
// 从最高层开始查找(最高层节点少,跳越快)
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
 
        //rank[i]用来记录第i层达到插入位置的所跨越的节点总数,也就是该层最接近(小于)给定score的排名  
        //rank[i]初始化为上一层所跨越的节点总数,因为上一层已经加过
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
 
    //后继节点不为空,并且后继节点的score比给定的score小  
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
             //score相同,但节点的obj比给定的obj小  
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            //记录跨越了多少个节点  
            rank[i] += x->level[i].span;
      
//查找下一个节点
            x = x->level[i].forward;
        }
// 存储当前层上位于插入节点的前一个节点,找下一层的插入节点
        update[i] = x;
    }
  
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */
 
// 此处假设插入节点的成员对象不存在于当前跳跃表内,即不存在重复的节点
    // 随机生成一个level值
    level = zslRandomLevel();
 
 
// 如果level大于当前存储的最大level值
// 设定rank数组中大于原level层以上的值为0--为什么设置为0
// 同时设定update数组大于原level层以上的数据
    if (level > zsl->level) {
 
        for (i = zsl->level; i < level; i++) {
 
//因为这一层没有节点,所以重置rank[i]为0
            rank[i] = 0;
//因为这一层还没有节点,所以节点的前一个节点都是头节点
            update[i] = zsl->header;
    
//在未添加新节点之前,需要更新的节点跨越的节点数目自然就是zsl->length---因为整个层只有一个头结点----->言外之意头结点的span都是链表长度
            update[i]->level[i].span = zsl->length;
        }
// 更新level值(max层数)
        zsl->level = level;
    }
 
// 创建插入节点
    x = zslCreateNode(level,score,obj);
    for (i = 0; i < level; i++) {
 
// 针对跳跃表的每一层,改变其forward指针的指向
        x->level[i].forward = update[i]->level[i].forward;
 
//插入位置节点的后继就是新节点  
        update[i]->level[i].forward = x;
 
        /* update span covered by update[i] as x is inserted here */
 
//rank[i]: 在第i层,update[i]->score的排名 
//rank[0] - rank[i]: update[0]->score与update[i]->score之间间隔了几个数
 
// A3 ----------------------------- [I] -> F3
// A2 ----------------> D2 -------- [I] -> F2
// A1 ---------> C1 --> D1 -------- [I] -> F1
// A0 --> B0 --> C0 --> D0 --> E0 - [I] -> F0
    
//x->level[i].span = 从x到update[i]->forword的span数目, 
//原来的update[i]->level[i].span = 从update[i]到update[i]->level[i]->forward的span数目 
//所以x->level[i].span = 原来的update[i]->level[i].span - (rank[0] - rank[i]); 
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
 
 
//对于update[i]->level[i].span值的更新由于在update[i]与update[i]->level[i]->forward之间又添加了x, 
//update[i]->level[i].span = 从update[i]到x的span数目, 
//由于update[0]后面肯定是新添加的x,所以自然新的update[i]->level[i].span = (rank[0] - rank[i]) + 1; 
 
//提示:update[i]和x[i]之间肯定没有节点了
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
 
 
//
//另外需要注意当level > zsl->level时,update[i] = zsl->header的span处理 
 
 
    /* increment span for untouched levels */
 
// 更新高层的span值
    for (i = level; i < zsl->level; i++) {
//因为下层中间插入了x,而高层没有,所以多了一个跨度
        update[i]->level[i].span++;
    }
 
// 设定插入节点的backward指针
//如果插入节点的前一个节点都是头节点,则插入节点的后向指针为NULL?
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
 
//如果插入节点的0层存前向节点则前向节点的后向指针为插入节点
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
//否则该节点为跳跃表的尾节点
        zsl->tail = x;
 
// 跳跃表长度+1
    zsl->length++;
 
//返回插入的节点
    return x;
}      

删除指定节点

删除节点相对简单一些,提供了根据排名删除节点和根据分数删除节点两个API,主要涉及如下步骤:

1)找到待删除节点在每一层的前继节点,存在updatte数组中

2)调用zslDeleteNode处理因为删除节点而引发的指针修改和span修改,以及跳跃表层数和长度修改

3)释放待删除节点

下面是源码的实现:

unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long traversed = 0, removed = 0;
    int i;
 
    x = zsl->header;
    //寻找待更新的节点
    for (i = zsl->level-1; i >= 0; i--) {
        //指针前移的必要条件是前继指针不为空
        while (x->level[i].forward && (traversed + x->level[i].span) < start) {
            //排名累加
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
 
    //下面的节点排名肯定大于等于start
    traversed++;
    x = x->level[0].forward;
    while (x && traversed <= end) {
        //逐个删除后继节点,直到end为止
        zskiplistNode *next = x->level[0].forward;
        zslDeleteNode(zsl,x,update);
        dictDelete(dict,x->obj);
        zslFreeNode(x);
        removed++;
        //每删除一个节点,排名加1
        traversed++;
        x = next;
    }
    return removed;
}






void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        //如果待更新节点(待删除节点的前一节点)的后继节点是待删除节点,则需要处理待更新节点的后继指针
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            
            //这里有可能为NULL,比如删除最后一个节点
            update[i]->level[i].forward = x->level[i].forward;
    
        //待删除节点没有出现在此层--跨度减1即可
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    //处理待删除节点的后一节点(如果存在的话)
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    //跳跃表的层数处理,如果表头层级的前向指针为空,说明这一层已经没有元素,层数要减一
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    
    //跳跃表长度减一
    zsl->length--;
}
 




int zslDelete(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;
 
    x = zsl->header;
    // 遍历所有层,记录删除节点后需要被修改的节点到 update 数组  
    for (i = zsl->level-1; i >= 0; i--) {
        //指针前移首要条件是前向节点指针不为空,次要条件是分数小于指定分数,或即使分数相等,节点成员对象也不相等------> 前向指针前移的必要条件:分数小于或等于指定分数
        while (x->level[i].forward && //前向指针不为空
            (x->level[i].forward->score < score || //前向节点分数小于指定分数
                (x->level[i].forward->score == score &&  //前向节点分数等于指定分数
                compareStringObjects(x->level[i].forward->obj,obj) < 0)))//前向节点成员对象不相同
            x = x->level[i].forward;
        //保存待删除节点的前一节点指针
        update[i] = x;
    }
    // 因为多个不同的 member 可能有相同的 score  
    // 所以要确保 x 的 member 和 score 都匹配时,才进行删除  
 
    x = x->level[0].forward;
 
    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
        zslDeleteNode(zsl, x, update);
        zslFreeNode(x);
        return 1;
    }
    return 0; /* not found */
}      

跳跃表的查询

跳跃表提供了根据排名查询元素,以及根据分数或群排名的API,间接提供了根据分数获取元素的API,查询体现了跳跃表的优势,但实现相对简单,主要是判断在当前层比对的元素是否是否小于给定元素,如果小于,且其后继指针不为空,则继续往前查找(这效率是很高的),否则往下一层找(效率相对低一点):

unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;
 
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        //指针前移的必要条件是后继指针不为空
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
            rank += x->level[i].span;
            //排名累加
            x = x->level[i].forward;
        }
 
        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->obj && equalStringObjects(x->obj,o)) {
            return rank;
        }
    }
    return 0;
}




zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;
 
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        if (traversed == rank) {
            return x;
        }
    }
    return NULL;
}      

好了,跳表的源码实际上在github上有很多成熟的代码:https://github.com/search?q=skiplist

我将在下一篇文章给大家介绍使用redis来实现游戏里常用的积分权值排行,日排行,周排行,月排行功能。

如果你觉得此文章对你有用,记得点关注,分享给更多的小伙伴

继续阅读