天天看點

Redis源碼剖析--壓縮清單ziplistziplist結構ziplist基本操作ziplist小結

Redis源碼剖析–壓縮清單ziplist

  • ziplist結構
    • 頭尾結構
    • 節點結構
      • prev_entry_length
      • encoding
    • 編碼和解碼
  • ziplist基本操作
    • 建立空ziplist
    • 插入節點
    • 擷取指定索引上的節點
    • 删除給定節點
  • ziplist小結

壓縮清單(ziplist)是由 一系列特殊編碼的記憶體塊構成的清單,其是Redis的清單建和哈希鍵的底層實作之一。和整數集合一樣,二者都是為Redis節省記憶體而開發的資料結構。

ziplist可以用來存放字元串或者整數,其存儲資料的特點是:比較小的整數或比較短的字元串。Redis的清單建,哈希鍵,有序集合的底層實作都用到了ziplist。

ziplist結構

Redis的ziplist結構由三大部分組成,其分别是清單頭(ziplist Header),資料節點(Entries)和清單尾(ziplist tail),它們在記憶體上的布局如下:

Redis源碼剖析--壓縮清單ziplistziplist結構ziplist基本操作ziplist小結

頭尾結構

ziplist的頭部包含如下三個資訊:

  • zlbytes:表示壓縮清單占總記憶體的位元組數
  • zltail:表示壓縮清單頭和尾之間的偏移量
  • zllen:表示壓縮清單中節點的數量

ziplist尾部的zlend則表示壓縮清單結束,其值固定為0xFF。Redis提供了一個宏定義來表示ziplist header的大小。

// 總共10個位元組
#define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t))
           

節點結構

資料節點部分由若幹個節點緊密排列構成,每個節點也由三部分構成,分别是:

  • prev_entry_length:編碼前置節點的長度,用于從後往前周遊
  • encoding:編碼屬性
  • contents:負責儲存節點的值

prev_entry_length

ziplist在編碼前置節點長度的時候,采用以下規則:

  • 如果前置節點的長度小于254位元組,那麼采用1個位元組來儲存這個長度值
  • 如果前置節點的長度大于254位元組,則采用5個位元組來儲存這個長度值,其中,第一個位元組被設定為0xFE(254),用于表示該長度大于254位元組,後面四個位元組則用來存儲前置節點的長度值。

encoding

ziplist的節點可以儲存字元串值和整數值,二者的編碼屬性下面一一道來。

  • (一)節點儲存字元串值

如果節點儲存的是字元串值,那麼該編碼大小可能為1位元組,2位元組或5位元組,這與字元串的長度有關。編碼部分前兩位為00,01或者10,分别對應上述的三種大小,後面的位表示長度大小值。

字元串大小 編碼長度 編碼

<= 63 bytes 1 bytes 00bbbbbb <= 16383 bytes 2 bytes 01bbbbbb xxxxxxxx <= 4294967295 bytes 5 bytes 10________ aaaaaaaa bbbbbbbb cccccccc dddddddd

  • (二)節點儲存整數值

如果節點儲存的是整數值,那麼其編碼長度固定為1個位元組,該位元組的前兩位固定為11,用于表示節點儲存的是整數值。這裡也用一個表來說明。

整數類型 編碼長度 編碼

int16_t(2 bytes) 1 11000000 int32_t(4 bytes) 1 11010000 int64_t(8 bytes) 1 11100000 24位有符整數 1 11110000 8位有符整數 1 11111110 0~12 1 1111xxxx

上表中,當編碼為1111xxxx時,表示沒有内容部分,xxxx已經存放了目前的整數值,包括整數0~12,即xxxx可以表示0000~1101。編碼為11111111代表ziplist的結尾。

Redis提供了如下的宏定義用于定義不同的encoding和計算encoding類型。

// 定義不同的encoding
// 字元串
#define ZIP_STR_06B (0 << 6)
#define ZIP_STR_14B (1 << 6)
#define ZIP_STR_32B (2 << 6)
// 整數
#define ZIP_INT_16B (0xc0 | 0<<4) // '|'按位或,1&0=1,0&0=0
#define ZIP_INT_32B (0xc0 | 1<<4)
#define ZIP_INT_64B (0xc0 | 2<<4)

// 計算encoding類型,true或者false
#define ZIP_IS_STR(enc) (((enc) & 0xc0) < 0xc0) //'&'按位與,1&1=1,1&0=0
#define ZIP_IS_INT(enc) (!ZIP_IS_STR(enc) && ((enc) & 0x30) < 0x30)
           

編碼和解碼

在ziplist中,節點的資料結構定義如下:

typedef struct zlentry {
    unsigned int prevrawlensize, prevrawlen; // 前置節點長度和編碼所需長度
    unsigned int lensize, len; // 目前節點長度和編碼所需長度
    unsigned int headersize; // 頭的大小
    unsigned char encoding; // 編碼類型
    unsigned char *p; // 資料部分
} zlentry;
           

很顯然,記憶體上不能直接存放結構體,于是,Redis提供了一系列的編碼和解碼操作函數。這裡以編碼前置節點長度和解碼前置節點長度的源碼為例來講解這一過程:

/****************************** 編碼前置節點長度資訊 *******************************/
// 将長度資訊len寫入起止位址為p的記憶體
static unsigned int zipPrevEncodeLength(unsigned char *p, unsigned int len) {
    if (p == NULL) {
        return (len < ZIP_BIGLEN) ? 1 : sizeof(len)+1;
    } else {
        // 如果長度小于254位元組
        if (len < ZIP_BIGLEN) {
            // 采用1位元組編碼,該位元組存放長度資訊
            p[0] = len;
            return 1;
        } else {
            // 長度大于254位元組
            // 第一個位元組固定為0xFE
            p[0] = ZIP_BIGLEN;
            // 後四個位元組用來存放長度值
            memcpy(p+1,&len,sizeof(len));
            memrev32ifbe(p+1);
            return 1+sizeof(len);
        }
    }
}
/****************************** 解碼前置節點長度資訊 ******************************/
// 解壓prevlensize編碼所需長度
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do {                          \
    // 如果第一個位元組的值小于254位元組
    if ((ptr)[0] < ZIP_BIGLEN) {                                               \
        (prevlensize) = 1;                                                     \
    } else {                                                                   \
        (prevlensize) = 5;                                                     \
    }                                                                          \
} while(0);
// 解碼前置節點長度資訊
#define ZIP_DECODE_PREVLEN(ptr, prevlensize, prevlen) do {                     \
    // 先判斷編碼類型,1位元組或者5位元組
    ZIP_DECODE_PREVLENSIZE(ptr, prevlensize);                                  \
    // 1位元組的話直接讀取長度值
    if ((prevlensize) == 1) {                                                  \
        (prevlen) = (ptr)[0];                                                  \
    // 5位元組的話,讀取後四個位元組的值作為長度值
    } else if ((prevlensize) == 5) {                                           \
        assert(sizeof((prevlensize)) == 4);                                    \
        memcpy(&(prevlen), ((char*)(ptr)) + 1, 4);                             \
        memrev32ifbe(&prevlen);                                                \
    }                                                                          \
} while(0);
           

在編碼解碼目前節點的長度,ziplist提供了zipEncodeLength和ZIP_DECODE_LENGTH這兩個配套函數來完成。這裡就不加贅述了。

ziplist基本操作

建立空ziplist

// 建立一個空的ziplist
unsigned char *ziplistNew(void) {
    // 空ziplist的大小為11個位元組,頭部10位元組,尾部1位元組
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
    // 配置設定記憶體
    unsigned char *zl = zmalloc(bytes);
    // 設定ziplist的屬性
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); // 設定ziplist所占的位元組數,如有必要進行大小端轉換
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); // 設定尾節點相對頭部的偏移量
    ZIPLIST_LENGTH(zl) = 0; // 設定ziplist内的節點數
    // 設定尾部一個位元組位0xFF
    zl[bytes-1] = ZIP_END;
    return zl;
}
           

插入節點

ziplist中插入節點操作由ziplistPush函數完成。

// ziplist插入節點隻能往頭或者尾部插入
// zl: 待插入的ziplist
// s,slen: 待插入節點和其長度
// where: 帶插入的位置,0代表頭部插入,1代表尾部插入
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    unsigned char *p;
    // 擷取待插入位置的指針
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    return __ziplistInsert(zl,p,s,slen);
}
           

真正的插入操作由__ziplistInsert完成。

static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen; // 目前長度和插入節點後需要的長度
    unsigned int prevlensize, prevlen = 0; // 前置節點長度和編碼該長度值所需的長度
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; // 為了避免警告,初始化其值
    zlentry tail;
            
<span style="color:#75715e">// 找出待插入節點的前置節點長度
           

// 如果p[0]不指向清單末端,說明清單非空,并且p指向其中一個節點

if (p[0] != ZIP_END) {

// 解碼前置節點p的長度和編碼該長度需要的位元組

ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);

} else {

// 如果p指向清單末端,表示清單為空

unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);

<span style="color:#66d9ef">if</span> (ptail[<span style="color:#ae81ff">0</span>] <span style="color:#f92672">!=</span> ZIP_END) {
        <span style="color:#75715e">// 計算尾節點的長度
           

prevlen = zipRawEntryLength(ptail);

}

}

<span style="color:#75715e">// 判斷是否能夠編碼為整數
           

if (zipTryEncoding(s,slen,&value,&encoding)) {

// 該節點已經編碼為整數,通過encoding來擷取編碼長度

reqlen = zipIntSize(encoding);

} else {

// 采用字元串來編碼該節點

reqlen = slen;

}

// 擷取前置節點的編碼長度

reqlen += zipPrevEncodeLength(NULL,prevlen);

// 擷取目前節點的編碼長度

reqlen += zipEncodeLength(NULL,encoding,slen);

<span style="color:#75715e">// 隻要不是插入到清單的末端,都需要判斷目前p所指向的節點header是否能存放新節點的長度編碼
           

// nextdiff儲存新舊編碼之間的位元組大小差,如果這個值大于0

// 那就說明目前p指向的節點的header進行擴充

nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

<span style="color:#75715e">// 存儲p相對于清單zl的偏移位址
           

offset = p-zl;

// 重新配置設定空間,curlen目前清單的長度

// reqlen 新節點的全部長度

// nextdiff 新節點的後繼節點擴充header的長度

zl = ziplistResize(zl,curlen+reqlen+nextdiff);

// 重新擷取p的值

p = zl+offset;

<span style="color:#75715e">// 非表尾插入,需要重新計算表尾的偏移量
           

if (p[0] != ZIP_END) {

// 移動現有元素,為新元素的插入提供空間

memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

<span style="color:#75715e">// p+reqlen為新節點前置節點移動後的位置,将新節點的長度編碼至前置節點
           

zipPrevEncodeLength(p+reqlen,reqlen);

<span style="color:#75715e">// 更新清單尾相對于表頭的偏移量,将新節點的長度算上
           

ZIPLIST_TAIL_OFFSET(zl) =

intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

<span style="color:#75715e">// 如果新節點後面有多個節點,那麼表尾的偏移量需要算上nextdiff的值
           

zipEntry(p+reqlen, &tail);

if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {

ZIPLIST_TAIL_OFFSET(zl) =

intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);

}

} else {

// 表尾插入,直接計算偏移量

ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);

}

<span style="color:#75715e">// 當nextdiff不為0時,表示需要新節點的後繼節點對頭部進行擴充
           

if (nextdiff != 0) {

offset = p-zl;

// 需要對p所指向的機電header進行擴充更新

// 有可能會引起連鎖更新

zl = __ziplistCascadeUpdate(zl,p+reqlen);

p = zl+offset;

}

<span style="color:#75715e">// 将新節點前置節點的長度寫入新節點的header
           

p += zipPrevEncodeLength(p,prevlen);

// 将新節點的值長度寫入新節點的header

p += zipEncodeLength(p,encoding,slen);

// 寫入節點值

if (ZIP_IS_STR(encoding)) {

memcpy(p,s,slen);

} else {

zipSaveInteger(p,value,encoding);

}

// 更新清單節點計數

ZIPLIST_INCR_LENGTH(zl,1);

return zl;

}

  • 什麼是連鎖更新?

插入節點的時候,有時會引起連鎖更新。我們知道,當新節點插入後,需要改變新節點後繼節點的header資訊中的儲存前置節點長度的部分,如果這個pre_entry_length原存放的長度小于254位元組,也就是隻用了一個位元組,現在新節點的長度大于254位元組,需要用5個位元組儲存,這樣就要對這個pre_entry_length進行擴充。試想一下,如果擴充之後該節點的整體長度大于254位元組了,那麼該節點的後繼節點是不是也需要更新header資訊呢?答案是肯定的,這樣就引發了連鎖更新,導緻新節點後面的一連串節點都需要對header進行擴容。這就是連鎖更新。

連鎖更新的實作由如下函數完成。

// 檢查并修複後續節點的空間問題
static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
    size_t offset, noffset, extra;
    unsigned char *np;
    zlentry cur, next;
            
<span style="color:#66d9ef">while</span> (p[<span style="color:#ae81ff">0</span>] <span style="color:#f92672">!=</span> ZIP_END) {
    <span style="color:#75715e">// 将p所指向節點的資訊儲存到cur結構體中
           

zipEntry(p, &cur);

<span style="color:#75715e">// 目前節點的長度
           

rawlen = cur.headersize + cur.len;

// 編碼目前節點的長度所需的位元組數

rawlensize = zipPrevEncodeLength(NULL,rawlen);

<span style="color:#75715e">// 如果沒有後續節點需要更新了,就退出
           

if (p[rawlen] == ZIP_END) break;

// 去除後續節點的資訊儲存到next結構體中

zipEntry(p+rawlen, &next);

<span style="color:#75715e">// 當後續節點的空間已經足夠了,就直接退出
           

if (next.prevrawlen == rawlen) break;

<span style="color:#75715e">// 當後續節點的空間不足夠,則需要進行擴容操作
           

if (next.prevrawlensize < rawlensize) {

// 記錄p的偏移值

offset = p-zl;

// 記錄需要增加的長度

extra = rawlensize-next.prevrawlensize;

// 擴充zl的大小

zl = ziplistResize(zl,curlen+extra);

// 擷取p相對于新的zl的值

p = zl+offset;

<span style="color:#75715e">//記錄下一個節點的偏移量
           

np = p+rawlen;

noffset = np-zl;

<span style="color:#75715e">// 當 next 節點不是表尾節點時,更新清單到表尾節點的偏移量
           

if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {

ZIPLIST_TAIL_OFFSET(zl) =

intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);

}

<span style="color:#75715e">// 向後移動cur節點之後的資料,為新的header騰出空間
           

memmove(np+rawlensize,

np+next.prevrawlensize,

curlen-noffset-next.prevrawlensize-1);

zipPrevEncodeLength(np,rawlen);

<span style="color:#75715e">// 移動指針,繼續處理下一個節點
           

p += rawlen;

curlen += extra;

} else {

if (next.prevrawlensize > rawlensize) {

// 執行到這裡,next節點編碼前置節點的header空間有5個位元組

// 但是此時隻需要一個位元組

// Redis不提供縮小操作,而是直接将長度強制性寫入五個位元組中

zipPrevEncodeLengthForceLarge(p+rawlen,rawlen);

} else {

// 運作到這裡,說明剛好可以存放

zipPrevEncodeLength(p+rawlen,rawlen);

}

<span style="color:#75715e">// 退出,代表空間足夠,後續空間不需要更改
           

break;

}

}

return zl;

}

擷取指定索引上的節點

// 根據index的值,擷取壓縮清單第index個節點
unsigned char *ziplistIndex(unsigned char *zl, int index) {
    unsigned char *p;
    unsigned int prevlensize, prevlen = 0;
    // index為負,從尾部開始周遊
    if (index < 0) {
        index = (-index)-1;
        // 擷取尾指針
        p = ZIPLIST_ENTRY_TAIL(zl);
        if (p[0] != ZIP_END) {
            // 解碼前置節點長度
            ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            while (prevlen > 0 && index--) {
                p -= prevlen;
                // 解碼前置節點長度
                ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            }
        }
    } else {        // index為正,從頭部開始周遊
        p = ZIPLIST_ENTRY_HEAD(zl);
        while (p[0] != ZIP_END && index--) {
            // 擷取目前節點的整體長度,包括pre_entry_length,encoding,contents三部分
            p += zipRawEntryLength(p);
        }
    }
    return (p[0] == ZIP_END || index > 0) ? NULL : p;
}
           

删除給定節點

// 删除給定節點,輸入壓縮清單zl和指向删除節點的指針p
unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p) {
    size_t offset = *p-zl;
    // 調用底層函數__ziplistDelete進行删除操作
    zl = __ziplistDelete(zl,*p,1);
            
<span style="color:#75715e">// 删除操作可能會改變zl,因為會重新配置設定記憶體
           

*p = zl+offset;

return zl;

}

删除操作的底層實作由__ziplistDelete函數實作。

// 删除壓縮清單zl中以p起始的num個節點
static unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
    unsigned int i, totlen, deleted = 0;
    size_t offset;
    int nextdiff = 0;
    zlentry first, tail;
    // 擷取p指向的節點資訊
    zipEntry(p, &first);
    // 計算num個節點占用的記憶體
    for (i = 0; p[0] != ZIP_END && i < num; i++) {
        p += zipRawEntryLength(p);
        deleted++;
    }
            
totlen <span style="color:#f92672">=</span> p<span style="color:#f92672">-</span>first.p;
<span style="color:#66d9ef">if</span> (totlen <span style="color:#f92672">&gt;</span> <span style="color:#ae81ff">0</span>) {
    <span style="color:#66d9ef">if</span> (p[<span style="color:#ae81ff">0</span>] <span style="color:#f92672">!=</span> ZIP_END) {
        <span style="color:#75715e">// 執行到這裡,表示被删除節點後面還存在節點
           

// 判斷最後一個被删除的節點的後繼節點的header中的存放前置節點長度的空間

// 能不能容納第一個被删除節點的前置節點的長度

nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);

p -= nextdiff;

zipPrevEncodeLength(p,first.prevrawlen);

<span style="color:#75715e">// 更新尾部相對于頭部的便宜
           

ZIPLIST_TAIL_OFFSET(zl) =

intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);

<span style="color:#75715e">// 如果被删除節點後面還存在節點,就需要将nextdiff計算在内
           

zipEntry(p, &tail);

if (p[tail.headersize+tail.len] != ZIP_END) {

ZIPLIST_TAIL_OFFSET(zl) =

intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);

}

<span style="color:#75715e">// 将被删除節點後面的記憶體空間移動到删除的節點之後
           

memmove(first.p,p,

intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);

} else {

// 執行到這裡,表示被删除節點後面沒有節點了

ZIPLIST_TAIL_OFFSET(zl) =

intrev32ifbe((first.p-zl)-first.prevrawlen);

}

<span style="color:#75715e">// 縮小記憶體并更新ziplist的長度
           

offset = first.p-zl;

zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);

ZIPLIST_INCR_LENGTH(zl,-deleted);

p = zl+offset;

<span style="color:#75715e">// 如果nextdiff不等于0,說明被删除節點後面節點的header資訊還需要更改
           

if (nextdiff != 0)

// 連鎖更新

zl = __ziplistCascadeUpdate(zl,p);

}

return zl;

}

ziplist小結

ziplist實際上就可以了解為一個存放在連續記憶體空間上的雙向清單,其每一個節點都包含了前置指針和後置指針,隻是經過了特殊的編碼步驟了的。連鎖更新是ziplist的一大特點,為了節省記憶體,ziplist需要存放在一段連續的記憶體空間上,是以必然會引起節點空間不足的問題。但是,連鎖更新發生的機率比較小,不會影響其效率!

到此,Redis的基本資料結構已經全部分析完了,按照預先指定的計劃,下一階段主要剖析Redis各種鍵的實作,這一部分和Redis的互動相關。
  • Redis
  • SourceCode
Redis源碼剖析--壓縮清單ziplistziplist結構ziplist基本操作ziplist小結

About Zeech Nothing to say « Previous

Redis源碼剖析--整數集合Intset

Next »

Redis源碼剖析--對象object

Please enable JavaScript to view the comments powered by Disqus.

</div>
           

繼續閱讀