點選上方“芋道源碼”,選擇“設為星标”
管她前浪,還是後浪?
能浪的浪,才是好浪!
每天 8:55 更新文章,每天掉億點點頭發...
源碼精品專欄
- 原創 | Java 2020 超神之路,很肝~
- 中文詳細注釋的開源項目
- RPC 架構 Dubbo 源碼解析
- 消息中間件 RocketMQ 源碼解析
- 資料庫中間件 Sharding-JDBC 和 MyCAT 源碼解析
- 作業排程中間件 Elastic-Job 源碼解析
- 分布式事務中間件 TCC-Transaction 源碼解析
- Eureka 和 Hystrix 源碼解析
- Java 并發源碼
來源:albenw.github.io/posts/a4ae1aa2/
- 與Guava Cache比較
- Caffeine的高性能設計
-
- 異步的高性能讀寫
- 總結
概要
Caffeine是一個高性能,高命中率,低記憶體占用,near optimal 的本地緩存,簡單來說它是Guava Cache的優化加強版,有些文章把Caffeine稱為“新一代的緩存”、“現代緩存之王”。本文将重點講解Caffeine的高性能設計,以及對應部分的源碼分析。與Guava Cache比較
如果你對Guava Cache還不了解的話,可以點選這裡來看一下我之前寫過關于Guava Cache的文章。
大家都知道,Spring5即将放棄掉Guava Cache作為緩存機制,而改用Caffeine作為新的本地Cache的元件,這對于Caffeine來說是一個很大的肯定。為什麼Spring會這樣做呢?其實在Caffeine的Benchmarks裡給出了好靓仔的資料,對讀和寫的場景,還有跟其他幾個緩存工具進行了比較,Caffeine的性能都表現很突出。
upload successful萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 使用Caffeine
Caffeine為了友善大家使用以及從Guava Cache切換過來(很有針對性啊~),借鑒了Guava Cache大部分的概念(諸如核心概念Cache、LoadingCache、CacheLoader、CacheBuilder等等),對于Caffeine的了解隻要把它當作Guava Cache就可以了。
使用上,大家隻要把Caffeine的包引進來,然後換一下cache的實作類,基本應該就沒問題了。這對與已經使用過Guava Cache的同學來說沒有任何難度,甚至還有一點熟悉的味道,如果你之前沒有使用過Guava Cache,可以檢視Caffeine的官方API說明文檔,其中Population,Eviction,Removal,Refresh,Statistics,Cleanup,Policy等等這些特性都是跟Guava Cache基本一樣的。
下面給出一個例子說明怎樣建立一個Cache:
更多從Guava Cache遷移過來的使用說明,請看這裡private static LoadingCache<String, String> cache = Caffeine.newBuilder() //最大個數限制 .maximumSize(256L) //初始化容量 .initialCapacity(1) //通路後過期(包括讀和寫) .expireAfterAccess(2, TimeUnit.DAYS) //寫後過期 .expireAfterWrite(2, TimeUnit.HOURS) //寫後自動異步重新整理 .refreshAfterWrite(1, TimeUnit.HOURS) //記錄下緩存的一些統計資料,例如命中率等 .recordStats() //cache對緩存寫的通知回調 .writer(new CacheWriter<Object, Object>() { @Override public void write(@NonNull Object key, @NonNull Object value) { log.info("key={}, CacheWriter write", key); } @Override public void delete(@NonNull Object key, @Nullable Object value, @NonNull RemovalCause cause) { log.info("key={}, cause={}, CacheWriter delete", key, cause); } }) //使用CacheLoader建立一個LoadingCache .build(new CacheLoader<String, String>() { //同步加載資料 @Nullable @Override public String load(@NonNull String key) throws Exception { return "value_" + key; } //異步加載資料 @Nullable @Override public String reload(@NonNull String key, @NonNull String oldValue) throws Exception { return "value_" + key; } });
Caffeine的高性能設計
判斷一個緩存的好壞最核心的名額就是命中率,影響緩存命中率有很多因素,包括業務場景、淘汰政策、清理政策、緩存容量等等。如果作為本地緩存, 它的性能的情況,資源的占用也都是一個很重要的名額。下面
我們來看看Caffeine在這幾個方面是怎麼着手的,如何做優化的。
(注:本文不會分析Caffeine全部源碼,隻會對核心設計的實作進行分析,但我建議讀者把Caffeine的源碼都涉獵一下,有個overview才能更好了解本文。如果你看過Guava Cache的源碼也行,代碼的資料結構和處理邏輯很類似的。
源碼基于:caffeine-2.8.0.jar)
W-TinyLFU整體設計
上面說到淘汰政策是影響緩存命中率的因素之一,一般比較簡單的緩存就會直接用到LFU(Least Frequently Used,即最不經常使用)或者LRU(Least Recently Used,即最近最少使用),而Caffeine就是使用了W-TinyLFU算法。
W-TinyLFU看名字就能大概猜出來,它是LFU的變種,也是一種緩存淘汰算法。那為什麼要使用W-TinyLFU呢?
LRU和LFU的缺點
- LRU實作簡單,在一般情況下能夠表現出很好的命中率,是一個“成本效益”很高的算法,平時也很常用。雖然LRU對突發性的稀疏流量(sparse bursts)表現很好,但同時也會産生緩存污染,舉例來說,如果偶然性的要對全量資料進行周遊,那麼“曆史通路記錄”就會被刷走,造成污染。
- 如果資料的分布在一段時間内是固定的話,那麼LFU可以達到最高的命中率。但是LFU有兩個缺點,第一,它需要給每個記錄項維護頻率資訊,每次通路都需要更新,這是個巨大的開銷;第二,對突發性的稀疏流量無力,因為前期經常通路的記錄已經占用了緩存,偶然的流量不太可能會被保留下來,而且過去的一些大量被通路的記錄在将來也不一定會使用上,這樣就一直把“坑”占着了。
TinyLFU
TinyLFU就是其中一個優化算法,它是專門為了解決LFU上述提到的兩個問題而被設計出來的。
解決第一個問題是采用了Count–Min Sketch算法。
解決第二個問題是讓記錄盡量保持相對的“新鮮”(Freshness Mechanism),并且當有新的記錄插入時,可以讓它跟老的記錄進行“PK”,輸者就會被淘汰,這樣一些老的、不再需要的記錄就會被剔除。
下圖是TinyLFU設計圖(來自官方)
upload successful萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 統計頻率Count–Min Sketch算法
如何對一個key進行統計,但又可以節省空間呢?(不是簡單的使用HashMap,這太消耗記憶體了),注意哦,不需要精确的統計,隻需要一個近似值就可以了,怎麼樣,這樣場景是不是很熟悉,如果你是老司機,或許已經聯想到布隆過濾器(Bloom Filter)的應用了。
沒錯,将要介紹的Count–Min Sketch的原理跟Bloom Filter一樣,隻不過Bloom Filter隻有0和1的值,那麼你可以把Count–Min Sketch看作是“數值”版的Bloom Filter。
更多關于Count–Min Sketch的介紹請自行搜尋。
在TinyLFU中,近似頻率的統計如下圖所示:
萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 upload successful
對一個key進行多次hash函數後,index到多個數組位置後進行累加,查詢時取多個值中的最小值即可。
Caffeine對這個算法的實作在
FrequencySketch
類。但Caffeine對此有進一步的優化,例如Count–Min Sketch使用了二維數組,Caffeine隻是用了一個一維的數組;再者,如果是數值類型的話,這個數需要用int或long來存儲,但是Caffeine認為緩存的通路頻率不需要用到那麼大,隻需要15就足夠,一般認為達到15次的頻率算是很高的了,而且Caffeine還有另外一個機制來使得這個頻率進行衰退減半(下面就會講到)。如果最大是15的話,那麼隻需要4個bit就可以滿足了,一個long有64bit,可以存儲16個這樣的統計數,Caffeine就是這樣的設計,使得存儲效率提高了16倍。
Caffeine對緩存的讀寫(
和afterRead
方法)都會調用afterWrite
s方法,而onAccess方法裡有一句:onAccess
這句就是追加記錄的頻率,下面我們看看具體實作frequencySketch().increment(key);
知道了追加方法,那麼讀取方法//FrequencySketch的一些屬性 //種子數 static final long[] SEED = { // A mixture of seeds from FNV-1a, CityHash, and Murmur3 0xc3a5c85c97cb3127L, 0xb492b66fbe98f273L, 0x9ae16a3b2f90404fL, 0xcbf29ce484222325L}; static final long RESET_MASK = 0x7777777777777777L; static final long ONE_MASK = 0x1111111111111111L; int sampleSize; //為了快速根據hash值得到table的index值的掩碼 //table的長度size一般為2的n次方,而tableMask為size-1,這樣就可以通過&操作來模拟取餘操作,速度快很多,老司機都知道 int tableMask; //存儲資料的一維long數組 long[] table; int size; /** * Increments the popularity of the element if it does not exceed the maximum (15). The popularity * of all elements will be periodically down sampled when the observed events exceeds a threshold. * This process provides a frequency aging to allow expired long term entries to fade away. * * @param e the element to add */ public void increment(@NonNull E e) { if (isNotInitialized()) { return; } //根據key的hashCode通過一個哈希函數得到一個hash值 //本來就是hashCode了,為什麼還要再做一次hash?怕原來的hashCode不夠均勻分散,再打散一下。 int hash = spread(e.hashCode()); //這句光看有點難了解 //就如我剛才說的,Caffeine把一個long的64bit劃分成16個等分,每一等分4個bit。 //這個start就是用來定位到是哪一個等分的,用hash值低兩位作為随機數,再左移2位,得到一個小于16的值 int start = (hash & 3) << 2; //indexOf方法的意思就是,根據hash值和不同種子得到table的下标index //這裡通過四個不同的種子,得到四個不同的下标index int index0 = indexOf(hash, 0); int index1 = indexOf(hash, 1); int index2 = indexOf(hash, 2); int index3 = indexOf(hash, 3); //根據index和start(+1, +2, +3)的值,把table[index]對應的等分追加1 //這個incrementAt方法有點難了解,看我下面的解釋 boolean added = incrementAt(index0, start); added |= incrementAt(index1, start + 1); added |= incrementAt(index2, start + 2); added |= incrementAt(index3, start + 3); //這個reset等下說 if (added && (++size == sampleSize)) { reset(); } } /** * Increments the specified counter by 1 if it is not already at the maximum value (15). * * @param i the table index (16 counters) * @param j the counter to increment * @return if incremented */ boolean incrementAt(int i, int j) { //這個j表示16個等分的下标,那麼offset就是相當于在64位中的下标(這個自己想想) int offset = j << 2; //上面提到Caffeine把頻率統計最大定為15,即0xfL //mask就是在64位中的掩碼,即1111後面跟很多個0 long mask = (0xfL << offset); //如果&的結果不等于15,那麼就追加1。等于15就不會再加了 if ((table[i] & mask) != mask) { table[i] += (1L << offset); return true; } return false; } /** * Returns the table index for the counter at the specified depth. * * @param item the element's hash * @param i the counter depth * @return the table index */ int indexOf(int item, int i) { long hash = SEED[i] * item; hash += hash >>> 32; return ((int) hash) & tableMask; } /** * Applies a supplemental hash function to a given hashCode, which defends against poor quality * hash functions. */ int spread(int x) { x = ((x >>> 16) ^ x) * 0x45d9f3b; x = ((x >>> 16) ^ x) * 0x45d9f3b; return (x >>> 16) ^ x; }
就很容易了解了。frequency
/** * Returns the estimated number of occurrences of an element, up to the maximum (15). * * @param e the element to count occurrences of * @return the estimated number of occurrences of the element; possibly zero but never negative */ @NonNegative public int frequency(@NonNull E e) { if (isNotInitialized()) { return 0; } //得到hash值,跟上面一樣 int hash = spread(e.hashCode()); //得到等分的下标,跟上面一樣 int start = (hash & 3) << 2; int frequency = Integer.MAX_VALUE; //循環四次,分别擷取在table數組中不同的下标位置 for (int i = 0; i < 4; i++) { int index = indexOf(hash, i); //這個操作就不多說了,其實跟上面incrementAt是一樣的,定位到table[index] + 等分的位置,再根據mask取出計數值 int count = (int) ((table[index] >>> ((start + i) << 2)) & 0xfL); //取四個中的較小值 frequency = Math.min(frequency, count); } return frequency; }
通過代碼和注釋或者讀者可能難以了解,下圖是我畫出來幫助大家了解的結構圖。
注意紫色虛線框,其中藍色小格就是需要計算的位置:
upload successful萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 保新機制
為了讓緩存保持“新鮮”,剔除掉過往頻率很高但之後不經常的緩存,Caffeine有一個Freshness Mechanism。做法很簡答,就是當整體的統計計數(目前所有記錄的頻率統計之和,這個數值内部維護)達到某一個值時,那麼所有記錄的頻率統計除以2。
從上面的代碼
看到//size變量就是所有記錄的頻率統計之,即每個記錄加1,這個size都會加1 //sampleSize一個門檻值,從FrequencySketch初始化可以看到它的值為maximumSize的10倍 if (added && (++size == sampleSize)) { reset(); }
方法就是做這個事情reset
關于這個reset方法,為什麼是除以2,而不是其他,及其正确性,在最下面的參考資料的TinyLFU論文中3.3章節給出了數學證明,大家有興趣可以看看。/** Reduces every counter by half of its original value. */ void reset() { int count = 0; for (int i = 0; i < table.length; i++) { count += Long.bitCount(table[i] & ONE_MASK); table[i] = (table[i] >>> 1) & RESET_MASK; } size = (size >>> 1) - (count >>> 2); }
增加一個Window?
Caffeine通過測試發現TinyLFU在面對突發性的稀疏流量(sparse bursts)時表現很差,因為新的記錄(new items)還沒來得及建立足夠的頻率就被剔除出去了,這就使得命中率下降。
于是Caffeine設計出一種新的policy,即Window Tiny LFU(W-TinyLFU),并通過實驗和實踐發現W-TinyLFU比TinyLFU表現的更好。
W-TinyLFU的設計如下所示(兩圖等價):
upload successful萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 upload successful
它主要包括兩個緩存子產品,主緩存是SLRU(Segmented LRU,即分段LRU),SLRU包括一個名為protected和一個名為probation的緩存區。通過增加一個緩存區(即Window Cache),當有新的記錄插入時,會先在window區呆一下,就可以避免上述說的sparse bursts問題。
淘汰政策(eviction policy)
當window區滿了,就會根據LRU把candidate(即淘汰出來的元素)放到probation區,如果probation區也滿了,就把candidate和probation将要淘汰的元素victim,兩個進行“PK”,勝者留在probation,輸者就要被淘汰了。
而且經過實驗發現當window區配置為總容量的1%,剩餘的99%當中的80%分給protected區,20%分給probation區時,這時整體性能和命中率表現得最好,是以Caffeine預設的比例設定就是這個。
不過這個比例Caffeine會在運作時根據統計資料(statistics)去動态調整,如果你的應用程式的緩存随着時間變化比較快的話,那麼增加window區的比例可以提高命中率,相反緩存都是比較固定不變的話,增加Main Cache區(protected區 +probation區)的比例會有較好的效果。
下面我們看看上面說到的淘汰政策是怎麼實作的:
一般緩存對讀寫操作後都有後續的一系列“維護”操作,Caffeine也不例外,這些操作都在
maintenance
方法,我們将要說到的淘汰政策也在裡面。
這方法比較重要,下面也會提到,是以這裡隻先說跟“淘汰政策”有關的
和evictEntries
。climb
先說一下Caffeine對上面說到的W-TinyLFU政策的實作用到的資料結構:/** * Performs the pending maintenance work and sets the state flags during processing to avoid * excess scheduling attempts. The read buffer, write buffer, and reference queues are * drained, followed by expiration, and size-based eviction. * * @param task an additional pending task to run, or {@code null} if not present */ @GuardedBy("evictionLock") void maintenance(@Nullable Runnable task) { lazySetDrainStatus(PROCESSING_TO_IDLE); try { drainReadBuffer(); drainWriteBuffer(); if (task != null) { task.run(); } drainKeyReferences(); drainValueReferences(); expireEntries(); //把符合條件的記錄淘汰掉 evictEntries(); //動态調整window區和protected區的大小 climb(); } finally { if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) { lazySetDrainStatus(REQUIRED); } } }
以及預設比例設定(意思看注釋)//最大的個數限制 long maximum; //目前的個數 long weightedSize; //window區的最大限制 long windowMaximum; //window區目前的個數 long windowWeightedSize; //protected區的最大限制 long mainProtectedMaximum; //protected區目前的個數 long mainProtectedWeightedSize; //下一次需要調整的大小(還需要進一步計算) double stepSize; //window區需要調整的大小 long adjustment; //命中計數 int hitsInSample; //不命中的計數 int missesInSample; //上一次的緩存命中率 double previousSampleHitRate; final FrequencySketch<K> sketch; //window區的LRU queue(FIFO) final AccessOrderDeque<Node<K, V>> accessOrderWindowDeque; //probation區的LRU queue(FIFO) final AccessOrderDeque<Node<K, V>> accessOrderProbationDeque; //protected區的LRU queue(FIFO) final AccessOrderDeque<Node<K, V>> accessOrderProtectedDeque;
重點來了,evictEntries和climb方法:/** The initial percent of the maximum weighted capacity dedicated to the main space. */ static final double PERCENT_MAIN = 0.99d; /** The percent of the maximum weighted capacity dedicated to the main's protected space. */ static final double PERCENT_MAIN_PROTECTED = 0.80d; /** The difference in hit rates that restarts the climber. */ static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d; /** The percent of the total size to adapt the window by. */ static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d; /** The rate to decrease the step size to adapt by. */ static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d; /** The maximum number of entries that can be transfered between queues. */
/** Evicts entries if the cache exceeds the maximum. */ @GuardedBy("evictionLock") void evictEntries() { if (!evicts()) { return; } //淘汰window區的記錄 int candidates = evictFromWindow(); //淘汰Main區的記錄 evictFromMain(candidates); } /** * Evicts entries from the window space into the main space while the window size exceeds a * maximum. * * @return the number of candidate entries evicted from the window space */ //根據W-TinyLFU,新的資料都會無條件的加到admission window //但是window是有大小限制,是以要“定期”做一下“維護” @GuardedBy("evictionLock") int evictFromWindow() { int candidates = 0; //檢視window queue的頭部節點 Node<K, V> node = accessOrderWindowDeque().peek(); //如果window區超過了最大的限制,那麼就要把“多出來”的記錄做處理 while (windowWeightedSize() > windowMaximum()) { // The pending operations will adjust the size to reflect the correct weight if (node == null) { break; } //下一個節點 Node<K, V> next = node.getNextInAccessOrder(); if (node.getWeight() != 0) { //把node定位在probation區 node.makeMainProbation(); //從window區去掉 accessOrderWindowDeque().remove(node); //加入到probation queue,相當于把節點移動到probation區(晉升了) accessOrderProbationDeque().add(node); candidates++; //因為移除了一個節點,是以需要調整window的size setWindowWeightedSize(windowWeightedSize() - node.getPolicyWeight()); } //處理下一個節點 node = next; } return candidates; }
方法:evictFromMain
/** * Evicts entries from the main space if the cache exceeds the maximum capacity. The main space * determines whether admitting an entry (coming from the window space) is preferable to retaining * the eviction policy's victim. This is decision is made using a frequency filter so that the * least frequently used entry is removed. * * The window space candidates were previously placed in the MRU position and the eviction * policy's victim is at the LRU position. The two ends of the queue are evaluated while an * eviction is required. The number of remaining candidates is provided and decremented on * eviction, so that when there are no more candidates the victim is evicted. * * @param candidates the number of candidate entries evicted from the window space */ //根據W-TinyLFU,從window晉升過來的要跟probation區的進行“PK”,勝者才能留下 @GuardedBy("evictionLock") void evictFromMain(int candidates) { int victimQueue = PROBATION; //victim是probation queue的頭部 Node<K, V> victim = accessOrderProbationDeque().peekFirst(); //candidate是probation queue的尾部,也就是剛從window晉升來的 Node<K, V> candidate = accessOrderProbationDeque().peekLast(); //當cache不夠容量時才做處理 while (weightedSize() > maximum()) { // Stop trying to evict candidates and always prefer the victim if (candidates == 0) { candidate = null; } //對candidate為null且victim為bull的處理 if ((candidate == null) && (victim == null)) { if (victimQueue == PROBATION) { victim = accessOrderProtectedDeque().peekFirst(); victimQueue = PROTECTED; continue; } else if (victimQueue == PROTECTED) { victim = accessOrderWindowDeque().peekFirst(); victimQueue = WINDOW; continue; } // The pending operations will adjust the size to reflect the correct weight break; } //對節點的weight為0的處理 if ((victim != null) && (victim.getPolicyWeight() == 0)) { victim = victim.getNextInAccessOrder(); continue; } else if ((candidate != null) && (candidate.getPolicyWeight() == 0)) { candidate = candidate.getPreviousInAccessOrder(); candidates--; continue; } // Evict immediately if only one of the entries is present if (victim == null) { @SuppressWarnings("NullAway") Node<K, V> previous = candidate.getPreviousInAccessOrder(); Node<K, V> evict = candidate; candidate = previous; candidates--; evictEntry(evict, RemovalCause.SIZE, 0L); continue; } else if (candidate == null) { Node<K, V> evict = victim; victim = victim.getNextInAccessOrder(); evictEntry(evict, RemovalCause.SIZE, 0L); continue; } // Evict immediately if an entry was collected K victimKey = victim.getKey(); K candidateKey = candidate.getKey(); if (victimKey == null) { @NonNull Node<K, V> evict = victim; victim = victim.getNextInAccessOrder(); evictEntry(evict, RemovalCause.COLLECTED, 0L); continue; } else if (candidateKey == null) { candidates--; @NonNull Node<K, V> evict = candidate; candidate = candidate.getPreviousInAccessOrder(); evictEntry(evict, RemovalCause.COLLECTED, 0L); continue; } //放不下的節點直接處理掉 if (candidate.getPolicyWeight() > maximum()) { candidates--; Node<K, V> evict = candidate; candidate = candidate.getPreviousInAccessOrder(); evictEntry(evict, RemovalCause.SIZE, 0L); continue; } //根據節點的統計頻率frequency來做比較,看看要處理掉victim還是candidate //admit是具體的比較規則,看下面 candidates--; //如果candidate勝出則淘汰victim if (admit(candidateKey, victimKey)) { Node<K, V> evict = victim; victim = victim.getNextInAccessOrder(); evictEntry(evict, RemovalCause.SIZE, 0L); candidate = candidate.getPreviousInAccessOrder(); } else { //如果是victim勝出,則淘汰candidate Node<K, V> evict = candidate; candidate = candidate.getPreviousInAccessOrder(); evictEntry(evict, RemovalCause.SIZE, 0L); } } } /** * Determines if the candidate should be accepted into the main space, as determined by its * frequency relative to the victim. A small amount of randomness is used to protect against hash * collision attacks, where the victim's frequency is artificially raised so that no new entries * are admitted. * * @param candidateKey the key for the entry being proposed for long term retention * @param victimKey the key for the entry chosen by the eviction policy for replacement * @return if the candidate should be admitted and the victim ejected */ @GuardedBy("evictionLock") boolean admit(K candidateKey, K victimKey) { //分别擷取victim和candidate的統計頻率 //frequency這個方法的原理和實作上面已經解釋了 int victimFreq = frequencySketch().frequency(victimKey); int candidateFreq = frequencySketch().frequency(candidateKey); //誰大誰赢 if (candidateFreq > victimFreq) { return true; //如果相等,candidate小于5都當輸了 } else if (candidateFreq <= 5) { // The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack // exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm // candidate reduces the number of random acceptances to minimize the impact on the hit rate. return false; } //如果相等且candidate大于5,則随機淘汰一個 int random = ThreadLocalRandom.current().nextInt(); return ((random & 127) == 0); }
climb
方法主要是用來調整window size的,使得Caffeine可以适應你的應用類型(如OLAP或OLTP)表現出最佳的命中率。
下圖是官方測試的資料:
萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 upload successful
我們看看window size的調整是怎麼實作的。
調整時用到的預設比例資料:
下面分别展開每個方法來解釋://與上次命中率之差的門檻值 static final double HILL_CLIMBER_RESTART_THRESHOLD = 0.05d; //步長(調整)的大小(跟最大值maximum的比例) static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d; //步長的衰減比例 static final double HILL_CLIMBER_STEP_DECAY_RATE = 0.98d; /** Adapts the eviction policy to towards the optimal recency / frequency configuration. */ //climb方法的主要作用就是動态調整window區的大小(相應的,main區的大小也會發生變化,兩個之和為100%)。 //因為區域的大小發生了變化,那麼區域内的資料也可能需要發生相應的移動。 @GuardedBy("evictionLock") void climb() { if (!evicts()) { return; } //确定window需要調整的大小 determineAdjustment(); //如果protected區有溢出,把溢出部分移動到probation區。因為下面的操作有可能需要調整到protected區。 demoteFromMainProtected(); long amount = adjustment(); if (amount == 0) { return; } else if (amount > 0) { //增加window的大小 increaseWindow(); } else { //減少window的大小 decreaseWindow(); } }
以上,是Caffeine的W-TinyLFU政策的設計原理及代碼實作解析。/** Calculates the amount to adapt the window by and sets {@link #adjustment()} accordingly. */ @GuardedBy("evictionLock") void determineAdjustment() { //如果frequencySketch還沒初始化,則傳回 if (frequencySketch().isNotInitialized()) { setPreviousSampleHitRate(0.0); setMissesInSample(0); setHitsInSample(0); return; } //總請求量 = 命中 + miss int requestCount = hitsInSample() + missesInSample(); //沒達到sampleSize則傳回 //預設下sampleSize = 10 * maximum。用sampleSize來判斷緩存是否足夠”熱“。 if (requestCount < frequencySketch().sampleSize) { return; } //命中率的公式 = 命中 / 總請求 double hitRate = (double) hitsInSample() / requestCount; //命中率的內插補點 double hitRateChange = hitRate - previousSampleHitRate(); //本次調整的大小,是由命中率的內插補點和上次的stepSize決定的 double amount = (hitRateChange >= 0) ? stepSize() : -stepSize(); //下次的調整大小:如果命中率的之差大于0.05,則重置為0.065 * maximum,否則按照0.98來進行衰減 double nextStepSize = (Math.abs(hitRateChange) >= HILL_CLIMBER_RESTART_THRESHOLD) ? HILL_CLIMBER_STEP_PERCENT * maximum() * (amount >= 0 ? 1 : -1) : HILL_CLIMBER_STEP_DECAY_RATE * amount; setPreviousSampleHitRate(hitRate); setAdjustment((long) amount); setStepSize(nextStepSize); setMissesInSample(0); setHitsInSample(0); } /** Transfers the nodes from the protected to the probation region if it exceeds the maximum. */ //這個方法比較簡單,減少protected區溢出的部分 @GuardedBy("evictionLock") void demoteFromMainProtected() { long mainProtectedMaximum = mainProtectedMaximum(); long mainProtectedWeightedSize = mainProtectedWeightedSize(); if (mainProtectedWeightedSize <= mainProtectedMaximum) { return; } for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) { if (mainProtectedWeightedSize <= mainProtectedMaximum) { break; } Node<K, V> demoted = accessOrderProtectedDeque().poll(); if (demoted == null) { break; } demoted.makeMainProbation(); accessOrderProbationDeque().add(demoted); mainProtectedWeightedSize -= demoted.getPolicyWeight(); } setMainProtectedWeightedSize(mainProtectedWeightedSize); } /** * Increases the size of the admission window by shrinking the portion allocated to the main * space. As the main space is partitioned into probation and protected regions (80% / 20%), for * simplicity only the protected is reduced. If the regions exceed their maximums, this may cause * protected items to be demoted to the probation region and probation items to be demoted to the * admission window. */ //增加window區的大小,這個方法比較簡單,思路就像我上面說的 @GuardedBy("evictionLock") void increaseWindow() { if (mainProtectedMaximum() == 0) { return; } long quota = Math.min(adjustment(), mainProtectedMaximum()); setMainProtectedMaximum(mainProtectedMaximum() - quota); setWindowMaximum(windowMaximum() + quota); demoteFromMainProtected(); for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) { Node<K, V> candidate = accessOrderProbationDeque().peek(); boolean probation = true; if ((candidate == null) || (quota < candidate.getPolicyWeight())) { candidate = accessOrderProtectedDeque().peek(); probation = false; } if (candidate == null) { break; } int weight = candidate.getPolicyWeight(); if (quota < weight) { break; } quota -= weight; if (probation) { accessOrderProbationDeque().remove(candidate); } else { setMainProtectedWeightedSize(mainProtectedWeightedSize() - weight); accessOrderProtectedDeque().remove(candidate); } setWindowWeightedSize(windowWeightedSize() + weight); accessOrderWindowDeque().add(candidate); candidate.makeWindow(); } setMainProtectedMaximum(mainProtectedMaximum() + quota); setWindowMaximum(windowMaximum() - quota); setAdjustment(quota); } /** Decreases the size of the admission window and increases the main's protected region. */ //同上increaseWindow差不多,反操作 @GuardedBy("evictionLock") void decreaseWindow() { if (windowMaximum() <= 1) { return; } long quota = Math.min(-adjustment(), Math.max(0, windowMaximum() - 1)); setMainProtectedMaximum(mainProtectedMaximum() + quota); setWindowMaximum(windowMaximum() - quota); for (int i = 0; i < QUEUE_TRANSFER_THRESHOLD; i++) { Node<K, V> candidate = accessOrderWindowDeque().peek(); if (candidate == null) { break; } int weight = candidate.getPolicyWeight(); if (quota < weight) { break; } quota -= weight; setMainProtectedWeightedSize(mainProtectedWeightedSize() + weight); setWindowWeightedSize(windowWeightedSize() - weight); accessOrderWindowDeque().remove(candidate); accessOrderProbationDeque().add(candidate); candidate.makeMainProbation(); } setMainProtectedMaximum(mainProtectedMaximum() - quota); setWindowMaximum(windowMaximum() + quota); setAdjustment(-quota); }
異步的高性能讀寫
一般的緩存每次對資料處理完之後(讀的話,已經存在則直接傳回,不存在則load資料,儲存,再傳回;寫的話,則直接插入或更新),但是因為要維護一些淘汰政策,則需要一些額外的操作,諸如:- 計算和比較資料的是否過期
- 統計頻率(像LFU或其變種)
- 維護read queue和write queue
- 淘汰符合條件的資料
- 等等。。。
ReadBuffer
在Caffeine的内部實作中,為了很好的支援不同的Features(如Eviction,Removal,Refresh,Statistics,Cleanup,Policy等等),擴充了很多子類,它們共同的父類是
,而BoundedLocalCache
就是作為它們共有的屬性,即都是用一樣的readBuffer,看定義:readBuffer
上面提到Caffeine對每次緩存的讀操作都會觸發final Buffer<Node<K, V>> readBuffer; readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess() ? new BoundedBuffer<>() : Buffer.disabled();
afterRead
重點看/** * Performs the post-processing work required after a read. * * @param node the entry in the page replacement policy * @param now the current time, in nanoseconds * @param recordHit if the hit count should be incremented */ void afterRead(Node<K, V> node, long now, boolean recordHit) { if (recordHit) { statsCounter().recordHits(1); } //把記錄加入到readBuffer //判斷是否需要立即處理readBuffer //注意這裡無論offer是否成功都可以走下去的,即允許寫入readBuffer丢失,因為這個 boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL); if (shouldDrainBuffers(delayable)) { scheduleDrainBuffers(); } refreshIfNeeded(node, now); } /** * Returns whether maintenance work is needed. * * @param delayable if draining the read buffer can be delayed */ //caffeine用了一組狀态來定義和管理“維護”的過程 boolean shouldDrainBuffers(boolean delayable) { switch (drainStatus()) { case IDLE: return !delayable; case REQUIRED: return true; case PROCESSING_TO_IDLE: case PROCESSING_TO_REQUIRED: return false; default: throw new IllegalStateException(); } }
BoundedBuffer
它是一個striped、非阻塞、有界限的buffer,繼承于/** * A striped, non-blocking, bounded buffer. * * @author [email protected] (Ben Manes) * @param <E> the type of elements maintained by this buffer */ final class BoundedBuffer<E> extends StripedBuffer<E>
類。下面看看StripedBuffer的實作:StripedBuffer
這個StripedBuffer設計的思想是跟/** * A base class providing the mechanics for supporting dynamic striping of bounded buffers. This * implementation is an adaption of the numeric 64-bit {@link java.util.concurrent.atomic.Striped64} * class, which is used by atomic counters. The approach was modified to lazily grow an array of * buffers in order to minimize memory usage for caches that are not heavily contended on. * * @author [email protected] (Doug Lea) * @author [email protected] (Ben Manes) */ abstract class StripedBuffer<E> implements Buffer<E>
Striped64
類似的,通過擴充結構把競争熱點分離。
具體實作是這樣的,StripedBuffer維護一個Buffer[]數組,每個元素就是一個
,每個線程用自己RingBuffer
threadLocalRandomProbe
屬性作為hash值,這樣就相當于每個線程都有自己“專屬”的RingBuffer,就不會産生競争啦,而不是用key的hashCode作為hash值,因為會産生熱點資料問題。
看看StripedBuffer的屬性
/** Table of buffers. When non-null, size is a power of 2. */ //RingBuffer數組 transient volatile Buffer<E> @Nullable[] table; //當進行resize時,需要整個table鎖住。tableBusy作為CAS的标記。 static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy"); static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe"); /** Number of CPUS. */ static final int NCPU = Runtime.getRuntime().availableProcessors(); /** The bound on the table size. */ //table最大size static final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU); /** The maximum number of attempts when trying to expand the table. */ //如果發生競争時(CAS失敗)的嘗試次數 static final int ATTEMPTS = 3; /** Table of buffers. When non-null, size is a power of 2. */ //核心資料結構 transient volatile Buffer<E> @Nullable[] table; /** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */ transient volatile int tableBusy; /** CASes the tableBusy field from 0 to 1 to acquire lock. */ final boolean casTableBusy() { return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1); } /** * Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of * packaging restrictions. */ static final int getProbe() { return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE); }
offer
方法,當沒初始化或存在競争時,則擴容為2倍。
實際是調用
的offer方法,把資料追加到RingBuffer後面。RingBuffer
最後看看@Override public int offer(E e) { int mask; int result = 0; Buffer<E> buffer; //是否不存在競争 boolean uncontended = true; Buffer<E>[] buffers = table //是否已經初始化 if ((buffers == null) || (mask = buffers.length - 1) < 0 //用thread的随機值作為hash值,得到對應位置的RingBuffer || (buffer = buffers[getProbe() & mask]) == null //檢查追加到RingBuffer是否成功 || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) { //其中一個符合條件則進行擴容 expandOrRetry(e, uncontended); } return result; } /** * Handles cases of updates involving initialization, resizing, creating new Buffers, and/or * contention. See above for explanation. This method suffers the usual non-modularity problems of * optimistic retry code, relying on rechecked sets of reads. * * @param e the element to add * @param wasUncontended false if CAS failed before call */ //這個方法比較長,但思路還是相對清晰的。 @SuppressWarnings("PMD.ConfusingTernary") final void expandOrRetry(E e, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (int attempt = 0; attempt < ATTEMPTS; attempt++) { Buffer<E>[] buffers; Buffer<E> buffer; int n; if (((buffers = table) != null) && ((n = buffers.length) > 0)) { if ((buffer = buffers[(n - 1) & h]) == null) { if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer boolean created = false; try { // Recheck under lock Buffer<E>[] rs; int mask, j; if (((rs = table) != null) && ((mask = rs.length) > 0) && (rs[j = (mask - 1) & h] == null)) { rs[j] = create(e); created = true; } } finally { tableBusy = 0; } if (created) { break; } continue; // Slot is now non-empty } collide = false; } else if (!wasUncontended) { // CAS already known to fail wasUncontended = true; // Continue after rehash } else if (buffer.offer(e) != Buffer.FAILED) { break; } else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) { collide = false; // At max size or stale } else if (!collide) { collide = true; } else if (tableBusy == 0 && casTableBusy()) { try { if (table == buffers) { // Expand table unless stale table = Arrays.copyOf(buffers, n << 1); } } finally { tableBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) { boolean init = false; try { // Initialize table if (table == buffers) { @SuppressWarnings({"unchecked", "rawtypes"}) Buffer<E>[] rs = new Buffer[1]; rs[0] = create(e); table = rs; init = true; } } finally { tableBusy = 0; } if (init) { break; } } } }
,注意RingBuffer是RingBuffer
的内部類。BoundedBuffer
/** The maximum number of elements per buffer. */ static final int BUFFER_SIZE = 16; // Assume 4-byte references and 64-byte cache line (16 elements per line) //256長度,但是是以16為機關,是以最多存放16個元素 static final int SPACED_SIZE = BUFFER_SIZE << 4; static final int SPACED_MASK = SPACED_SIZE - 1; static final int OFFSET = 16; //RingBuffer數組 final AtomicReferenceArray<E> buffer; //插入方法 @Override public int offer(E e) { long head = readCounter; long tail = relaxedWriteCounter(); //用head和tail來限制個數 long size = (tail - head); if (size >= SPACED_SIZE) { return Buffer.FULL; } //tail追加16 if (casWriteCounter(tail, tail + OFFSET)) { //用tail“取餘”得到下标 int index = (int) (tail & SPACED_MASK); //用unsafe.putOrderedObject設值 buffer.lazySet(index, e); return Buffer.SUCCESS; } //如果CAS失敗則傳回失敗 return Buffer.FAILED; } //用consumer來處理buffer的資料 @Override public void drainTo(Consumer<E> consumer) { long head = readCounter; long tail = relaxedWriteCounter(); //判斷資料多少 long size = (tail - head); if (size == 0) { return; } do { int index = (int) (head & SPACED_MASK); E e = buffer.get(index); if (e == null) { // not published yet break; } buffer.lazySet(index, null); consumer.accept(e); //head也跟tail一樣,每次遞增16 head += OFFSET; } while (head != tail); lazySetReadCounter(head); }
注意,ring buffer的size(固定是16個)是不變的,變的是head和tail而已。
總的來說ReadBuffer有如下特點:
- 使用 Striped-RingBuffer來提升對buffer的讀寫
- 用thread的hash來避開熱點key的競争
- 允許寫入的丢失
WriteBuffer
writeBuffer跟readBuffer不一樣,主要展現在使用場景的不一樣。本來緩存的一般場景是讀多寫少的,讀的并發會更高,且afterRead顯得沒那麼重要,允許延遲甚至丢失。寫不一樣,寫afterWrite不允許丢失,且要求盡量馬上執行。Caffeine使用MPSC(Multiple Producer / Single Consumer)作為buffer數組,實作在MpscGrowableArrayQueue
類,它是仿照JCTools的MpscGrowableArrayQueue來寫的。
MPSC允許無鎖的高并發寫入,但隻允許一個消費者,同時也犧牲了部分操作。
MPSC我打算另外分析,這裡不展開了。
TimerWheel
除了支援
和expireAfterAccess
之外(Guava Cache也支援這兩個特性),Caffeine還支援expireAfterWrite
expireAfter
。因為expireAfterAccess和expireAfterWrite都隻能是固定的過期時間,這可能滿足不了某些場景,譬如記錄的過期時間是需要根據某些條件而不一樣的,這就需要使用者自定義過期時間。
先看看expireAfter的用法
private static LoadingCache<String, String> cache = Caffeine.newBuilder() .maximumSize(256L) .initialCapacity(1) //.expireAfterAccess(2, TimeUnit.DAYS) //.expireAfterWrite(2, TimeUnit.HOURS) .refreshAfterWrite(1, TimeUnit.HOURS) //自定義過期時間 .expireAfter(new Expiry<String, String>() { //傳回建立後的過期時間 @Override public long expireAfterCreate(@NonNull String key, @NonNull String value, long currentTime) { return 0; } //傳回更新後的過期時間 @Override public long expireAfterUpdate(@NonNull String key, @NonNull String value, long currentTime, @NonNegative long currentDuration) { return 0; } //傳回讀取後的過期時間 @Override public long expireAfterRead(@NonNull String key, @NonNull String value, long currentTime, @NonNegative long currentDuration) { return 0; } }) .recordStats() .build(new CacheLoader<String, String>() { @Nullable @Override public String load(@NonNull String key) throws Exception { return "value_" + key; } });
通過自定義過期時間,使得不同的key可以動态的得到不同的過期時間。
注意,我把expireAfterAccess和expireAfterWrite注釋了,因為這兩個特性不能跟expireAfter一起使用。
而當使用了expireAfter特性後,Caffeine會啟用一種叫“時間輪”的算法來實作這個功能。更多關于時間輪的介紹,可以看我的文章HashedWheelTimer時間輪原理分析。
好,重點來了,為什麼要用時間輪?
對expireAfterAccess和expireAfterWrite的實作是用一個
AccessOrderDeque
雙端隊列,它是FIFO的,因為它們的過期時間是固定的,是以在隊列頭的資料肯定是最早過期的,要處理過期資料時,隻需要首先看看頭部是否過期,然後再挨個檢查就可以了。但是,如果過期時間不一樣的話,這需要對accessOrderQueue進行排序&插入,這個代價太大了。于是,Caffeine用了一種更加高效、優雅的算法-時間輪。
時間輪的結構:
萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 upload successful
因為在我的對時間輪分析的文章裡已經說了時間輪的原理和機制了,是以我就不展開Caffeine對時間輪的實作了。
Caffeine對時間輪的實作在
TimerWheel
,它是一種多層時間輪(hierarchical timing wheels )。
看看元素加入到時間輪的
方法:schedule
/** * Schedules a timer event for the node. * * @param node the entry in the cache */ public void schedule(@NonNull Node<K, V> node) { Node<K, V> sentinel = findBucket(node.getVariableTime()); link(sentinel, node); } /** * Determines the bucket that the timer event should be added to. * * @param time the time when the event fires * @return the sentinel at the head of the bucket */ Node<K, V> findBucket(long time) { long duration = time - nanos; int length = wheel.length - 1; for (int i = 0; i < length; i++) { if (duration < SPANS[i + 1]) { long ticks = (time >>> SHIFT[i]); int index = (int) (ticks & (wheel[i].length - 1)); return wheel[i][index]; } } return wheel[length][0]; } /** Adds the entry at the tail of the bucket's list. */ void link(Node<K, V> sentinel, Node<K, V> node) { node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder()); node.setNextInVariableOrder(sentinel); sentinel.getPreviousInVariableOrder().setNextInVariableOrder(node); sentinel.setPreviousInVariableOrder(node); }
其他
Caffeine還有其他的優化性能的手段,如使用軟引用和弱引用、消除僞共享、CompletableFuture異步等等。總結
Caffeien是一個優秀的本地緩存,通過使用W-TinyLFU算法, 高性能的readBuffer和WriteBuffer,時間輪算法等,使得它擁有高性能,高命中率(near optimal),低記憶體占用等特點。
歡迎加入我的知識星球,一起探讨架構,交流源碼。加入方式,長按下方二維碼噢:
已在知識星球更新源碼解析如下:萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 萬字詳解本地緩存之王 Caffeine,SpringBoot2.X 官方推薦~概要與Guava Cache比較使用CaffeineCaffeine的高性能設計總結 最近更新《芋道 SpringBoot 2.X 入門》系列,已經 20 餘篇,覆寫了 MyBatis、Redis、MongoDB、ES、分庫分表、讀寫分離、SpringMVC、Webflux、權限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能測試等等内容。
提供近 3W 行代碼的 SpringBoot 示例,以及超 4W 行代碼的電商微服務項目。
擷取方式:點“在看”,關注公衆号并回複 666 領取,更多内容陸續奉上。
兄弟,艿一口,點個贊!????