ristretto 是 golang 社群裡排頭部的高性能的資料緩存庫,支援鍵值過期和 LFU 緩存淘汰,還支援最大的記憶體空間限制,根據寫時傳入 cost 計算已使用的記憶體值,通常 cost 為對象的 size,但也可以當個數使用。其他緩存庫多按照條數進行限制,不合理使用時容易造成 OOM 的風險。
其相比 freecache、bigcache 來說,存儲 value 可以為任意值 interface{}。而 freecache、bigcache 内部通過一個大的 ringbuffer 來存放 value,而 value 需要是 []byte 位元組數組,該設計對于 golang gc 很友好,但應用上受限,讀寫的時候需要對進行編碼解碼,這個開銷不低的。
ristretto 在混合讀寫壓測場景下,其吞吐表現要比其他 go cahce 庫要好,當然壓測的 case 通常是偏向自己的,懂的自然懂。
ristretto 使用方法
func main() {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e7, // number of keys to track frequency of (10M).
MaxCost: 1 << 30, // maximum cost of cache (1GB).
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
panic(err)
}
// 寫資料,cost 為 1.
cache.Set("key", "value", 1)
// 由于 ristretto 是異步寫,是以需要等待協程消費處理完.
cache.Wait()
// 擷取資料
value, found := cache.Get("key")
if !found {
panic("missing value")
}
fmt.Println(value)
// 删除資料
cache.Del("key")
}
ristretto 項目位址:
https://github.com/dgraph-io/ristretto
ristretto 設計原理
ristretto 的 kv 存儲使用分片map來實作的,而緩存淘汰則使用 tinyLFU 實作。
ristretto store 的結構
ristretto 設計了一個長度為 256 的分片集合,該結構類型為 shardedMap,其每個分片裡有一個存資料的 map 和獨立的鎖,其目的是為了減少鎖競争,提高讀寫性能。增删改查的時候,通過對 key 進行取摸定位資料在哪一個 shard 分片上。
shardedMap 内部使用 expirationMap 來存鍵值過期資料的,所有分片共用一個 expirationMap 對象,ristretto 沒有使用分片來建構過期資料集。
// 最大的分片數量,常量不能改.
const numShards uint64 = 256
type shardedMap struct {
shards []*lockedMap
expiryMap *expirationMap
}
func newShardedMap() *shardedMap {
sm := &shardedMap{
shards: make([]*lockedMap, int(numShards)),
expiryMap: newExpirationMap(),
}
for i := range sm.shards {
sm.shards[i] = newLockedMap(sm.expiryMap)
}
return sm
}
type lockedMap struct {
sync.RWMutex
data map[uint64]storeItem // 該 map 的 key 為資料key的哈希值.
em *expirationMap
}
type expirationMap struct {
sync.RWMutex
buckets map[int64]bucket // 該 map 的 key 過期時間的索引值,計算方法為 `ts/5 + 1`
}
// key 為鍵值 key 的 hash 值,value 為 conflict 值,是另一個 hash 值.
type bucket map[uint64]uint64
shardedMap 裡不存儲真正的 key 值,而是存儲資料 key 兩個 hash 值,為什麼使用兩個 hash 值标記對象,為了避免機率上的沖突,是以另使用另一個 xxhash 算法計算 hash 值,該值在 ristretto 定義為 conflict 哈希值。
KeyToHash 是 ristretto 預設的 hash 方法,傳入的 key 不能是指針類型,也不能是 struct。傳回值為兩個 hash 值,其内部的 MemHash 其實是 runtime.memhash 映射的方法,golang 内部的 map 也使用該 hash 算法。xxhash 則為社群中較為火熱的 hash 庫。
func KeyToHash(key interface{}) (uint64, uint64) {
if key == nil {
return 0, 0
}
switch k := key.(type) {
case uint64:
return k, 0
case string:
return MemHashString(k), xxhash.Sum64String(k)
case []byte:
return MemHash(k), xxhash.Sum64(k)
case byte:
return uint64(k), 0
case int:
return uint64(k), 0
case int32:
return uint64(k), 0
case uint32:
return uint64(k), 0
case int64:
return uint64(k), 0
default:
panic("Key type not supported")
}
}
ristretto 緩存驅逐政策
社群中常常使用 LRU 和 LFU 算法來實作緩存淘汰驅逐,ristretto 則使用 LFU 算法,但由于 LFU 标準實作開銷過大,則使用 TinyLFU 實作資料淘汰。
TinyLFU 裡主要使用 count-min sketch 統計算法粗略記錄各個 key 的緩存命中計數。該 count-min Sketch 算法通常用在不要求精确計數,又想節省記憶體的場景,雖然拿到的計數缺失一定的精準度,但确實節省了記憶體。
但就 ristretto 緩存場景來說,記錄 100w 個 key 的計數也才占用 11MB 左右,公式是 ( 100 * 10000 ) * (8 + 4) / 1024/ 1024 = 11MB,這裡的 8 為 key hash 的大小,而 4 為 hit 的類型 uint32 大小。
count-min sketch 統計算法實作
count-min sketch 中的 increment 和 estimate 方法實作原理很簡單,流程如下。
- 標明 d 個 hash 函數,開一個 dm 的二維整數數組作為哈希表 ;
- 對于每個元素,分别使用d個hash函數計算相應的哈希值,并對m取餘,然後在對應的位置上增1,二維數組中的每個整數稱為sketch ;
- 要查詢某個元素的頻率時,隻需要取出d個sketch, 傳回最小的那一個。
ristretto 對 count-min sketch 并不是一直累計累加的,在 policy 累計對象超過 resetAt 時,則會對 cm-sketch 統計對象進行重置。不然一直遞增,緩存淘汰場景下,對于新對象很不友好。
policy 資料結構的設計
type defaultPolicy struct {
sync.Mutex
admit *tinyLFU // 實作了 tinyLFU 淘汰算法.
evict *sampledLFU // 記錄最大 cost,已使用 cost,及 key 對應的 cost 值.
// 異步的維護緩存淘汰政策.
itemsCh chan []uint64
stop chan struct{}
isClosed bool
metrics *Metrics
}
type sampledLFU struct {
maxCost int64 // 最大 cost 門檻值
used int64 // 已使用 cost 值
metrics *Metrics // 名額
keyCosts map[uint64]int64 // key 為資料的 key hash,value 為對應的 cost 值.
}
type tinyLFU struct {
freq *cmSketch // count min sketch
door *z.Bloom // bloomfilter
incrs int64
resetAt int64
}
policy tinylfu
policy 在執行個體化是除了執行個體化 defaultPolicy 對象,還會啟動一個協程運作 processItems 方法,該方法用來監聽 cache 傳遞的增删改事件,在 tinylfu count-min-sketch 裡做記錄,超過一定門檻值後會重置,其目的為了避免老的 lfu freq 越來越大,不容易被淘汰掉。
func newDefaultPolicy(numCounters, maxCost int64) *defaultPolicy {
p := &defaultPolicy{
admit: newTinyLFU(numCounters),
evict: newSampledLFU(maxCost),
itemsCh: make(chan []uint64, 3),
stop: make(chan struct{}),
}
go p.processItems()
return p
}
func (p *defaultPolicy) processItems() {
for {
select {
case items := <-p.itemsCh:
p.Lock()
p.admit.Push(items)
p.Unlock()
case <-p.stop:
return
}
}
}
func (p *tinyLFU) Push(keys []uint64) {
for _, key := range keys {
p.Increment(key)
}
}
func (p *tinyLFU) Increment(key uint64) {
// Flip doorkeeper bit if not already done.
if added := p.door.AddIfNotHas(key); !added {
// Increment count-min counter if doorkeeper bit is already set.
p.freq.Increment(key)
}
p.incrs++
if p.incrs >= p.resetAt {
p.reset()
}
}
寫時緩存淘汰驅逐
ristretto 沒有實作背景協程主動淘汰驅逐的邏輯,而是采用了寫時淘汰驅逐,每次寫資料時,判斷是否有足夠的 cost 插入資料。如果不足,則進行驅逐。采樣驅逐的方法有點類似 redis 的方案,每次從 simpleLFU 裡擷取 5 個 key,然後周遊計算這 5 個 key 的命中率,淘汰掉命中率最低的 key,然後再判斷是否有空閑 cost 寫入,不足繼續采樣淘汰,知道滿足目前 key 的寫入。
命中率是通過 tinyLFU 來計算的,由于該實作是通過 count-min sketch 算法實作,是以計算出的緩存命中數會産生些偏差。
func (p *defaultPolicy) Add(key uint64, cost int64) ([]*Item, bool) {
p.Lock()
defer p.Unlock()
// 添加的 cost 不能超過總 cost 門檻值.
if cost > p.evict.getMaxCost() {
return nil, false
}
// 如果 simpleLfu 裡有 key 值,則更新 cost 值.
if has := p.evict.updateIfHas(key, cost); has {
return nil, false
}
// 如果 key 在 simpleLFU 不存在,則走下面的邏輯.
// 計算減去 cost 後還剩多少可用的 cost.
room := p.evict.roomLeft(cost)
if room >= 0 {
// 如果足夠 cost 開銷,則更新 lfu 内名額.
p.evict.add(key, cost)
// 這裡的 nil 代表,沒有需要淘汰的資料,true 代表可以插入.
return nil, true
}
incHits := p.admit.Estimate(key)
// 建構一個可以放 5 條采樣資料的集合
sample := make([]*policyPair, 0, lfuSample)
// 需要删除的 key
victims := make([]*Item, 0)
// 嘗試淘汰資料,直到有空餘的 cost.
for ; room < 0; room = p.evict.roomLeft(cost) {
// 采樣擷取 5 條 key,并填滿 sample 數組.
sample = p.evict.fillSample(sample)
// 在取樣集合裡擷取中最少被使用的 key,也就是緩存命中率最低的 key.
minKey, minHits, minId, minCost := uint64(0), int64(math.MaxInt64), 0, int64(0)
for i, pair := range sample {
// 擷取該 key 的 hits 命中率
if hits := p.admit.Estimate(pair.key); hits < minHits {
minKey, minHits, minId, minCost = pair.key, hits, i, pair.cost
}
}
if incHits < minHits {
p.metrics.add(rejectSets, key, 1)
return victims, false
}
// 在 simpleLFU 中删除 key,且減去 cost.
p.evict.del(minKey)
sample[minId] = sample[len(sample)-1]
sample = sample[:len(sample)-1]
// 把需要淘汰掉的 key 添加到 victims 集合裡.
victims = append(victims, &Item{
Key: minKey,
Conflict: 0,
Cost: minCost,
})
}
// 在 simpleLFU 記錄 key 的 cost,且累加 cost.
p.evict.add(key, cost)
// 名額
p.metrics.add(costAdd, key, uint64(cost))
return victims, true
}
ristretto 讀寫流程源碼分析
Set 寫流程
// 帶 cost 參數來寫入 kv.
func (c *Cache) Set(key, value interface{}, cost int64) bool {
return c.SetWithTTL(key, value, cost, 0*time.Second)
}
// 帶 cost 和過期參數來寫入 kv.
func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool {
// 判空
if c == nil || c.isClosed || key == nil {
return false
}
// 判斷過期時間
var expiration time.Time
switch {
case ttl == 0:
break // 其他不加也行
case ttl < 0:
return false
default:
// 目前時間加上 ttl 過期時長為過期時間
expiration = time.Now().Add(ttl)
}
// 使用不同的 hash 算法計算出 key 的兩個 hash 值,一個用來做 key 的哈希值,另一個用來用來做判斷沖突的哈希值.
keyHash, conflictHash := c.keyToHash(key)
// 建構 item 對象.
i := &Item{
flag: itemNew, // 預設為新增
Key: keyHash,
Conflict: conflictHash,
Value: value,
Cost: cost,
Expiration: expiration,
}
// 判斷是有存在值,存在則更新資料及 expire 資訊,另外把 flag 設為更新.
if prev, ok := c.store.Update(i); ok {
c.onExit(prev)
i.flag = itemUpdate // 改為 update 更新
}
// 嘗試把 item 發給 policy
select {
case c.setBuf <- i:
return true
default:
if i.flag == itemUpdate {
return true
}
// 統計
c.Metrics.add(dropSets, keyHash, 1)
return false
}
}
store update 方法
func (sm *shardedMap) Update(newItem *Item) (interface{}, bool) {
// 通過 key hash 取摸找到 shard,并執行 shard 的 update 方法.
return sm.shards[newItem.Key%numShards].Update(newItem)
}
func (m *lockedMap) Update(newItem *Item) (interface{}, bool) {
m.Lock()
// 判斷是否已經存在 ?
item, ok := m.data[newItem.Key]
if !ok {
m.Unlock()
// 無,直接傳回
return nil, false
}
// 有,則判斷 hash conflict hash 是否一緻,不一緻則不為同一個值,直接跳出.
if newItem.Conflict != 0 && (newItem.Conflict != item.conflict) {
m.Unlock()
return nil, false
}
// 在 shard expirationMap 裡更新過期資訊.
m.em.update(newItem.Key, newItem.Conflict, item.expiration, newItem.Expiration)
// 把資料加到 shard data 裡.
m.data[newItem.Key] = storeItem{
key: newItem.Key,
conflict: newItem.Conflict,
value: newItem.Value,
expiration: newItem.Expiration,
}
m.Unlock()
// 傳回資料和true, 表明更新成功.
return item.value, true
}
ProcessItems
ristretto 執行個體化的時候會開啟一個協程運作 processItems,processItems 方法可以處理 item 的增删改,還可以周期性進行 GC 垃圾回收。
// processItems is ran by goroutines processing the Set buffer.
func (c *Cache) processItems() {
startTs := make(map[uint64]time.Time)
numToKeep := 100000 // TODO: Make this configurable via options.
trackAdmission := func(key uint64) {
if c.Metrics == nil {
return
}
startTs[key] = time.Now()
if len(startTs) > numToKeep {
for k := range startTs {
if len(startTs) <= numToKeep {
break
}
delete(startTs, k)
}
}
}
onEvict := func(i *Item) {
if ts, has := startTs[i.Key]; has {
c.Metrics.trackEviction(int64(time.Since(ts) / time.Second))
delete(startTs, i.Key)
}
if c.onEvict != nil {
c.onEvict(i)
}
}
for {
select {
case i := <-c.setBuf:
if i.wg != nil {
i.wg.Done()
continue
}
// 如果是添加和更新,則需要計算 item 的開銷
if i.Cost == 0 && c.cost != nil && i.flag != itemDelete {
i.Cost = c.cost(i.Value)
}
// 判斷是否需要忽略内部資料結構的開銷.
if !c.ignoreInternalCost {
// 累加 Item 自身的位元組開銷
i.Cost += itemSize
}
// 根據 flag 類型執行增删改邏輯.
switch i.flag {
case itemNew:
// 當超過門檻值時,傳回需要删除的資料 victims,added 為是否添加成功.
victims, added := c.policy.Add(i.Key, i.Cost)
if added {
// 在 store 裡添加 item 對象.
c.store.Set(i)
// 統計資訊
c.Metrics.add(keyAdd, i.Key, 1)
trackAdmission(i.Key)
} else {
// 回調傳入的 onreject 方法,且執行 onExit 方法.
c.onReject(i)
}
// 删除相關記錄
for _, victim := range victims {
victim.Conflict, victim.Value = c.store.Del(victim.Key, 0)
// 被緩存政策驅逐需要回調 onEvict 方法.
onEvict(victim)
}
case itemUpdate:
// 由于在 cache.Set 方法裡處理了 Update 的資料,這裡隻做 policy 通知即可.
c.policy.Update(i.Key, i.Cost)
case itemDelete:
// 在 policy裡 删除
c.policy.Del(i.Key) // Deals with metrics updates.
// 在 store 删除該記錄
_, val := c.store.Del(i.Key, i.Conflict)
// 執行回調方法
c.onExit(val)
}
case <-c.cleanupTicker.C:
// 進行 GC 垃圾回收,删除過期的資料.
c.store.Cleanup(c.policy, onEvict)
case <-c.stop:
return
}
}
}
store Set
先從 sharedMap 裡找到相關的 shard,然後在 shard 判斷是否以前有值, 如果有前值,且 key conflict hash 值不一緻,則直接跳出,否則更新 expireMap,然後把 item 接到 shard 的 data 裡。
代碼位置: github/ristretto/store.go
// 在 shardedMap 裡寫入.
func (sm *shardedMap) Set(i *Item) {
if i == nil {
return
}
// 對 key hash 取摸計算出 shard,再對 shard 調用 set 來寫入.
sm.shards[i.Key%numShards].Set(i)
}
// 在 shard 裡寫入.
func (m *lockedMap) Set(i *Item) {
if i == nil {
// If the item is nil make this Set a no-op.
return
}
// 執行 shard 内部的鎖
m.Lock()
defer m.Unlock()
// 嘗試擷取先前資料
item, ok := m.data[i.Key]
if ok {
// 如果有前值,且 key conflict hash 值不一緻,則直接跳出.
if i.Conflict != 0 && (i.Conflict != item.conflict) {
return
}
// 如果有舊值,則在 expire map 裡更新過期時間.
m.em.update(i.Key, i.Conflict, item.expiration, i.Expiration)
} else {
// 如果沒有,在 expire map 裡添加過期資訊.
m.em.add(i.Key, i.Conflict, i.Expiration)
}
// 把 item 裡加到 data 裡.
m.data[i.Key] = storeItem{
key: i.Key, // key hash1
conflict: i.Conflict, // key hash2
value: i.Value,
expiration: i.Expiration,
}
}
add 在 expirationMap 裡添加過期資料,update 則是在 expirationMap 删除以前的過期資料,添加新的過期資料。
代碼位置: github/ristretto/ttl.go
var (
bucketDurationSecs = int64(5)
)
// 每 5 秒為一個 bucket.
func storageBucket(t time.Time) int64 {
return (t.Unix() / bucketDurationSecs) + 1
}
func (m *expirationMap) add(key, conflict uint64, expiration time.Time) {
// 判空
if m == nil {
return
}
// 過期為空, 則直接跳出.
if expiration.IsZero() {
return
}
// 通過過期時長擷取 bucket 索引.
bucketNum := storageBucket(expiration)
m.Lock()
defer m.Unlock()
// 把 key 和 conflict 加到 bucket 裡.
b, ok := m.buckets[bucketNum]
if !ok {
b = make(bucket)
m.buckets[bucketNum] = b
}
b[key] = conflict
}
func (m *expirationMap) update(key, conflict uint64, oldExpTime, newExpTime time.Time) {
if m == nil {
return
}
m.Lock()
defer m.Unlock()
// 删舊
oldBucketNum := storageBucket(oldExpTime)
oldBucket, ok := m.buckets[oldBucketNum]
if ok {
delete(oldBucket, key)
}
// 加新
newBucketNum := storageBucket(newExpTime)
newBucket, ok := m.buckets[newBucketNum]
if !ok {
newBucket = make(bucket)
m.buckets[newBucketNum] = newBucket
}
newBucket[key] = conflict
}
Get 讀流程
Get 用來讀取資料, 其流程如下。
func (c *Cache) Get(key interface{}) (interface{}, bool) {
// 基本判空
if c == nil || c.isClosed || key == nil {
return nil, false
}
// 使用不同的 hash 算法計算出 key 的兩個 hash 值,一個用來做 key 的哈希值,另一個用來用來做判斷沖突的哈希值.
keyHash, conflictHash := c.keyToHash(key)
// 調用 ringBuffer 的 push 接口,把 key hash 放進去.
// 其目的用來實作 lfu 政策.
c.getBuf.Push(keyHash)
// 從 store 裡擷取 value
value, ok := c.store.Get(keyHash, conflictHash)
if ok {
// 如果有值,則在 hit 統計裡加一.
c.Metrics.add(hit, keyHash, 1)
} else {
// 如果沒有值,則在 miss 統計裡加一.
c.Metrics.add(miss, keyHash, 1)
}
// 傳回
return value, ok
}
從 shardedMap 裡擷取資料, 其内部流程是先取摸計算出 key 應該在哪個 shard,然後再從 shard 裡擷取 kv,然後對比 key conflict hash 值,不一緻則說明不是同一條資料,接着判斷 kv 是否過期,如過期也傳回 false,不過期則傳回資料。
// 從 shardedMap 裡擷取資料.
func (sm *shardedMap) Get(key, conflict uint64) (interface{}, bool) {
return sm.shards[key%numShards].get(key, conflict)
}
// 從 shard 裡擷取資料.
func (m *lockedMap) get(key, conflict uint64) (interface{}, bool) {
m.RLock()
item, ok := m.data[key]
m.RUnlock()
if !ok {
return nil, false
}
if conflict != 0 && (conflict != item.conflict) {
return nil, false
}
// 判斷是否逾時,當 kv 過期逾時,這裡不主動删除資料,而直接傳回無資料的錯誤.
// 這裡的删除等待定時器的 gc 垃圾回收處理.
if !item.expiration.IsZero() && time.Now().After(item.expiration) {
return nil, false
}
// 傳回資料
return item.value, true
}
Del 删除資料
Del 用來删除資料, 其流程如下。
// Del deletes the key-value item from the cache if it exists.
func (c *Cache) Del(key interface{}) {
// 判空
if c == nil || c.isClosed || key == nil {
return
}
// 通過不同的 hash 算法擷取 key 的兩個 hash 值.
keyHash, conflictHash := c.keyToHash(key)
// 在 store 裡删除資料
_, prev := c.store.Del(keyHash, conflictHash)
// 回調 exit 方法
c.onExit(prev)
// 像 setBuf 裡傳遞 item 對象,flag 标記為 delete.
c.setBuf <- &Item{
flag: itemDelete,
Key: keyHash,
Conflict: conflictHash,
}
}
從 shardedMap 裡删除資料,其流程如下。
- 通過 key hash 擷取 item ;
- 判斷 hash conflict hash 值是否一緻,不一緻則直接 return ;
- 如果 item 配置了過期時間,則在 expirationMap 裡删除相關記錄 ;
- 在 map 裡删除 item.
// 從 shardedMap 裡删除資料.
func (sm *shardedMap) Del(key, conflict uint64) (uint64, interface{}) {
return sm.shards[key%numShards].Del(key, conflict)
}
// 從 shard 裡删除資料.
func (m *lockedMap) Del(key, conflict uint64) (uint64, interface{}) {
m.Lock()
// 通過 key hash 擷取 item
item, ok := m.data[key]
if !ok {
// 沒有值,無需删除,傳回
m.Unlock()
return 0, nil
}
// 判斷 hash conflict hash 值是否一緻.
if conflict != 0 && (conflict != item.conflict) {
m.Unlock()
return 0, nil
}
// 如果 item 配置了過期時間,則在 expirationMap 裡删除.
if !item.expiration.IsZero() {
m.em.del(key, item.expiration)
}
// 删除 item.
delete(m.data, key)
m.Unlock()
// 傳回 hash 沖突值和 value.
return item.conflict, item.value
}
expire gc 過期垃圾回收的設計原理
ristretto 初始化會啟動一個協程來執行 processItems, 該方法裡不僅監聽 setBuf 管道來對資料增删改,且還會監聽垃圾回收定時器來完成垃圾回收。 每 2.5 秒調用 Cleanup 方法執行過期鍵的垃圾回收。
var (
bucketDurationSecs = int64(5)
cleanupTicker = time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2),
)
func (c *Cache) processItems() {
startTs := make(map[uint64]time.Time)
numToKeep := 100000
onEvict := func(i *Item) {
if ts, has := startTs[i.Key]; has {
c.Metrics.trackEviction(int64(time.Since(ts) / time.Second))
delete(startTs, i.Key)
}
if c.onEvict != nil {
c.onEvict(i)
}
}
for {
select {
case i := <-c.setBuf:
// ...
case <-c.cleanupTicker.C:
// 執行過期資料垃圾回收.
c.store.Cleanup(c.policy, onEvict)
}
}
}
Cleanup 是過期鍵垃圾回收的核心方法,其内部流程如下。
先擷取目前時間點以前的 bucket,然後周遊該 bucket 内的所有 keys,判斷是否過期,如過期則在 store 裡删除 item,沒過期則跳過。
func (sm *shardedMap) Cleanup(policy policy, onEvict itemCallback) {
sm.expiryMap.cleanup(sm, policy, onEvict)
}
func cleanupBucket(t time.Time) int64 {
return storageBucket(t) - 1
}
func (m *expirationMap) cleanup(store store, policy policy, onEvict itemCallback) {
// 判空
if m == nil {
return
}
m.Lock()
now := time.Now()
// 取出目前時間對應的上一個 bucket 時間點.
bucketNum := cleanupBucket(now)
// 取出該 bucket 的 key hash 和 conflict hash.
keys := m.buckets[bucketNum]
// 在 buckets 集合裡删該 bucket.
delete(m.buckets, bucketNum)
// 放鎖,因為後面的路基是慢邏輯.
m.Unlock()
// 周遊上一個 bucket 的 key hash 集合, 嘗試删除過期的鍵值.
for key, conflict := range keys {
// 沒過期,跳過
if store.Expiration(key).After(now) {
continue
}
// 如過期擷取 key 的 cost 開銷值.
cost := policy.Cost(key)
// 在 policy 裡删除 key.
policy.Del(key)
// store 裡删除該值.
_, value := store.Del(key, conflict)
// 執行回調方法.
if onEvict != nil {
onEvict(&Item{Key: key,
Conflict: conflict,
Value: value,
Cost: cost,
})
}
}
}
總結
簡單說, ristretto 的 kv 存儲使用分片map來實作的,而緩存淘汰則使用 tinyLFU 實作。
from https://xiaorui.cc/archives/7383