天天看点

阿里二面,Redis底层ZSet实现压缩列表和跳表如何选择

作者:互联网技术学堂

Redis底层ZSet实现压缩列表和跳表如何选择

Redis是一款非常流行的NoSQL数据库,它支持多种数据结构,包括字符串、哈希、列表、集合和有序集合。在Redis中,有序集合是一个非常常用的数据结构,它提供了排序、去重和范围查找等功能。实现有序集合可以采用两种数据结构:压缩列表和跳表。那么,Redis底层ZSet实现压缩列表和跳表如何选择呢?下面我将详细介绍一下这个问题。

阿里二面,Redis底层ZSet实现压缩列表和跳表如何选择

背景

Redis的有序集合(Sorted Set)是基于一个数据结构ZSet实现的。ZSet中的元素是唯一的,每个元素都会关联一个分数(score),Redis使用分数来对元素进行排序。在Redis中,ZSet有两种实现方式:一种是基于压缩列表(ziplist)的实现方式,另一种是基于跳表(skiplist)的实现方式。在Redis的源码中,可以看到有两个文件:ziplist.c和skiplist.c分别实现了这两种数据结构。

那么,Redis底层ZSet实现压缩列表和跳表如何选择呢?这个问题的答案是:Redis会根据实际情况自动选择使用哪种数据结构。下面我们将详细介绍这个过程。

实现流程

压缩列表实现

压缩列表是Redis中一种非常简单的数据结构,它可以同时存储多个元素,并且支持快速的插入、删除和查找操作。在Redis中,压缩列表的实现方式比较简单,它只需要在内存中开辟一段连续的空间,然后将多个元素按照特定的格式存储在这段空间中即可。

对于有序集合而言,压缩列表中存储的元素需要按照分数从小到大排序。Redis的实现方式是将元素按照分数从小到大依次存储在压缩列表中,并且在元素值和分数值之间使用一个特殊的字节来分隔它们。当需要查找某个元素时,Redis只需要在压缩列表中从头到尾遍历一遍,找到分数值大于等于指定值的元素即可。

阿里二面,Redis底层ZSet实现压缩列表和跳表如何选择

下面是压缩列表实现有序集合的一些关键代码:

typedef struct zskiplistNode {
// 元素的分数值
double score;
// 元素的值
robj *obj;
// 前向指针
struct zskiplistNode *backward;
// 层级结构
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;

typedef struct zskiplist {
// 表头和表尾
struct zskiplistNode *header, *tail;
// 表中节点的数量
unsigned long length;
// 表中层数最大的节点的层数
int level;
} zskiplist;

// 创建一个新的节点
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
// 分配节点所需的空间
zskiplistNode *zn = malloc(sizeof(zn) + levelsizeof(struct zskiplistLevel));
// 初始化节点信息
zn->score = score;
zn->obj = obj;
return zn;
}

// 插入一个新的节点
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
// 查找插入位置
x = zsl->header;
for (i = zsl->level; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
// 计算跨度
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 记录前驱节点和跨度
update[i] = x;
rank[i] = (i == 0) ? 0 : rank[i-1];
}
// 计算新节点的层数
level = zslRandomLevel();
// 如果新节点的层数比表中节点的层数要大,则需要更新表头节点的层级信息
if (level > zsl->level) {
for (i = zsl->level+1; i <= level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 创建新节点
x = zslCreateNode(level,score,obj);
// 更新节点的前后指针和跨度信息
for (i = 0; i <= level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 更新其他节点的跨度信息
for (i = level+1; i <= zsl->level; i++) {
update[i]->level[i].span++;
}
// 更新表中节点的数量
zsl->length++;
return x;
}

// 删除一个节点
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
// 更新其他节点的前驱指针和跨度
for (i = 0; i <= zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
// 更新表中节点的数量和表头节点的层级信息
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while (zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) {
zsl->level--;
}
zsl->length--;}

// 查找指定分值的节点
zskiplistNode *zslFind(zskiplist *zsl, double score) {
zskiplistNode *x;
int i;
x = zsl->header;
// 从表头开始查找
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && x->level[i].forward->score < score) {
x = x->level[i].forward;
}
}
// 如果找到节点的分值与指定分值相同,则返回该节点,否则返回 NULL
x = x->level[0].forward;
if (x && score == x->score) {
return x;
} else {
return NULL;
}
}

// 查找指定分值范围内的节点
unsigned long zslGetRank(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
// 从表头开始查找
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
if (x->obj && equalStringObjects(x->obj,obj)) {
return rank;
}
}
return 0;
}

// 获取指定排名的节点
zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
// 从表头开始查找
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank) {
traversed += x->level[i].span;
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}

// 从表中删除指定分值范围内的节点
unsigned long zslDelete(zskiplist *zsl, double min, double max) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
x = zsl->header;
// 从表头开始查找
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && x->level[i].forward->score < min) {
x = x->level[i].forward;
}
update[i] = x;
}
x = x->level[0].forward;
// 删除分值在指定范围内的节点
while (x && x->score <= max) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
removed++;
x = next;
}
return removed;
}           

从上述代码可以看出,跳表的实现并不是特别复杂,通过结构体 `zskiplist` 存储跳表的基本信息,通过结构体 `zskiplistNode` 存储跳表的节点信息,通过操作函数来完成对跳表的增删改查等操作。

在 Redis 中,使用跳表实现有序集合有如下优点

  • 有序集合中的数据是有序的,可以在数据的范围内快速查找并返回指定数量的数据,从而满足排序需求;
  • 跳表相比于红黑树,在插入和删除操作上,时间复杂度更稳定,不会像红黑树那样出现退化现象;
  • 跳表实现相对较简单,不需要像红黑树那样实现复杂的平衡性维护算法。

虽然 Redis 中有序集合使用跳表作为底层实现,但是对于一些特定场景下的数据集合,跳表并不是最优的选择。对于数据量非常大且数据密集的集合,使用压缩列表作为底层实现可能更优。下面我们来看一下 Redis 中压缩列表的实现。

阿里二面,Redis底层ZSet实现压缩列表和跳表如何选择

Redis 中压缩列表的实现

在 Redis 中,压缩列表是一种可以同时存储字符串和整数的连续内存空间,是实现 Redis 列表、哈希表和有序集合等数据结构的基础。

Redis 中的压缩列表由多个节点组成,每个节点包含一个字节数组和一个指向下一个节点的指针。在 Redis 中,每个压缩列表节点的结构体定义如下:

typedef struct zlentry {
unsigned int prevrawlensize, prevrawlen;
unsigned int lensize, len;
unsigned int headersize;
unsigned char encoding;
unsigned char *p;
} zlentry;           

其中,`prevrawlensize` 和 `prevrawlen` 记录前一个节点的长度信息,`lensize和len 记录当前节点的长度信息,headersize 记录当前节点头部信息的长度,encoding 记录当前节点值的编码方式,p` 指向当前节点的值。在 Redis 中,使用压缩列表实现有序集合的实现非常简单,只需要在压缩列表中存储有序集合的元素即可。下面我们来看一下使用压缩列表实现有序集合的具体实现。

Redis 中使用压缩列表实现有序集合主要涉及到以下几个操作

  • 插入元素
  • 删除元素
  • 查找元素
  • 获取有序集合中的元素数量
  • 获取指定范围内的元素

其中,插入、删除和查找元素都比较简单,只需要遍历压缩列表并进行相应操作即可。获取有序集合中的元素数量和获取指定范围内的元素则需要进行一些特殊处理。在获取有序集合中的元素数量时,需要通过遍历压缩列表,并记录每个节点的元素数量来实现。在获取指定范围内的元素时,也需要通过遍历压缩列表,并记录每个节点的分值来实现。

下面我们来看一下使用压缩列表实现有序集合的插入元素操作的具体实现。

unsigned char *zlInsert(unsigned char *zl, unsigned char *p, unsigned char *s) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
size_t offset;
int nextdiff = 0, prevdiff = 0;
unsigned char encoding = 0;
long long value = 123456789; // 假设要插入的元素是整数类型,值为 123456789
zlentry entry, tail;
if (p[0] != ZIPLIST_END) {
entry = ziplistGetEntry(p);
nextdiff = p - entry.p + entry.headersize + entry.len;
} else {
tail = ziplistGetEntry(zl + ZIPLIST_HEADER_SIZE + curlen - 1);
nextdiff = 1;
}
entry.p = s;
entry.len = encoding ? 5 : strlen((char *)s);
entry.encoding = encoding;
entry.prevrawlen = 0; /* Fix after layout is clear. */
reqlen = entry.headersize + entry.len + (entry.encoding ? 4 : 0);
zl = ziplistResize(zl, curlen + reqlen);
p = zl + ZIPLIST_HEADER_SIZE + curlen + nextdiff;
if (p[-1] != ZIPLIST_END) {
prevdiff = ziplistPrevLen(p);
} else {
tail = ziplistGetEntry(zl + ZIPLIST_HEADER_SIZE + curlen - 1);
prevdiff = curlen - tail.headersize - tail.len - ZIPLIST_HEADER_SIZE;
}
offset = p - zl;
if (prevdiff < 254) {
zl = ziplist           

在上面的代码中,我们首先计算出当前压缩列表的长度,然后计算出需要插入元素的长度,然后调用 ziplistResize 函数来扩展压缩列表的长度,确保有足够的空间来存储新的元素。接着,我们遍历压缩列表,找到需要插入的位置,并计算出当前位置和前一个位置的偏移量。然后,我们将新元素插入到当前位置,并调整前一个节点和后一个节点的指针和偏移量信息。最后,我们更新压缩列表的长度信息并返回新的压缩列表。

在上面的代码中,我们假设需要插入的元素是整数类型,值为 123456789。如果需要插入的元素是字符串类型,我们只需要将 value 变量替换成字符串的值即可。

总结

在 Redis 中,有序集合的底层实现使用了两种数据结构:压缩列表和跳表。其中,压缩列表是一种紧凑的线性结构,用于存储有序集合的元素,它在存储单个元素时非常节省空间,并且支持在常数时间内进行插入、删除和查找操作。而跳表则是一种基于链表的数据结构,支持在对数时间内进行查找和插入操作,它的空间占用比较高,但是由于支持快速查找,因此在有序集合的实现中也得到了广泛应用。

在 Redis 中,有序集合的底层实现主要依赖于具体的使用场景和数据特征来选择使用哪种数据结构。具体来说,当有序集合的元素数量较少且每个元素的长度比较短时,可以使用压缩列表来实现有序集合,这样可以减少空间的占用,提高数据的读写性能。

继续阅读