天天看點

源碼分析 golang ristretto 高性能緩存的設計實作原理

作者:區塊軟體開發

ristretto 是 golang 社群裡排頭部的高性能的資料緩存庫,支援鍵值過期和 LFU 緩存淘汰,還支援最大的記憶體空間限制,根據寫時傳入 cost 計算已使用的記憶體值,通常 cost 為對象的 size,但也可以當個數使用。其他緩存庫多按照條數進行限制,不合理使用時容易造成 OOM 的風險。

其相比 freecache、bigcache 來說,存儲 value 可以為任意值 interface{}。而 freecache、bigcache 内部通過一個大的 ringbuffer 來存放 value,而 value 需要是 []byte 位元組數組,該設計對于 golang gc 很友好,但應用上受限,讀寫的時候需要對進行編碼解碼,這個開銷不低的。

ristretto 在混合讀寫壓測場景下,其吞吐表現要比其他 go cahce 庫要好,當然壓測的 case 通常是偏向自己的,懂的自然懂。

源碼分析 golang ristretto 高性能緩存的設計實作原理

ristretto 使用方法

func main() {
    cache, err := ristretto.NewCache(&ristretto.Config{
        NumCounters: 1e7,     // number of keys to track frequency of (10M).
        MaxCost:     1 << 30, // maximum cost of cache (1GB).
        BufferItems: 64,      // number of keys per Get buffer.
    })
    if err != nil {
        panic(err)
    }

    // 寫資料,cost 為 1.
    cache.Set("key", "value", 1)

    // 由于 ristretto 是異步寫,是以需要等待協程消費處理完.
    cache.Wait()

    // 擷取資料
    value, found := cache.Get("key")
    if !found {
        panic("missing value")
    }
    fmt.Println(value)

    // 删除資料
    cache.Del("key")
}
           

ristretto 項目位址:

https://github.com/dgraph-io/ristretto

ristretto 設計原理

ristretto 的 kv 存儲使用分片map來實作的,而緩存淘汰則使用 tinyLFU 實作。

源碼分析 golang ristretto 高性能緩存的設計實作原理

ristretto store 的結構

源碼分析 golang ristretto 高性能緩存的設計實作原理

ristretto 設計了一個長度為 256 的分片集合,該結構類型為 shardedMap,其每個分片裡有一個存資料的 map 和獨立的鎖,其目的是為了減少鎖競争,提高讀寫性能。增删改查的時候,通過對 key 進行取摸定位資料在哪一個 shard 分片上。

shardedMap 内部使用 expirationMap 來存鍵值過期資料的,所有分片共用一個 expirationMap 對象,ristretto 沒有使用分片來建構過期資料集。

// 最大的分片數量,常量不能改.
const numShards uint64 = 256

type shardedMap struct {
    shards    []*lockedMap
    expiryMap *expirationMap
}

func newShardedMap() *shardedMap {
    sm := &shardedMap{
        shards:    make([]*lockedMap, int(numShards)),
        expiryMap: newExpirationMap(),
    }
    for i := range sm.shards {
        sm.shards[i] = newLockedMap(sm.expiryMap)
    }
    return sm
}

type lockedMap struct {
    sync.RWMutex
    data map[uint64]storeItem // 該 map 的 key 為資料key的哈希值.
    em   *expirationMap
}

type expirationMap struct {
    sync.RWMutex
    buckets map[int64]bucket // 該 map 的 key 過期時間的索引值,計算方法為 `ts/5 + 1`
}

// key 為鍵值 key 的 hash 值,value 為 conflict 值,是另一個 hash 值.
type bucket map[uint64]uint64
           

shardedMap 裡不存儲真正的 key 值,而是存儲資料 key 兩個 hash 值,為什麼使用兩個 hash 值标記對象,為了避免機率上的沖突,是以另使用另一個 xxhash 算法計算 hash 值,該值在 ristretto 定義為 conflict 哈希值。

KeyToHash 是 ristretto 預設的 hash 方法,傳入的 key 不能是指針類型,也不能是 struct。傳回值為兩個 hash 值,其内部的 MemHash 其實是 runtime.memhash 映射的方法,golang 内部的 map 也使用該 hash 算法。xxhash 則為社群中較為火熱的 hash 庫。

func KeyToHash(key interface{}) (uint64, uint64) {
    if key == nil {
        return 0, 0
    }
    switch k := key.(type) {
    case uint64:
        return k, 0
    case string:
        return MemHashString(k), xxhash.Sum64String(k)
    case []byte:
        return MemHash(k), xxhash.Sum64(k)
    case byte:
        return uint64(k), 0
    case int:
        return uint64(k), 0
    case int32:
        return uint64(k), 0
    case uint32:
        return uint64(k), 0
    case int64:
        return uint64(k), 0
    default:
        panic("Key type not supported")
    }
}
           

ristretto 緩存驅逐政策

社群中常常使用 LRU 和 LFU 算法來實作緩存淘汰驅逐,ristretto 則使用 LFU 算法,但由于 LFU 标準實作開銷過大,則使用 TinyLFU 實作資料淘汰。

TinyLFU 裡主要使用 count-min sketch 統計算法粗略記錄各個 key 的緩存命中計數。該 count-min Sketch 算法通常用在不要求精确計數,又想節省記憶體的場景,雖然拿到的計數缺失一定的精準度,但确實節省了記憶體。

但就 ristretto 緩存場景來說,記錄 100w 個 key 的計數也才占用 11MB 左右,公式是 ( 100 * 10000 ) * (8 + 4) / 1024/ 1024 = 11MB,這裡的 8 為 key hash 的大小,而 4 為 hit 的類型 uint32 大小。

count-min sketch 統計算法實作

源碼分析 golang ristretto 高性能緩存的設計實作原理

count-min sketch 中的 increment 和 estimate 方法實作原理很簡單,流程如下。

  1. 標明 d 個 hash 函數,開一個 dm 的二維整數數組作為哈希表 ;
  2. 對于每個元素,分别使用d個hash函數計算相應的哈希值,并對m取餘,然後在對應的位置上增1,二維數組中的每個整數稱為sketch ;
  3. 要查詢某個元素的頻率時,隻需要取出d個sketch, 傳回最小的那一個。

ristretto 對 count-min sketch 并不是一直累計累加的,在 policy 累計對象超過 resetAt 時,則會對 cm-sketch 統計對象進行重置。不然一直遞增,緩存淘汰場景下,對于新對象很不友好。

policy 資料結構的設計

源碼分析 golang ristretto 高性能緩存的設計實作原理
type defaultPolicy struct {
    sync.Mutex
    admit    *tinyLFU // 實作了 tinyLFU 淘汰算法.
    evict    *sampledLFU // 記錄最大 cost,已使用 cost,及 key 對應的 cost 值.

    // 異步的維護緩存淘汰政策.
    itemsCh  chan []uint64
    stop     chan struct{}
    isClosed bool
    metrics  *Metrics
}

type sampledLFU struct {
    maxCost  int64 // 最大 cost 門檻值
    used     int64 // 已使用 cost 值
    metrics  *Metrics // 名額
    keyCosts map[uint64]int64 // key 為資料的 key hash,value 為對應的 cost 值.
}

type tinyLFU struct {
    freq    *cmSketch // count min sketch 
    door    *z.Bloom  // bloomfilter
    incrs   int64
    resetAt int64
}
           

policy tinylfu

policy 在執行個體化是除了執行個體化 defaultPolicy 對象,還會啟動一個協程運作 processItems 方法,該方法用來監聽 cache 傳遞的增删改事件,在 tinylfu count-min-sketch 裡做記錄,超過一定門檻值後會重置,其目的為了避免老的 lfu freq 越來越大,不容易被淘汰掉。

func newDefaultPolicy(numCounters, maxCost int64) *defaultPolicy {
    p := &defaultPolicy{
        admit:   newTinyLFU(numCounters),
        evict:   newSampledLFU(maxCost),
        itemsCh: make(chan []uint64, 3),
        stop:    make(chan struct{}),
    }
    go p.processItems()
    return p
}

func (p *defaultPolicy) processItems() {
    for {
        select {
        case items := <-p.itemsCh:
            p.Lock()
            p.admit.Push(items)
            p.Unlock()
        case <-p.stop:
            return
        }
    }
}

func (p *tinyLFU) Push(keys []uint64) {
    for _, key := range keys {
        p.Increment(key)
    }
}

func (p *tinyLFU) Increment(key uint64) {
    // Flip doorkeeper bit if not already done.
    if added := p.door.AddIfNotHas(key); !added {
        // Increment count-min counter if doorkeeper bit is already set.
        p.freq.Increment(key)
    }
    p.incrs++
    if p.incrs >= p.resetAt {
        p.reset()
    }
}
           

寫時緩存淘汰驅逐

ristretto 沒有實作背景協程主動淘汰驅逐的邏輯,而是采用了寫時淘汰驅逐,每次寫資料時,判斷是否有足夠的 cost 插入資料。如果不足,則進行驅逐。采樣驅逐的方法有點類似 redis 的方案,每次從 simpleLFU 裡擷取 5 個 key,然後周遊計算這 5 個 key 的命中率,淘汰掉命中率最低的 key,然後再判斷是否有空閑 cost 寫入,不足繼續采樣淘汰,知道滿足目前 key 的寫入。

命中率是通過 tinyLFU 來計算的,由于該實作是通過 count-min sketch 算法實作,是以計算出的緩存命中數會産生些偏差。

func (p *defaultPolicy) Add(key uint64, cost int64) ([]*Item, bool) {
    p.Lock()
    defer p.Unlock()

    // 添加的 cost 不能超過總 cost 門檻值.
    if cost > p.evict.getMaxCost() {
        return nil, false
    }

    // 如果 simpleLfu 裡有 key 值,則更新 cost 值.
    if has := p.evict.updateIfHas(key, cost); has {
        return nil, false
    }

    // 如果 key 在 simpleLFU 不存在,則走下面的邏輯.
    // 計算減去 cost 後還剩多少可用的 cost.
    room := p.evict.roomLeft(cost)
    if room >= 0 {
        // 如果足夠 cost 開銷,則更新 lfu 内名額.
        p.evict.add(key, cost)
        // 這裡的 nil 代表,沒有需要淘汰的資料,true 代表可以插入.
        return nil, true
    }

    incHits := p.admit.Estimate(key)

    // 建構一個可以放 5 條采樣資料的集合
    sample := make([]*policyPair, 0, lfuSample)

    // 需要删除的 key
    victims := make([]*Item, 0)

    // 嘗試淘汰資料,直到有空餘的 cost.
    for ; room < 0; room = p.evict.roomLeft(cost) {
        // 采樣擷取 5 條 key,并填滿 sample 數組.
        sample = p.evict.fillSample(sample)

        // 在取樣集合裡擷取中最少被使用的 key,也就是緩存命中率最低的 key.
        minKey, minHits, minId, minCost := uint64(0), int64(math.MaxInt64), 0, int64(0)
        for i, pair := range sample {
            // 擷取該 key 的 hits 命中率
            if hits := p.admit.Estimate(pair.key); hits < minHits {
                minKey, minHits, minId, minCost = pair.key, hits, i, pair.cost
            }
        }

        if incHits < minHits {
            p.metrics.add(rejectSets, key, 1)
            return victims, false
        }

        // 在 simpleLFU 中删除 key,且減去 cost.
        p.evict.del(minKey)

        sample[minId] = sample[len(sample)-1]
        sample = sample[:len(sample)-1]

        // 把需要淘汰掉的 key 添加到 victims 集合裡.
        victims = append(victims, &Item{
            Key:      minKey,
            Conflict: 0,
            Cost:     minCost,
        })
    }

    // 在 simpleLFU 記錄 key 的 cost,且累加 cost.
    p.evict.add(key, cost)

    // 名額
    p.metrics.add(costAdd, key, uint64(cost))
    return victims, true
}
           

ristretto 讀寫流程源碼分析

Set 寫流程

// 帶 cost 參數來寫入 kv.
func (c *Cache) Set(key, value interface{}, cost int64) bool {
    return c.SetWithTTL(key, value, cost, 0*time.Second)
}

// 帶 cost 和過期參數來寫入 kv.
func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool {
    // 判空
    if c == nil || c.isClosed || key == nil {
        return false
    }

    // 判斷過期時間
    var expiration time.Time
    switch {
    case ttl == 0:
        break // 其他不加也行
    case ttl < 0:
        return false
    default:
        // 目前時間加上 ttl 過期時長為過期時間
        expiration = time.Now().Add(ttl)
    }

    // 使用不同的 hash 算法計算出 key 的兩個 hash 值,一個用來做 key 的哈希值,另一個用來用來做判斷沖突的哈希值.
    keyHash, conflictHash := c.keyToHash(key)

    // 建構 item 對象.
    i := &Item{
        flag:       itemNew, // 預設為新增
        Key:        keyHash,
        Conflict:   conflictHash,
        Value:      value,
        Cost:       cost,
        Expiration: expiration,
    }

    // 判斷是有存在值,存在則更新資料及 expire 資訊,另外把 flag 設為更新.
    if prev, ok := c.store.Update(i); ok {
        c.onExit(prev)
        i.flag = itemUpdate // 改為 update 更新
    }

    // 嘗試把 item 發給 policy 
    select {
    case c.setBuf <- i:
        return true
    default:
        if i.flag == itemUpdate {
            return true
        }
        // 統計
        c.Metrics.add(dropSets, keyHash, 1)
        return false
    }
}
           

store update 方法

func (sm *shardedMap) Update(newItem *Item) (interface{}, bool) {
    // 通過 key hash 取摸找到 shard,并執行 shard 的 update 方法.
    return sm.shards[newItem.Key%numShards].Update(newItem)
}

func (m *lockedMap) Update(newItem *Item) (interface{}, bool) {
    m.Lock()
    // 判斷是否已經存在 ?
    item, ok := m.data[newItem.Key]
    if !ok {
        m.Unlock()
        // 無,直接傳回
        return nil, false
    }
    // 有,則判斷 hash conflict hash 是否一緻,不一緻則不為同一個值,直接跳出.
    if newItem.Conflict != 0 && (newItem.Conflict != item.conflict) {
        m.Unlock()
        return nil, false
    }

    // 在 shard expirationMap 裡更新過期資訊.
    m.em.update(newItem.Key, newItem.Conflict, item.expiration, newItem.Expiration)

    // 把資料加到 shard data 裡.
    m.data[newItem.Key] = storeItem{
        key:        newItem.Key,
        conflict:   newItem.Conflict,
        value:      newItem.Value,
        expiration: newItem.Expiration,
    }

    m.Unlock()

    // 傳回資料和true, 表明更新成功.
    return item.value, true
}
           

ProcessItems

源碼分析 golang ristretto 高性能緩存的設計實作原理

ristretto 執行個體化的時候會開啟一個協程運作 processItems,processItems 方法可以處理 item 的增删改,還可以周期性進行 GC 垃圾回收。

// processItems is ran by goroutines processing the Set buffer.
func (c *Cache) processItems() {
    startTs := make(map[uint64]time.Time)
    numToKeep := 100000 // TODO: Make this configurable via options.

    trackAdmission := func(key uint64) {
        if c.Metrics == nil {
            return
        }
        startTs[key] = time.Now()
        if len(startTs) > numToKeep {
            for k := range startTs {
                if len(startTs) <= numToKeep {
                    break
                }
                delete(startTs, k)
            }
        }
    }
    onEvict := func(i *Item) {
        if ts, has := startTs[i.Key]; has {
            c.Metrics.trackEviction(int64(time.Since(ts) / time.Second))
            delete(startTs, i.Key)
        }
        if c.onEvict != nil {
            c.onEvict(i)
        }
    }

    for {
        select {
        case i := <-c.setBuf:
            if i.wg != nil {
                i.wg.Done()
                continue
            }

            // 如果是添加和更新,則需要計算 item 的開銷
            if i.Cost == 0 && c.cost != nil && i.flag != itemDelete {
                i.Cost = c.cost(i.Value)
            }

            // 判斷是否需要忽略内部資料結構的開銷.
            if !c.ignoreInternalCost {
                // 累加 Item 自身的位元組開銷
                i.Cost += itemSize
            }

            // 根據 flag 類型執行增删改邏輯.
            switch i.flag {
            case itemNew:
                // 當超過門檻值時,傳回需要删除的資料 victims,added 為是否添加成功.
                victims, added := c.policy.Add(i.Key, i.Cost)
                if added {
                    // 在 store 裡添加 item 對象.
                    c.store.Set(i)
                    // 統計資訊
                    c.Metrics.add(keyAdd, i.Key, 1)
                    trackAdmission(i.Key)
                } else {
                    // 回調傳入的 onreject 方法,且執行 onExit 方法.
                    c.onReject(i)
                }

                // 删除相關記錄
                for _, victim := range victims {
                    victim.Conflict, victim.Value = c.store.Del(victim.Key, 0)

                    // 被緩存政策驅逐需要回調 onEvict 方法.
                    onEvict(victim)
                }

            case itemUpdate:
                // 由于在 cache.Set 方法裡處理了 Update 的資料,這裡隻做 policy 通知即可.
                c.policy.Update(i.Key, i.Cost)

            case itemDelete:
                // 在 policy裡 删除
                c.policy.Del(i.Key) // Deals with metrics updates.

                // 在 store 删除該記錄
                _, val := c.store.Del(i.Key, i.Conflict)

                // 執行回調方法
                c.onExit(val)
            }
        case <-c.cleanupTicker.C:
            // 進行 GC 垃圾回收,删除過期的資料.
            c.store.Cleanup(c.policy, onEvict)
        case <-c.stop:
            return
        }
    }
}
           

store Set

先從 sharedMap 裡找到相關的 shard,然後在 shard 判斷是否以前有值, 如果有前值,且 key conflict hash 值不一緻,則直接跳出,否則更新 expireMap,然後把 item 接到 shard 的 data 裡。

代碼位置: github/ristretto/store.go

// 在 shardedMap 裡寫入.
func (sm *shardedMap) Set(i *Item) {
    if i == nil {
        return
    }

    // 對 key hash 取摸計算出 shard,再對 shard 調用 set 來寫入.
    sm.shards[i.Key%numShards].Set(i)
}

// 在 shard 裡寫入.
func (m *lockedMap) Set(i *Item) {
    if i == nil {
        // If the item is nil make this Set a no-op.
        return
    }

    // 執行 shard 内部的鎖
    m.Lock()
    defer m.Unlock()

    // 嘗試擷取先前資料
    item, ok := m.data[i.Key]

    if ok {
        // 如果有前值,且 key conflict hash 值不一緻,則直接跳出.
        if i.Conflict != 0 && (i.Conflict != item.conflict) {
            return
        }
        // 如果有舊值,則在 expire map 裡更新過期時間.
        m.em.update(i.Key, i.Conflict, item.expiration, i.Expiration)
    } else {
        // 如果沒有,在 expire map 裡添加過期資訊.
        m.em.add(i.Key, i.Conflict, i.Expiration)
    }

    // 把 item 裡加到 data 裡.
    m.data[i.Key] = storeItem{
        key:        i.Key,  // key hash1
        conflict:   i.Conflict, // key hash2
        value:      i.Value,
        expiration: i.Expiration,
    }
}
           

add 在 expirationMap 裡添加過期資料,update 則是在 expirationMap 删除以前的過期資料,添加新的過期資料。

代碼位置: github/ristretto/ttl.go

var (
    bucketDurationSecs = int64(5)
)

// 每 5 秒為一個 bucket.
func storageBucket(t time.Time) int64 {
    return (t.Unix() / bucketDurationSecs) + 1
}

func (m *expirationMap) add(key, conflict uint64, expiration time.Time) {
    // 判空
    if m == nil {
        return
    }

    // 過期為空, 則直接跳出.
    if expiration.IsZero() {
        return
    }

    // 通過過期時長擷取 bucket 索引.
    bucketNum := storageBucket(expiration)
    m.Lock()
    defer m.Unlock()

    // 把 key 和 conflict 加到 bucket 裡.
    b, ok := m.buckets[bucketNum]
    if !ok {
        b = make(bucket)
        m.buckets[bucketNum] = b
    }
    b[key] = conflict
}

func (m *expirationMap) update(key, conflict uint64, oldExpTime, newExpTime time.Time) {
    if m == nil {
        return
    }

    m.Lock()
    defer m.Unlock()

    // 删舊
    oldBucketNum := storageBucket(oldExpTime)
    oldBucket, ok := m.buckets[oldBucketNum]
    if ok {
        delete(oldBucket, key)
    }

    // 加新
    newBucketNum := storageBucket(newExpTime)
    newBucket, ok := m.buckets[newBucketNum]
    if !ok {
        newBucket = make(bucket)
        m.buckets[newBucketNum] = newBucket
    }
    newBucket[key] = conflict
}
           

Get 讀流程

Get 用來讀取資料, 其流程如下。

func (c *Cache) Get(key interface{}) (interface{}, bool) {
    // 基本判空
    if c == nil || c.isClosed || key == nil {
        return nil, false
    }

    // 使用不同的 hash 算法計算出 key 的兩個 hash 值,一個用來做 key 的哈希值,另一個用來用來做判斷沖突的哈希值.
    keyHash, conflictHash := c.keyToHash(key)

    // 調用 ringBuffer 的 push 接口,把 key hash 放進去.
    // 其目的用來實作 lfu 政策.
    c.getBuf.Push(keyHash)

    // 從 store 裡擷取 value
    value, ok := c.store.Get(keyHash, conflictHash)
    if ok {
        // 如果有值,則在 hit 統計裡加一.
        c.Metrics.add(hit, keyHash, 1)
    } else {
        // 如果沒有值,則在 miss 統計裡加一.
        c.Metrics.add(miss, keyHash, 1)
    }

    // 傳回
    return value, ok
}
           

從 shardedMap 裡擷取資料, 其内部流程是先取摸計算出 key 應該在哪個 shard,然後再從 shard 裡擷取 kv,然後對比 key conflict hash 值,不一緻則說明不是同一條資料,接着判斷 kv 是否過期,如過期也傳回 false,不過期則傳回資料。

// 從 shardedMap 裡擷取資料.
func (sm *shardedMap) Get(key, conflict uint64) (interface{}, bool) {
    return sm.shards[key%numShards].get(key, conflict)
}

// 從 shard 裡擷取資料.
func (m *lockedMap) get(key, conflict uint64) (interface{}, bool) {
    m.RLock()
    item, ok := m.data[key]
    m.RUnlock()
    if !ok {
        return nil, false
    }
    if conflict != 0 && (conflict != item.conflict) {
        return nil, false
    }

    // 判斷是否逾時,當 kv 過期逾時,這裡不主動删除資料,而直接傳回無資料的錯誤.
    // 這裡的删除等待定時器的 gc 垃圾回收處理.
    if !item.expiration.IsZero() && time.Now().After(item.expiration) {
        return nil, false
    }

    // 傳回資料
    return item.value, true
}
           

Del 删除資料

Del 用來删除資料, 其流程如下。

// Del deletes the key-value item from the cache if it exists.
func (c *Cache) Del(key interface{}) {
    // 判空
    if c == nil || c.isClosed || key == nil {
        return
    }
    // 通過不同的 hash 算法擷取 key 的兩個 hash 值.
    keyHash, conflictHash := c.keyToHash(key)

    // 在 store 裡删除資料
    _, prev := c.store.Del(keyHash, conflictHash)

    // 回調 exit 方法
    c.onExit(prev)

    // 像 setBuf 裡傳遞 item 對象,flag 标記為 delete.
    c.setBuf <- &Item{
        flag:     itemDelete,
        Key:      keyHash,
        Conflict: conflictHash,
    }
}
           

從 shardedMap 裡删除資料,其流程如下。

  1. 通過 key hash 擷取 item ;
  2. 判斷 hash conflict hash 值是否一緻,不一緻則直接 return ;
  3. 如果 item 配置了過期時間,則在 expirationMap 裡删除相關記錄 ;
  4. 在 map 裡删除 item.
// 從 shardedMap 裡删除資料.
func (sm *shardedMap) Del(key, conflict uint64) (uint64, interface{}) {
    return sm.shards[key%numShards].Del(key, conflict)
}

// 從 shard 裡删除資料.
func (m *lockedMap) Del(key, conflict uint64) (uint64, interface{}) {
    m.Lock()

    // 通過 key hash 擷取 item
    item, ok := m.data[key]
    if !ok {
        // 沒有值,無需删除,傳回
        m.Unlock()
        return 0, nil
    }

    // 判斷 hash conflict hash 值是否一緻.
    if conflict != 0 && (conflict != item.conflict) {
        m.Unlock()
        return 0, nil
    }

    // 如果 item 配置了過期時間,則在 expirationMap 裡删除.
    if !item.expiration.IsZero() {
        m.em.del(key, item.expiration)
    }

    // 删除 item.
    delete(m.data, key)
    m.Unlock()

    // 傳回 hash 沖突值和 value.
    return item.conflict, item.value
}
           

expire gc 過期垃圾回收的設計原理

源碼分析 golang ristretto 高性能緩存的設計實作原理

ristretto 初始化會啟動一個協程來執行 processItems, 該方法裡不僅監聽 setBuf 管道來對資料增删改,且還會監聽垃圾回收定時器來完成垃圾回收。 每 2.5 秒調用 Cleanup 方法執行過期鍵的垃圾回收。

var (
    bucketDurationSecs = int64(5)
    cleanupTicker = time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2),
)

func (c *Cache) processItems() {
    startTs := make(map[uint64]time.Time)
    numToKeep := 100000

    onEvict := func(i *Item) {
        if ts, has := startTs[i.Key]; has {
            c.Metrics.trackEviction(int64(time.Since(ts) / time.Second))
            delete(startTs, i.Key)
        }
        if c.onEvict != nil {
            c.onEvict(i)
        }
    }

    for {
        select {
        case i := <-c.setBuf:
            // ...
        case <-c.cleanupTicker.C:
            // 執行過期資料垃圾回收.
            c.store.Cleanup(c.policy, onEvict)
        }
    }
}
           

Cleanup 是過期鍵垃圾回收的核心方法,其内部流程如下。

先擷取目前時間點以前的 bucket,然後周遊該 bucket 内的所有 keys,判斷是否過期,如過期則在 store 裡删除 item,沒過期則跳過。

func (sm *shardedMap) Cleanup(policy policy, onEvict itemCallback) {
    sm.expiryMap.cleanup(sm, policy, onEvict)
}

func cleanupBucket(t time.Time) int64 {
    return storageBucket(t) - 1
}

func (m *expirationMap) cleanup(store store, policy policy, onEvict itemCallback) {
    // 判空
    if m == nil {
        return
    }

    m.Lock()
    now := time.Now()
    // 取出目前時間對應的上一個 bucket 時間點.
    bucketNum := cleanupBucket(now)

    // 取出該 bucket 的 key hash 和 conflict hash.
    keys := m.buckets[bucketNum]

    // 在 buckets 集合裡删該 bucket.
    delete(m.buckets, bucketNum)

    // 放鎖,因為後面的路基是慢邏輯.
    m.Unlock()

    // 周遊上一個 bucket 的 key hash 集合, 嘗試删除過期的鍵值.
    for key, conflict := range keys {
        // 沒過期,跳過
        if store.Expiration(key).After(now) {
            continue
        }

        // 如過期擷取 key 的 cost 開銷值.
        cost := policy.Cost(key)

        // 在 policy 裡删除 key.
        policy.Del(key)

        // store 裡删除該值.
        _, value := store.Del(key, conflict)

        // 執行回調方法.
        if onEvict != nil {
            onEvict(&Item{Key: key,
                Conflict: conflict,
                Value:    value,
                Cost:     cost,
            })
        }
    }
}
           

總結

簡單說, ristretto 的 kv 存儲使用分片map來實作的,而緩存淘汰則使用 tinyLFU 實作。

from https://xiaorui.cc/archives/7383