天天看點

阿裡二面,Redis底層ZSet實作壓縮清單和跳表如何選擇

作者:網際網路技術學堂

Redis底層ZSet實作壓縮清單和跳表如何選擇

Redis是一款非常流行的NoSQL資料庫,它支援多種資料結構,包括字元串、哈希、清單、集合和有序集合。在Redis中,有序集合是一個非常常用的資料結構,它提供了排序、去重和範圍查找等功能。實作有序集合可以采用兩種資料結構:壓縮清單和跳表。那麼,Redis底層ZSet實作壓縮清單和跳表如何選擇呢?下面我将詳細介紹一下這個問題。

阿裡二面,Redis底層ZSet實作壓縮清單和跳表如何選擇

背景

Redis的有序集合(Sorted Set)是基于一個資料結構ZSet實作的。ZSet中的元素是唯一的,每個元素都會關聯一個分數(score),Redis使用分數來對元素進行排序。在Redis中,ZSet有兩種實作方式:一種是基于壓縮清單(ziplist)的實作方式,另一種是基于跳表(skiplist)的實作方式。在Redis的源碼中,可以看到有兩個檔案:ziplist.c和skiplist.c分别實作了這兩種資料結構。

那麼,Redis底層ZSet實作壓縮清單和跳表如何選擇呢?這個問題的答案是:Redis會根據實際情況自動選擇使用哪種資料結構。下面我們将詳細介紹這個過程。

實作流程

壓縮清單實作

壓縮清單是Redis中一種非常簡單的資料結構,它可以同時存儲多個元素,并且支援快速的插入、删除和查找操作。在Redis中,壓縮清單的實作方式比較簡單,它隻需要在記憶體中開辟一段連續的空間,然後将多個元素按照特定的格式存儲在這段空間中即可。

對于有序集合而言,壓縮清單中存儲的元素需要按照分數從小到大排序。Redis的實作方式是将元素按照分數從小到大依次存儲在壓縮清單中,并且在元素值和分數值之間使用一個特殊的位元組來分隔它們。當需要查找某個元素時,Redis隻需要在壓縮清單中從頭到尾周遊一遍,找到分數值大于等于指定值的元素即可。

阿裡二面,Redis底層ZSet實作壓縮清單和跳表如何選擇

下面是壓縮清單實作有序集合的一些關鍵代碼:

typedef struct zskiplistNode {
// 元素的分數值
double score;
// 元素的值
robj *obj;
// 前向指針
struct zskiplistNode *backward;
// 層級結構
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;

typedef struct zskiplist {
// 表頭和表尾
struct zskiplistNode *header, *tail;
// 表中節點的數量
unsigned long length;
// 表中層數最大的節點的層數
int level;
} zskiplist;

// 建立一個新的節點
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
// 配置設定節點所需的空間
zskiplistNode *zn = malloc(sizeof(zn) + levelsizeof(struct zskiplistLevel));
// 初始化節點資訊
zn->score = score;
zn->obj = obj;
return zn;
}

// 插入一個新的節點
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
// 查找插入位置
x = zsl->header;
for (i = zsl->level; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
// 計算跨度
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 記錄前驅節點和跨度
update[i] = x;
rank[i] = (i == 0) ? 0 : rank[i-1];
}
// 計算新節點的層數
level = zslRandomLevel();
// 如果新節點的層數比表中節點的層數要大,則需要更新表頭節點的層級資訊
if (level > zsl->level) {
for (i = zsl->level+1; i <= level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 建立新節點
x = zslCreateNode(level,score,obj);
// 更新節點的前後指針和跨度資訊
for (i = 0; i <= level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 更新其他節點的跨度資訊
for (i = level+1; i <= zsl->level; i++) {
update[i]->level[i].span++;
}
// 更新表中節點的數量
zsl->length++;
return x;
}

// 删除一個節點
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
// 更新其他節點的前驅指針和跨度
for (i = 0; i <= zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
// 更新表中節點的數量和表頭節點的層級資訊
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while (zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) {
zsl->level--;
}
zsl->length--;}

// 查找指定分值的節點
zskiplistNode *zslFind(zskiplist *zsl, double score) {
zskiplistNode *x;
int i;
x = zsl->header;
// 從表頭開始查找
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && x->level[i].forward->score < score) {
x = x->level[i].forward;
}
}
// 如果找到節點的分值與指定分值相同,則傳回該節點,否則傳回 NULL
x = x->level[0].forward;
if (x && score == x->score) {
return x;
} else {
return NULL;
}
}

// 查找指定分值範圍内的節點
unsigned long zslGetRank(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
// 從表頭開始查找
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
if (x->obj && equalStringObjects(x->obj,obj)) {
return rank;
}
}
return 0;
}

// 擷取指定排名的節點
zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
// 從表頭開始查找
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank) {
traversed += x->level[i].span;
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}

// 從表中删除指定分值範圍内的節點
unsigned long zslDelete(zskiplist *zsl, double min, double max) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
x = zsl->header;
// 從表頭開始查找
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && x->level[i].forward->score < min) {
x = x->level[i].forward;
}
update[i] = x;
}
x = x->level[0].forward;
// 删除分值在指定範圍内的節點
while (x && x->score <= max) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
removed++;
x = next;
}
return removed;
}           

從上述代碼可以看出,跳表的實作并不是特别複雜,通過結構體 `zskiplist` 存儲跳表的基本資訊,通過結構體 `zskiplistNode` 存儲跳表的節點資訊,通過操作函數來完成對跳表的增删改查等操作。

在 Redis 中,使用跳表實作有序集合有如下優點

  • 有序集合中的資料是有序的,可以在資料的範圍内快速查找并傳回指定數量的資料,進而滿足排序需求;
  • 跳表相比于紅黑樹,在插入和删除操作上,時間複雜度更穩定,不會像紅黑樹那樣出現退化現象;
  • 跳表實作相對較簡單,不需要像紅黑樹那樣實作複雜的平衡性維護算法。

雖然 Redis 中有序集合使用跳表作為底層實作,但是對于一些特定場景下的資料集合,跳表并不是最優的選擇。對于資料量非常大且資料密集的集合,使用壓縮清單作為底層實作可能更優。下面我們來看一下 Redis 中壓縮清單的實作。

阿裡二面,Redis底層ZSet實作壓縮清單和跳表如何選擇

Redis 中壓縮清單的實作

在 Redis 中,壓縮清單是一種可以同時存儲字元串和整數的連續記憶體空間,是實作 Redis 清單、哈希表和有序集合等資料結構的基礎。

Redis 中的壓縮清單由多個節點組成,每個節點包含一個位元組數組和一個指向下一個節點的指針。在 Redis 中,每個壓縮清單節點的結構體定義如下:

typedef struct zlentry {
unsigned int prevrawlensize, prevrawlen;
unsigned int lensize, len;
unsigned int headersize;
unsigned char encoding;
unsigned char *p;
} zlentry;           

其中,`prevrawlensize` 和 `prevrawlen` 記錄前一個節點的長度資訊,`lensize和len 記錄目前節點的長度資訊,headersize 記錄目前節點頭部資訊的長度,encoding 記錄目前節點值的編碼方式,p` 指向目前節點的值。在 Redis 中,使用壓縮清單實作有序集合的實作非常簡單,隻需要在壓縮清單中存儲有序集合的元素即可。下面我們來看一下使用壓縮清單實作有序集合的具體實作。

Redis 中使用壓縮清單實作有序集合主要涉及到以下幾個操作

  • 插入元素
  • 删除元素
  • 查找元素
  • 擷取有序集合中的元素數量
  • 擷取指定範圍内的元素

其中,插入、删除和查找元素都比較簡單,隻需要周遊壓縮清單并進行相應操作即可。擷取有序集合中的元素數量和擷取指定範圍内的元素則需要進行一些特殊處理。在擷取有序集合中的元素數量時,需要通過周遊壓縮清單,并記錄每個節點的元素數量來實作。在擷取指定範圍内的元素時,也需要通過周遊壓縮清單,并記錄每個節點的分值來實作。

下面我們來看一下使用壓縮清單實作有序集合的插入元素操作的具體實作。

unsigned char *zlInsert(unsigned char *zl, unsigned char *p, unsigned char *s) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
size_t offset;
int nextdiff = 0, prevdiff = 0;
unsigned char encoding = 0;
long long value = 123456789; // 假設要插入的元素是整數類型,值為 123456789
zlentry entry, tail;
if (p[0] != ZIPLIST_END) {
entry = ziplistGetEntry(p);
nextdiff = p - entry.p + entry.headersize + entry.len;
} else {
tail = ziplistGetEntry(zl + ZIPLIST_HEADER_SIZE + curlen - 1);
nextdiff = 1;
}
entry.p = s;
entry.len = encoding ? 5 : strlen((char *)s);
entry.encoding = encoding;
entry.prevrawlen = 0; /* Fix after layout is clear. */
reqlen = entry.headersize + entry.len + (entry.encoding ? 4 : 0);
zl = ziplistResize(zl, curlen + reqlen);
p = zl + ZIPLIST_HEADER_SIZE + curlen + nextdiff;
if (p[-1] != ZIPLIST_END) {
prevdiff = ziplistPrevLen(p);
} else {
tail = ziplistGetEntry(zl + ZIPLIST_HEADER_SIZE + curlen - 1);
prevdiff = curlen - tail.headersize - tail.len - ZIPLIST_HEADER_SIZE;
}
offset = p - zl;
if (prevdiff < 254) {
zl = ziplist           

在上面的代碼中,我們首先計算出目前壓縮清單的長度,然後計算出需要插入元素的長度,然後調用 ziplistResize 函數來擴充壓縮清單的長度,確定有足夠的空間來存儲新的元素。接着,我們周遊壓縮清單,找到需要插入的位置,并計算出目前位置和前一個位置的偏移量。然後,我們将新元素插入到目前位置,并調整前一個節點和後一個節點的指針和偏移量資訊。最後,我們更新壓縮清單的長度資訊并傳回新的壓縮清單。

在上面的代碼中,我們假設需要插入的元素是整數類型,值為 123456789。如果需要插入的元素是字元串類型,我們隻需要将 value 變量替換成字元串的值即可。

總結

在 Redis 中,有序集合的底層實作使用了兩種資料結構:壓縮清單和跳表。其中,壓縮清單是一種緊湊的線性結構,用于存儲有序集合的元素,它在存儲單個元素時非常節省空間,并且支援在常數時間内進行插入、删除和查找操作。而跳表則是一種基于連結清單的資料結構,支援在對數時間内進行查找和插入操作,它的空間占用比較高,但是由于支援快速查找,是以在有序集合的實作中也得到了廣泛應用。

在 Redis 中,有序集合的底層實作主要依賴于具體的使用場景和資料特征來選擇使用哪種資料結構。具體來說,當有序集合的元素數量較少且每個元素的長度比較短時,可以使用壓縮清單來實作有序集合,這樣可以減少空間的占用,提高資料的讀寫性能。

繼續閱讀